producer abort + timezone support
[mir.git] / source / mircoders / global / JobQueue.java
index 375d287..4fafe7f 100755 (executable)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2001, 2002  The Mir-coders group
+ * Copyright (C) 2001, 2002 The Mir-coders group
  *
  * This file is part of Mir.
  *
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * In addition, as a special exception, The Mir-coders gives permission to link
- * the code of this program with the com.oreilly.servlet library, any library
- * licensed under the Apache Software License, The Sun (tm) Java Advanced
- * Imaging library (JAI), The Sun JIMI library (or with modified versions of
- * the above that use the same license as the above), and distribute linked
- * combinations including the two.  You must obey the GNU General Public
- * License in all respects for all of the code used other than the above
- * mentioned libraries.  If you modify this file, you may extend this exception
- * to your version of the file, but you are not obligated to do so.  If you do
- * not wish to do so, delete this exception statement from your version.
+ * the code of this program with  any library licensed under the Apache Software License,
+ * The Sun (tm) Java Advanced Imaging library (JAI), The Sun JIMI library
+ * (or with modified versions of the above that use the same license as the above),
+ * and distribute linked combinations including the two.  You must obey the
+ * GNU General Public License in all respects for all of the code used other than
+ * the above mentioned libraries.  If you modify this file, you may extend this
+ * exception to your version of the file, but you are not obligated to do so.
+ * If you do not wish to do so, delete this exception statement from your version.
  */
 
 package mircoders.global;
 
