serious memory leak in the producer subsystem fixed
[mir.git] / source / mircoders / global / JobQueue.java
index 34dd7b0..99ffed7 100755 (executable)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2001, 2002  The Mir-coders group
+ * Copyright (C) 2001, 2002 The Mir-coders group
  *
  * This file is part of Mir.
  *
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * In addition, as a special exception, The Mir-coders gives permission to link
- * the code of this program with the com.oreilly.servlet library, any library
- * licensed under the Apache Software License, The Sun (tm) Java Advanced
- * Imaging library (JAI), The Sun JIMI library (or with modified versions of
- * the above that use the same license as the above), and distribute linked
- * combinations including the two.  You must obey the GNU General Public
- * License in all respects for all of the code used other than the above
- * mentioned libraries.  If you modify this file, you may extend this exception
- * to your version of the file, but you are not obligated to do so.  If you do
- * not wish to do so, delete this exception statement from your version.
+ * the code of this program with  any library licensed under the Apache Software License,
+ * The Sun (tm) Java Advanced Imaging library (JAI), The Sun JIMI library
+ * (or with modified versions of the above that use the same license as the above),
+ * and distribute linked combinations including the two.  You must obey the
+ * GNU General Public License in all respects for all of the code used other than
+ * the above mentioned libraries.  If you modify this file, you may extend this
+ * exception to your version of the file, but you are not obligated to do so.
+ * If you do not wish to do so, delete this exception statement from your version.
  */
 
 package mircoders.global;
 
