serious memory leak in the producer subsystem fixed
[mir.git] / source / mircoders / global / JobQueue.java
index f60116f..99ffed7 100755 (executable)
+/*
+ * Copyright (C) 2001, 2002 The Mir-coders group
+ *
+ * This file is part of Mir.
+ *
+ * Mir is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Mir is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Mir; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * In addition, as a special exception, The Mir-coders gives permission to link
+ * the code of this program with  any library licensed under the Apache Software License,
+ * The Sun (tm) Java Advanced Imaging library (JAI), The Sun JIMI library
+ * (or with modified versions of the above that use the same license as the above),
+ * and distribute linked combinations including the two.  You must obey the
+ * GNU General Public License in all respects for all of the code used other than
+ * the above mentioned libraries.  If you modify this file, you may extend this
+ * exception to your version of the file, but you are not obligated to do so.
+ * If you do not wish to do so, delete this exception statement from your version.
+ */
+
 package mircoders.global;
 
-import java.util.*;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import mir.log.LoggerWrapper;
 
 // important: objects passed as data must not be altered once put into a job
 
 public class JobQueue {
-  private Vector jobs;
-  private Map dataToJob;
+  private List jobHandlers;
+  private Map identifierToJobHandler;
+  private int nrJobs;
+  private int jobCleanupTreshold;
+  private JobQueueRunner queueRunner;
+  private Thread thread;
+  private LoggerWrapper logger;
+  private long lastCleanup;
 
+  public static final int STATUS_CREATED = -1;
   public static final int STATUS_PENDING = 0;
   public static final int STATUS_PROCESSING = 1;
   public static final int STATUS_PROCESSED = 2;
+  public static final int STATUS_CANCELLED = 3;
+  public static final int STATUS_ABORTED = 4;
+
+  public static final int PRIORITY_NORMAL = 100;
+  public static final int PRIORITY_LOW = 10;
+  public static final int PRIORITY_HIGH = 1000;
 
-  public JobQueue() {
-    jobs = new Vector();
-    dataToJob = new HashMap();
+  public static final int FINISHEDJOBS_LOGSIZE = 10;
+
+  public JobQueue(LoggerWrapper aLogger) {
+    logger = aLogger;
+    jobHandlers = new ArrayList();
+    identifierToJobHandler = new HashMap();
+    nrJobs = 0;
+    lastCleanup = 0;
+    jobCleanupTreshold = 900; // seconds
+    queueRunner = new JobQueueRunner(logger);
+    thread = new Thread(queueRunner, "JobQueue");
+    thread.setDaemon(true);
+    thread.start();
   }
 
-  public void appendJob(Object aData) {
-    synchronized (jobs) {
-      Job job = new Job(aData);
-      jobs.add(job);
-      dataToJob.put(aData, job);
+  public String appendJob(Job aJob, String aDescription) {
+    try {
+      if (System.currentTimeMillis() - lastCleanup > 60000) {
+        cleanupJobList();
+      }
+    }
+    catch (Throwable t) {
+      logger.error("error while cleaning up joblist: " + t.toString());
+    }
+
+    synchronized (jobHandlers) {
+      JobHandler jobHandler = new JobHandler(aJob, Integer.toString(nrJobs), aDescription);
+      nrJobs++;
+
+      jobHandlers.add(jobHandler);
+      identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler);
+
+      jobHandler.setPending();
+
+      jobHandlers.notify();
+
+      return jobHandler.getIdentifier();
     }
   }
 
-  public Object acquirePendingJob() {
-    synchronized (jobs) {
-      Iterator i = jobs.iterator();
+  public List getJobsInfo() {
+    if (System.currentTimeMillis() - lastCleanup > 60000) {
+      cleanupJobList();
+    }
 
-      while (i.hasNext()) {
-        Job job = (Job) i.next();
+    List result = new ArrayList();
 
-        if (job.setProcessing()) {
-          return job.getData();
-        }
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
+
+      while (i.hasNext()) {
+        result.add(0, ((JobHandler) i.next()).getJobInfo());
       }
     }
 
-    return null;
+    return result;
   }
 
-  public void flagOffJob(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+  private void cleanupJobList() {
+    List toRemove = new ArrayList();
 
-      if (job!=null) {
-        job.setProcessed();
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
+
+      Calendar tresholdCalendar = new GregorianCalendar();
+      tresholdCalendar.add(Calendar.SECOND, -jobCleanupTreshold);
+      Date treshold = tresholdCalendar.getTime();
+
+      while (i.hasNext()) {
+        JobHandler jobHandler = (JobHandler) i.next();
+
+        synchronized (jobHandler) {
+          if (jobHandler.isFinished() && jobHandler.getLastChange().before(treshold)) {
+            toRemove.add(jobHandler);
+            identifierToJobHandler.remove(jobHandler.getIdentifier());
+          }
+        }
       }
+
+      jobHandlers.removeAll(toRemove);
     }
+
+    lastCleanup = System.currentTimeMillis();
   }
 
-  public void cleanupJobs() {
-    synchronized (jobs) {
-      Iterator i = jobs.iterator();
+  /**
+   * Returns when a new producer job has become available
+   */
+  private JobHandler acquirePendingJob() throws InterruptedException {
+    int priorityFound= 0;
+    JobHandler jobFound;
 
-      while (i.hasNext()) {
-        Job job = (Job) i.next();
+    synchronized (jobHandlers) {
+      do {
+        jobFound = null;
+        Iterator i = jobHandlers.iterator();
+        while (i.hasNext()) {
+          JobHandler job = (JobHandler) i.next();
 
-        if (job.hasBeenProcessed()) {
-          i.remove();
+          if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
+            jobFound = job;
+            priorityFound = job.getPriority();
+          }
         }
+
+        if (jobFound==null) {
+          jobHandlers.wait();
+        }
+
+      } while (jobFound==null);
+
+      return jobFound;
+    }
+  }
+
+  public void cancelJobs(List aJobs) {
+    synchronized (jobHandlers) {
+      Iterator i = aJobs.iterator();
+
+      while (i.hasNext()) {
+        ((JobHandler) identifierToJobHandler.get(i.next())).cancelOrAbortJob();
       }
     }
   }
 
-  public List makeJobListSnapshot() {
-    synchronized (jobs) {
-      return (List) jobs.clone();
+  public void cancelAllJobs() {
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
+
+      while (i.hasNext()) {
+        ((JobHandler) i.next()).cancelOrAbortJob();
+      }
     }
   }
 
-  public class Job implements Cloneable {
-    private Object data;
+  public interface Job {
+    /**
+     * This method should cause the run() method to terminate as soon as
+     * possible.
+     */
+    void abort();
+
+    /**
+     * This method should perform the actions associated with the job.
+     * @return <code>true</code> if terminated normally, <code>false</code> if aborted
+     */
+    boolean run();
+  }
+
+  public static class JobInfo {
+    private String identifier;
+    private Date lastChange;
     private int status;
+    private long runningTime;
+    private int priority;
+    private String description;
 
-    public Job(Object aData, int aStatus) {
-      data = aData;
+    private JobInfo(String aDescription, int aStatus, Date aLastChange, String anIdentifier, long aRunningTime, int aPriority) {
+      description = aDescription;
+      lastChange = aLastChange;
       status = aStatus;
+      identifier = anIdentifier;
+      priority = aPriority;
+      runningTime = aRunningTime;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    public int getStatus() {
+      return status;
+    }
+
+    public int getPriority() {
+      return priority;
+    }
+
+    public Date getLastChange() {
+      return lastChange;
+    }
+
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public long getRunningTime() {
+      return runningTime;
+    }
+  }
+
+  public class JobHandler {
+    private Job job;
+    private String identifier;
+    private String description;
+
+    private Date lastChange;
+    private long starttime;
+    private long endtime;
+    private int status;
+    private int priority;
+    private boolean hasRun;
+
+    public JobHandler(Job aJob, String anIdentifier, String aDescription, int aPriority) {
+      job = aJob;
+      description = aDescription;
+      identifier = anIdentifier;
+      priority = aPriority;
+      status = STATUS_CREATED;
     }
 
-    public Job(Object aData) {
-      this(aData, STATUS_PENDING);
+    public JobHandler(Job aJob, String anIdentifier, String aDescription) {
+      this(aJob, anIdentifier, aDescription, PRIORITY_NORMAL);
     }
 
-    public Object getData() {
-      return data;
+    public JobInfo getJobInfo() {
+      return new JobInfo(getDescription(), getStatus(), getLastChange(), getIdentifier(), getRunningTime(), priority);
+    }
+
+    private void runJob() {
+      if (setProcessing()) {
+        if (job.run())
+          setProcessed();
+        else
+          setAborted();
+      }
+    }
+
+    private void cancelOrAbortJob() {
+      synchronized (this) {
+        if (isPending())
+          setCancelled();
+        if (isProcessing())
+          job.abort();
+      }
     }
 
     public int getStatus() {
@@ -94,18 +308,71 @@ public class JobQueue {
       }
     }
 
-    protected boolean setProcessing() {
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    public long getRunningTime() {
+      synchronized(this) {
+        long result = 0;
+
+        if (hasRun) {
+          if (isFinished())
+            result = endtime;
+          else
+            result = System.currentTimeMillis();
+
+          result = result - starttime;
+        }
+
+        return result;
+      }
+    }
+
+    public int getPriority() {
+      return priority;
+    }
+
+    private boolean setProcessing() {
       return setStatus(STATUS_PENDING, STATUS_PROCESSING);
     }
 
-    protected void setProcessed() {
+    private void setProcessed() {
       setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
     }
 
+    private void setAborted() {
+      setStatus(STATUS_PROCESSING, STATUS_ABORTED);
+    }
+
+    private void setPending() {
+      setStatus(STATUS_CREATED, STATUS_PENDING);
+    }
+
+    private boolean setCancelled() {
+      return setStatus(STATUS_PENDING, STATUS_CANCELLED);
+    }
+
     public boolean hasBeenProcessed() {
       return getStatus() == STATUS_PROCESSED;
     }
 
+    public boolean hasBeenAborted() {
+      return getStatus() == STATUS_ABORTED;
+    }
+
+    public boolean isCancelled() {
+      return getStatus() == STATUS_CANCELLED;
+    }
+
+    public boolean isFinished() {
+      return hasBeenProcessed() || hasBeenAborted() || isCancelled();
+    }
+
     public boolean isProcessing() {
       return getStatus() == STATUS_PROCESSING;
     }
@@ -114,21 +381,58 @@ public class JobQueue {
       return getStatus() == STATUS_PENDING;
     }
 
+    public Date getLastChange() {
+      synchronized (this) {
+        return lastChange;
+      }
+    }
+
     private boolean setStatus(int anOldStatus, int aNewStatus) {
       synchronized(this) {
         if (status == anOldStatus) {
           status = aNewStatus;
+          lastChange = (new GregorianCalendar()).getTime();
+          if (isProcessing()) {
+            starttime = System.currentTimeMillis();
+            hasRun = true;
+          }
+
+          if (isFinished()) {
+            endtime = System.currentTimeMillis();
+          }
           return true;
         }
-        else {
-          return false;
-        }
+                               return false;
       }
     }
+  }
 
-    protected Object clone() {
-      synchronized(this) {
-        return new Job(data, status);
+  private class JobQueueRunner implements Runnable {
+    
+    public JobQueueRunner(LoggerWrapper aLogger) {
+      logger = aLogger;
+    }
+
+    public void run() {
+      logger.debug("starting JobQueueRunner");
+
+      try {
+        while (true) {
+          try {
+            JobHandler job = acquirePendingJob();
+
+            logger.debug("  starting job ("+job.getIdentifier()+"): " +job.getDescription());
+            job.runJob();
+            logger.debug("  finished job ("+job.getIdentifier()+"): " +job.getDescription());
+
+            job=null;
+          }
+          catch (InterruptedException e) {
+          }
+        }
+      }
+      finally {
+        logger.warn("JobQueueRunner terminated");
       }
     }
   }