+
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Vector;
+import java.util.*;
+
+import mir.log.*;
 
 // important: objects passed as data must not be altered once put into a job
 
 public class JobQueue {
-  private Vector jobs;
-  private Vector finishedJobs;
-  private Map dataToJob;
-  private Map identifierToJob;
+  private Vector jobHandlers;
+  private Map identifierToJobHandler;
   private int nrJobs;
+  private int jobCleanupTreshold;
+  private JobQueueRunner queueRunner;
+  private Thread thread;
+  private LoggerWrapper logger;
 
+  public static final int STATUS_CREATED = -1;
   public static final int STATUS_PENDING = 0;
   public static final int STATUS_PROCESSING = 1;
   public static final int STATUS_PROCESSED = 2;
@@ -60,129 +65,131 @@ public class JobQueue {
 
   public static final int FINISHEDJOBS_LOGSIZE = 10;
 
-  public JobQueue() {
-    finishedJobs = new Vector();
-    jobs = new Vector();
-    dataToJob = new HashMap();
-    identifierToJob = new HashMap();
+  public JobQueue(LoggerWrapper aLogger) {
+    logger = aLogger;
+    jobHandlers = new Vector();
+    identifierToJobHandler = new HashMap();
     nrJobs = 0;
+    jobCleanupTreshold = 1000;
+    queueRunner = new JobQueueRunner(logger);
+    thread = new Thread(queueRunner);
+    thread.start();
   }
 
-  public String appendJob(Object aData) {
-    synchronized (jobs) {
-      Job job = new Job(aData, Integer.toString(nrJobs));
+  public String appendJob(Job aJob, String aDescription) {
+    synchronized (jobHandlers) {
+      JobHandler jobHandler = new JobHandler(aJob, Integer.toString(nrJobs), aDescription);
       nrJobs++;
-      jobs.add(job);
-      dataToJob.put(aData, job);
-      identifierToJob.put(job.getIdentifier(), job);
-      return job.getIdentifier();
+      jobHandlers.add(jobHandler);
+      identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler);
+      jobHandler.setPending();
+
+      return jobHandler.getIdentifier();
     }
   }
 
-  public Object acquirePendingJob() {
-    synchronized (jobs) {
-      int priorityFound= 0;
-      Job jobFound;
+  public List getJobsInfo() {
+    List result = new Vector();
 
-      do {
-        jobFound = null;
-        Iterator i = jobs.iterator();
-        while (i.hasNext()) {
-          Job job = (Job) i.next();
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
 
-          if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
-            jobFound = job;
-            priorityFound = job.getPriority();
-          }
-        }
+      while (i.hasNext()) {
+        result.add(((JobHandler) i.next()).getJobInfo());
       }
-      while (jobFound!=null && !jobFound.setProcessing());
-
-      if (jobFound!=null)
-        return jobFound.getData();
-      else
-        return null;
     }
-  }
 
-  private void finishJob(Job aJob) {
-    synchronized (jobs) {
-      identifierToJob.remove(aJob.identifier);
-      jobs.remove(aJob);
-      finishedJobs.insertElementAt(aJob, 0);
-      if (finishedJobs.size()>FINISHEDJOBS_LOGSIZE)
-        finishedJobs.remove(finishedJobs.size()-1);
-    }
+    return result;
   }
 
-  public void jobProcessed(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+  private void cleanupJobList() {
+    synchronized (jobHandlers) {
+      Iterator i = jobHandlers.iterator();
 
-      if (job!=null) {
-        job.setProcessed();
-        finishJob(job);
-      }
-    }
-  }
+      Calendar tresholdCalendar = new GregorianCalendar();
+      tresholdCalendar.add(Calendar.SECOND, -jobCleanupTreshold);
+      Date treshold = tresholdCalendar.getTime();
 
-  public void jobAborted(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+      while (i.hasNext()) {
+        JobHandler jobHandler = (JobHandler) i.next();
 
-      if (job!=null) {
-        job.setAborted();
-        finishJob(job);
+        synchronized (jobHandler) {
+          if (jobHandler.isFinished() && jobHandler.getLastChange().before(treshold)) {
+            jobHandlers.remove(jobHandler);
+          }
+        }
       }
     }
   }
 
-  public void cancelJob(Object aData) {
-    synchronized (jobs) {
-      Job job = (Job) dataToJob.get(aData);
+  private JobHandler acquirePendingJob() {
+    synchronized (jobHandlers) {
+      int priorityFound= 0;
+      JobHandler jobFound;
+
+      jobFound = null;
+      Iterator i = jobHandlers.iterator();
+      while (i.hasNext()) {
+        JobHandler job = (JobHandler) i.next();
 
-      if (job!=null && job.setCancelled()) {
-        finishJob(job);
+        if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
+          jobFound = job;
+          priorityFound = job.getPriority();
+        }
       }
-    }
-  }
 
-  public void makeJobListSnapshots(List aJobList, List aFinishedJobList) {
-    synchronized (jobs) {
-      aJobList.addAll(makeJobListSnapshot());
-      aFinishedJobList.addAll(makeFinishedJobListSnapshot());
+      return jobFound;
     }
   }
 
-  public List makeJobListSnapshot() {
-    synchronized (jobs) {
-      return (List) jobs.clone();
+  public void cancelJobs(List aJobs) {
+    synchronized (jobHandlers) {
+      Iterator i = aJobs.iterator();
+
+      while (i.hasNext()) {
+        ((JobHandler) identifierToJobHandler.get(i.next())).cancelOrAbortJob();
+      }
     }
   }
 
-  public List makeFinishedJobListSnapshot() {
-    synchronized (jobs) {
-      return (List) finishedJobs.clone();
-    }
+  public interface Job {
+    void abort();
+
+    /**
+     *
+     *
+     * @return <code>true</code> if terminated normally, <code>false</code> if aborted
+     */
+    boolean run();
   }
 
-  public class Job implements Cloneable {
-    private Object data;
-    private Date lastChange;
+  public static class JobInfo {
     private String identifier;
+    private Date lastChange;
     private int status;
+    private long runningTime;
     private int priority;
+    private String description;
 
-    public Job(Object aData, String anIdentifier, int aStatus, int aPriority, Date aLastChange) {
-      data = aData;
+    private JobInfo(String aDescription, int aStatus, Date aLastChange, String anIdentifier, long aRunningTime, int aPriority) {
+      description = aDescription;
+      lastChange = aLastChange;
       status = aStatus;
       identifier = anIdentifier;
       priority = aPriority;
-      lastChange = aLastChange;
+      runningTime = aRunningTime;
     }
 
-    public Job(Object aData, String anIdentifier, int aStatus, int aPriority) {
-      this(aData, anIdentifier, aStatus, aPriority, (new GregorianCalendar()).getTime());
+    public String getDescription() {
+      return description;
+    }
+
+    public int getStatus() {
+      return status;
+    }
+
+    public int getPriority() {
+      return priority;
     }
 
     public Date getLastChange() {
@@ -193,37 +200,109 @@ public class JobQueue {
       return identifier;
     }
 
-    public Job(Object aData, String anIdentifier) {
-      this(aData, anIdentifier, STATUS_PENDING, PRIORITY_NORMAL);
+    public long getRunningTime() {
+      return runningTime;
     }
+  }
+
+  public class JobHandler {
+    private Job job;
+    private String identifier;
+    private String description;
 
-    public Object getData() {
-      return data;
+    private Date lastChange;
+    private long starttime;
+    private long endtime;
+    private int status;
+    private int priority;
+    private boolean hasRun;
+
+    public JobHandler(Job aJob, String anIdentifier, String aDescription, int aPriority) {
+      job = aJob;
+      description = aDescription;
+      identifier = anIdentifier;
+      priority = aPriority;
+      status = STATUS_CREATED;
+    }
+
+    public JobHandler(Job aJob, String anIdentifier, String aDescription) {
+      this(aJob, anIdentifier, aDescription, PRIORITY_NORMAL);
     }
 
+    public JobInfo getJobInfo() {
+      return new JobInfo(getDescription(), getStatus(), getLastChange(), getIdentifier(), getRunningTime(), priority);
+    }
+
+    private void runJob() {
+      if (setProcessing()) {
+        if (job.run())
+          setProcessed();
+        else
+          setAborted();
+      }
+    };
+
+    private void cancelOrAbortJob() {
+      synchronized (this) {
+        if (isPending())
+          setCancelled();
+        if (isProcessing())
+          job.abort();
+      }
+    };
+
     public int getStatus() {
       synchronized(this) {
         return status;
       }
     }
 
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    public long getRunningTime() {
+      synchronized(this) {
+        long result = 0;
+
+        if (hasRun) {
+          if (isFinished())
+            result = endtime;
+          else
+            result = System.currentTimeMillis();
+
+          result = result - starttime;
+        }
+
+        return result;
+      }
+    }
+
     public int getPriority() {
       return priority;
     }
 
-    protected boolean setProcessing() {
+    private boolean setProcessing() {
       return setStatus(STATUS_PENDING, STATUS_PROCESSING);
     }
 
-    protected void setProcessed() {
+    private void setProcessed() {
       setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
     }
 
-    protected void setAborted() {
+    private void setAborted() {
       setStatus(STATUS_PROCESSING, STATUS_ABORTED);
     }
 
-    protected boolean setCancelled() {
+    private void setPending() {
+      setStatus(STATUS_CREATED, STATUS_PENDING);
+    }
+
+    private boolean setCancelled() {
       return setStatus(STATUS_PENDING, STATUS_CANCELLED);
     }
 
@@ -251,11 +330,25 @@ public class JobQueue {
       return getStatus() == STATUS_PENDING;
     }
 
+    public Date getLastChange() {
+      synchronized (this) {
+        return lastChange;
+      }
+    }
+
     private boolean setStatus(int anOldStatus, int aNewStatus) {
       synchronized(this) {
         if (status == anOldStatus) {
           status = aNewStatus;
           lastChange = (new GregorianCalendar()).getTime();
+          if (isProcessing()) {
+            starttime = System.currentTimeMillis();
+            hasRun = true;
+          }
+
+          if (isFinished()) {
+            endtime = System.currentTimeMillis();
+          }
           return true;
         }
         else {
@@ -263,10 +356,37 @@ public class JobQueue {
         }
       }
     }
+  }
 
-    protected Object clone() {
-      synchronized(this) {
-        return new Job(data, identifier, status, priority, lastChange);
+  private class JobQueueRunner implements Runnable {
+    private LoggerWrapper logger;
+
+    public JobQueueRunner(LoggerWrapper aLogger) {
+      logger = aLogger;
+    }
+
+    public void run() {
+      logger.debug("starting JobQueueRunner");
+
+      try {
+        while (true) {
+          JobHandler job = acquirePendingJob();
+          if (job != null) {
+            logger.debug("  starting job ("+job.getIdentifier()+"): " +job.getDescription());
+            job.runJob();
+            logger.debug("  finished job ("+job.getIdentifier()+"): " +job.getDescription());
+          }
+          else {
+            try {
+              Thread.sleep(1500);
+            }
+            catch (InterruptedException e) {
+            }
+          }
+        }
+      }
+      finally {
+        logger.warn("JobQueueRunner terminated");
       }
     }
   }