-import java.util.*;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import mir.log.LoggerWrapper;
 
 // important: objects passed as data must not be altered once put into a job
 
 public class JobQueue {
-  private Vector jobs;
-  private Vector finishedJobs;
-  private Map dataToJob;
-  private Map identifierToJob;
+  private List jobHandlers;
+  private Map identifierToJobHandler;
   private int nrJobs;
+  private int jobCleanupTreshold;
+  private JobQueueRunner queueRunner;
+  private Thread thread;
+  private LoggerWrapper logger;
+  private long lastCleanup;
 
+  public static final int STATUS_CREATED = -1;
   public static final int STATUS_PENDING = 0;
   public static final int STATUS_PROCESSING = 1;
   public static final int STATUS_PROCESSED = 2;
@@ -54,129 +67,180 @@ public class JobQueue {
 
   public static final int FINISHEDJOBS_LOGSIZE = 10;
 
-  public JobQueue() {
-    finishedJobs = new Vector();
-    jobs = new Vector();
-    dataToJob = new HashMap();
-    identifierToJob = new HashMap();
+  public JobQueue(LoggerWrapper aLogger) {
+    logger = aLogger;
+    jobHandlers = new ArrayList();
+    identifierToJobHandler = new HashMap();
     nrJobs = 0;
+    lastCleanup = 0;
+    jobCleanupTreshold = 900; // seconds
+    queueRunner = new JobQueueRunner(logger);
+    thread = new Thread(queueRunner, "JobQueue");
+    thread.setDaemon(true);
+    thread.start();
   }
 
-  public String appendJob(Object aData) {
-    synchronized (jobs) {
-      Job job = new Job(aData, Integer.toString(nrJobs));
-      nrJobs++;
-      jobs.add(job);
-      dataToJob.put(aData, job);
-      identifierToJob.put(job.getIdentifier(), job);
-      return job.getIdentifier();
+  public String appendJob(Job aJob, String aDescription) {
+    try {
+      if (System.currentTimeMillis() - lastCleanup > 60000) {
+        cleanupJobList();
+      }
+    }
+    catch (Throwable t) {
+      logger.error("error while cleaning up joblist: " + t.toString());
     }
-  }
 
-  public Object acquirePendingJob() {
-    synchronized (jobs) {
-      int priorityFound= 0;
-      Job jobFound;
+    synchronized (jobHandlers) {
+      JobHandler jobHandler = new JobHandler(aJob, Integer.toString(nrJobs), aDescription);
+      nrJobs++;
 
-      do {
-        jobFound = null;
-        Iterator i = jobs.iterator();
-        while (i.hasNext()) {
-          Job job = (Job) i.next();
+      jobHandlers.add(jobHandler);
+      identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler);
 
-          if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
-            jobFound = job;
-            priorityFound = job.getPriority();
-          }
-        }
-      }
-      while (jobFound!=null && !jobFound.setProcessing());
+      jobHandler.setPending();
 
-      if (jobFound!=null)
-        return jobFound.getData();
-      else
-        return null;
+      jobHandlers.notify();
+
+      return jobHandler.getIdentifier();
     }
   }
 
-  private void finishJob(Job aJob) {
-    synchronized (jobs) {
-      identifierToJob.remove(aJob.identifier);
-      jobs.remove(aJob);
-      finishedJobs.insertElementAt(aJob, 0);
-      if (finishedJobs.size()>FINISHEDJOBS_LOGSIZE)
-        finishedJobs.remove(finishedJobs.size()-1);
+  public List getJobsInfo() {
+    if (System.currentTimeMillis() - lastCleanup > 60000) {
+      cleanupJobList();
     }
-  }
 
-  public void jobProcessed(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+    List result = new ArrayList();
 
-      if (job!=null) {
-        job.setProcessed();
-        finishJob(job);
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
+
+      while (i.hasNext()) {
+        result.add(0, ((JobHandler) i.next()).getJobInfo());
       }
     }
+
+    return result;
   }
 
-  public void jobAborted(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+  private void cleanupJobList() {
+    List toRemove = new ArrayList();
+
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
 
-      if (job!=null) {
-        job.setAborted();
-        finishJob(job);
+      Calendar tresholdCalendar = new GregorianCalendar();
+      tresholdCalendar.add(Calendar.SECOND, -jobCleanupTreshold);
+      Date treshold = tresholdCalendar.getTime();
+
+      while (i.hasNext()) {
+        JobHandler jobHandler = (JobHandler) i.next();
+
+        synchronized (jobHandler) {
+          if (jobHandler.isFinished() && jobHandler.getLastChange().before(treshold)) {
+            toRemove.add(jobHandler);
+            identifierToJobHandler.remove(jobHandler.getIdentifier());
+          }
+        }
       }
+
+      jobHandlers.removeAll(toRemove);
     }
+
+    lastCleanup = System.currentTimeMillis();
   }
 
-  public void cancelJob(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+  /**
+   * Returns when a new producer job has become available
+   */
+  private JobHandler acquirePendingJob() throws InterruptedException {
+    int priorityFound= 0;
+    JobHandler jobFound;
 
-      if (job!=null && job.setCancelled()) {
-        finishJob(job);
-      }
+    synchronized (jobHandlers) {
+      do {
+        jobFound = null;
+        Iterator i = jobHandlers.iterator();
+        while (i.hasNext()) {
+          JobHandler job = (JobHandler) i.next();
+
+          if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
+            jobFound = job;
+            priorityFound = job.getPriority();
+          }
+        }
+
+        if (jobFound==null) {
+          jobHandlers.wait();
+        }
+
+      } while (jobFound==null);
+
+      return jobFound;
     }
   }
 
-  public void makeJobListSnapshots(List aJobList, List aFinishedJobList) {
-    synchronized (jobs) {
-      aJobList.addAll(makeJobListSnapshot());
-      aFinishedJobList.addAll(makeFinishedJobListSnapshot());
+  public void cancelJobs(List aJobs) {
+    synchronized (jobHandlers) {
+      Iterator i = aJobs.iterator();
+
+      while (i.hasNext()) {
+        ((JobHandler) identifierToJobHandler.get(i.next())).cancelOrAbortJob();
+      }
     }
   }
 
-  public List makeJobListSnapshot() {
-    synchronized (jobs) {
-      return (List) jobs.clone();
+  public void cancelAllJobs() {
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
+
+      while (i.hasNext()) {
+        ((JobHandler) i.next()).cancelOrAbortJob();
+      }
     }
   }
 
-  public List makeFinishedJobListSnapshot() {
-    synchronized (jobs) {
-      return (List) finishedJobs.clone();
-    }
+  public interface Job {
+    /**
+     * This method should cause the run() method to terminate as soon as
+     * possible.
+     */
+    void abort();
+
+    /**
+     * This method should perform the actions associated with the job.
+     * @return <code>true</code> if terminated normally, <code>false</code> if aborted
+     */
+    boolean run();
   }
 
-  public class Job implements Cloneable {
-    private Object data;
-    private Date lastChange;
+  public static class JobInfo {
     private String identifier;
+    private Date lastChange;
     private int status;
+    private long runningTime;
     private int priority;
+    private String description;
 
-    public Job(Object aData, String anIdentifier, int aStatus, int aPriority, Date aLastChange) {
-      data = aData;
+    private JobInfo(String aDescription, int aStatus, Date aLastChange, String anIdentifier, long aRunningTime, int aPriority) {
+      description = aDescription;
+      lastChange = aLastChange;
       status = aStatus;
       identifier = anIdentifier;
       priority = aPriority;
-      lastChange = aLastChange;
+      runningTime = aRunningTime;
+    }
+
+    public String getDescription() {
+      return description;
     }
 
-    public Job(Object aData, String anIdentifier, int aStatus, int aPriority) {
-      this(aData, anIdentifier, aStatus, aPriority, (new GregorianCalendar()).getTime());
+    public int getStatus() {
+      return status;
+    }
+
+    public int getPriority() {
+      return priority;
     }
 
     public Date getLastChange() {
@@ -187,12 +251,55 @@ public class JobQueue {
       return identifier;
     }
 
-    public Job(Object aData, String anIdentifier) {
-      this(aData, anIdentifier, STATUS_PENDING, PRIORITY_NORMAL);
+    public long getRunningTime() {
+      return runningTime;
+    }
+  }
+
+  public class JobHandler {
+    private Job job;
+    private String identifier;
+    private String description;
+
+    private Date lastChange;
+    private long starttime;
+    private long endtime;
+    private int status;
+    private int priority;
+    private boolean hasRun;
+
+    public JobHandler(Job aJob, String anIdentifier, String aDescription, int aPriority) {
+      job = aJob;
+      description = aDescription;
+      identifier = anIdentifier;
+      priority = aPriority;
+      status = STATUS_CREATED;
+    }
+
+    public JobHandler(Job aJob, String anIdentifier, String aDescription) {
+      this(aJob, anIdentifier, aDescription, PRIORITY_NORMAL);
     }
 
-    public Object getData() {
-      return data;
+    public JobInfo getJobInfo() {
+      return new JobInfo(getDescription(), getStatus(), getLastChange(), getIdentifier(), getRunningTime(), priority);
+    }
+
+    private void runJob() {
+      if (setProcessing()) {
+        if (job.run())
+          setProcessed();
+        else
+          setAborted();
+      }
+    }
+
+    private void cancelOrAbortJob() {
+      synchronized (this) {
+        if (isPending())
+          setCancelled();
+        if (isProcessing())
+          job.abort();
+      }
     }
 
     public int getStatus() {
@@ -201,23 +308,52 @@ public class JobQueue {
       }
     }
 
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    public long getRunningTime() {
+      synchronized(this) {
+        long result = 0;
+
+        if (hasRun) {
+          if (isFinished())
+            result = endtime;
+          else
+            result = System.currentTimeMillis();
+
+          result = result - starttime;
+        }
+
+        return result;
+      }
+    }
+
     public int getPriority() {
       return priority;
     }
 
-    protected boolean setProcessing() {
+    private boolean setProcessing() {
       return setStatus(STATUS_PENDING, STATUS_PROCESSING);
     }
 
-    protected void setProcessed() {
+    private void setProcessed() {
       setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
     }
 
-    protected void setAborted() {
+    private void setAborted() {
       setStatus(STATUS_PROCESSING, STATUS_ABORTED);
     }
 
-    protected boolean setCancelled() {
+    private void setPending() {
+      setStatus(STATUS_CREATED, STATUS_PENDING);
+    }
+
+    private boolean setCancelled() {
       return setStatus(STATUS_PENDING, STATUS_CANCELLED);
     }
 
@@ -245,23 +381,58 @@ public class JobQueue {
       return getStatus() == STATUS_PENDING;
     }
 
+    public Date getLastChange() {
+      synchronized (this) {
+        return lastChange;
+      }
+    }
+
     private boolean setStatus(int anOldStatus, int aNewStatus) {
       synchronized(this) {
         if (status == anOldStatus) {
           status = aNewStatus;
           lastChange = (new GregorianCalendar()).getTime();
+          if (isProcessing()) {
+            starttime = System.currentTimeMillis();
+            hasRun = true;
+          }
+
+          if (isFinished()) {
+            endtime = System.currentTimeMillis();
+          }
           return true;
         }
-        else {
-          return false;
-        }
+                               return false;
       }
     }
+  }
 
-    protected Object clone() {
-      synchronized(this) {
-        System.out.println("  blabla");
-        return new Job(data, identifier, status, priority, lastChange);
+  private class JobQueueRunner implements Runnable {
+    
+    public JobQueueRunner(LoggerWrapper aLogger) {
+      logger = aLogger;
+    }
+
+    public void run() {
+      logger.debug("starting JobQueueRunner");
+
+      try {
+        while (true) {
+          try {
+            JobHandler job = acquirePendingJob();
+
+            logger.debug("  starting job ("+job.getIdentifier()+"): " +job.getDescription());
+            job.runJob();
+            logger.debug("  finished job ("+job.getIdentifier()+"): " +job.getDescription());
+
+            job=null;
+          }
+          catch (InterruptedException e) {
+          }
+        }
+      }
+      finally {
+        logger.warn("JobQueueRunner terminated");
       }
     }
   }