implemented a simple producer queue
authorzapata <zapata>
Tue, 30 Apr 2002 22:09:10 +0000 (22:09 +0000)
committerzapata <zapata>
Tue, 30 Apr 2002 22:09:10 +0000 (22:09 +0000)
source/mircoders/global/JobQueue.java [new file with mode: 0755]
source/mircoders/global/ProducerEngine.java
source/mircoders/localizer/basic/MirBasicOpenPostingLocalizer.java
source/mircoders/servlet/ServletModuleProducer.java

diff --git a/source/mircoders/global/JobQueue.java b/source/mircoders/global/JobQueue.java
new file mode 100755 (executable)
index 0000000..2d57888
--- /dev/null
@@ -0,0 +1,114 @@
+package mircoders.global;
+
+import java.util.*;
+
+// important: objects passed as data must not be altered once put into a job
+
+public class JobQueue {
+  private List jobs;
+  private Map dataToJob;
+
+  public static final int STATUS_PENDING = 0;
+  public static final int STATUS_PROCESSING = 1;
+  public static final int STATUS_PROCESSED = 2;
+
+  public JobQueue() {
+    jobs = new Vector();
+    dataToJob = new HashMap();
+  }
+
+  public void appendJob(Object aData) {
+    synchronized (jobs) {
+      Job job = new Job(aData);
+      jobs.add(job);
+      dataToJob.put(aData, job);
+    }
+  }
+
+  public Object acquirePendingJob() {
+    synchronized (jobs) {
+      Iterator i = jobs.iterator();
+
+      while (i.hasNext()) {
+        Job job = (Job) i.next();
+
+        if (job.setProcessing()) {
+          return job.getData();
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public void flagOffJob(Object aData) {
+    synchronized (jobs) {
+      Job job = (Job) dataToJob.get(aData);
+
+      if (job!=null) {
+        job.setProcessed();
+      }
+    }
+  }
+
+  public void cleanupJobs() {
+    synchronized (jobs) {
+      Iterator i = jobs.iterator();
+
+      while (i.hasNext()) {
+        Job job = (Job) i.next();
+
+        if (job.hasBeenProcessed()) {
+          i.remove();
+        }
+      }
+    }
+  }
+
+  public class Job {
+    private Object data;
+    private int status;
+    private int identifier;
+
+    public Job(Object aData) {
+      data = aData;
+      status = STATUS_PENDING;
+    }
+
+    public Object getData() {
+      return data;
+    }
+
+    public int getStatus() {
+      synchronized(this) {
+        return status;
+      }
+    }
+
+    public boolean setProcessing() {
+      return setStatus(STATUS_PENDING, STATUS_PROCESSING);
+    }
+
+    public void setProcessed() {
+      setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
+    }
+
+    public boolean hasBeenProcessed() {
+      return getStatus() == STATUS_PROCESSED;
+    }
+
+    private boolean setStatus(int anOldStatus, int aNewStatus) {
+      synchronized(this) {
+        if (status == anOldStatus) {
+          status = aNewStatus;
+          return true;
+        }
+        else {
+          return false;
+        }
+      }
+    }
+
+  }
+}
+
index 3083837..33ebb65 100755 (executable)
@@ -7,21 +7,35 @@ import mir.util.*;
 
 public class ProducerEngine {
   private Map producers;
-  private List Queue;
+  private JobQueue producerJobQueue;
   private Thread queueThread;
+  private PrintWriter log;
 
   protected ProducerEngine() {
     producers = MirGlobal.localizer().producers().factories();
-    Queue = new Vector();
+    producerJobQueue = new JobQueue();
+    try {
+      log = new PrintWriter(new FileWriter(new File("/tmp/producer.log")));
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
+    queueThread = new Thread(new ProducerJobQueueThread());
+    queueThread.start();
+  }
 
-//    queueThread = new Thread(
+  public void addJob(String aProducerFactory, String aVerb) {
+    producerJobQueue.appendJob(new ProducerJob(aProducerFactory, aVerb));
+    log.println(aProducerFactory+"."+aVerb+" added to queue");
+    log.flush();
   }
 
-  public void addTask(String aProducerFactory, String aVerb) {
-    produceNow(aProducerFactory, aVerb, new PrintWriter(new NullWriter()));
+  public void printQueueStatus(PrintWriter aWriter) {
+
   }
 
-  public void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) {
+  private void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) {
     try {
                  long startTime;
                  long endTime;
@@ -57,27 +71,67 @@ public class ProducerEngine {
     }
   }
 
-  private class ProducerQueueItem {
-    String factory;
-    String verb;
+  private class ProducerJob {
+    private String factoryName;
+    private String verb;
 
-    public ProducerQueueItem(String aFactory, String aVerb) {
-      factory = aFactory;
+    public ProducerJob(String aFactory, String aVerb) {
+      factoryName = aFactory;
       verb = aVerb;
     }
 
-    public String getVerb() {
-      return verb;
-    }
+    public void execute() {
+      ProducerFactory factory;
+      Producer producer;
+                 long startTime;
+                 long endTime;
+
+                 startTime = System.currentTimeMillis();
+      log.println("Producing job: "+factoryName+"."+verb);
+
+      try {
+        factory = (ProducerFactory) producers.get(factoryName);
+
+        if (factory!=null) {
+          synchronized(factory) {
+            producer = factory.makeProducer(verb);
+          }
+          if (producer!=null) {
+            producer.produce(log);
+          }
+        }
+      }
+      catch (Throwable t) {
+        log.println("  exception "+t.getMessage());
+        t.printStackTrace(log);
+      }
+      log.println("Done producing job: "+factoryName+"."+verb);
+                 endTime = System.currentTimeMillis();
 
-    public String getFactory() {
-      return factory;
+                 log.println("Time: " + (endTime-startTime) + " ms");
+                 log.flush();
     }
   }
 
-  private class ProducerQueue implements Runnable {
+  private class ProducerJobQueueThread implements Runnable {
     public void run() {
-      while (false) {
+      log.println("starting ProducerJobQueueThread");
+      log.flush();
+
+      while (true) {
+        ProducerJob job = (ProducerJob) producerJobQueue.acquirePendingJob();
+        if (job!=null) {
+          job.execute();
+          producerJobQueue.flagOffJob(job);
+        }
+        else
+        {
+          try {
+            Thread.sleep(1500);
+          }
+          catch (InterruptedException e) {
+          }
+        }
       }
     }
   }
index 1e566d1..ac9854f 100755 (executable)
@@ -6,15 +6,15 @@ import mircoders.global.*;
 public class MirBasicOpenPostingLocalizer implements MirOpenPostingLocalizer {
 
   public void afterContentPosting() {
-    MirGlobal.producerEngine().addTask("content", "new");
-    MirGlobal.producerEngine().addTask("openposting", "new");
-    MirGlobal.producerEngine().addTask("startpage", "all");
+    MirGlobal.producerEngine().addJob("content", "new");
+    MirGlobal.producerEngine().addJob("openposting", "new");
+    MirGlobal.producerEngine().addJob("startpage", "all");
 
-    MirGlobal.producerEngine().addTask("synchronization", "run");
+    MirGlobal.producerEngine().addJob("synchronization", "run");
   }
 
   public void afterCommentPosting() {
-    MirGlobal.producerEngine().addTask("content", "new");
-    MirGlobal.producerEngine().addTask("synchronization", "run");
+    MirGlobal.producerEngine().addJob("content", "new");
+    MirGlobal.producerEngine().addJob("synchronization", "run");
   }
 }
index b30ae48..7c38a9d 100755 (executable)
@@ -47,7 +47,7 @@ public class ServletModuleProducer extends ServletModule
                        String producerParam = req.getParameter("producer");
                        String verbParam = req.getParameter("verb");
 
-                       MirGlobal.producerEngine().produceNow( producerParam, verbParam, out );
+                       MirGlobal.producerEngine().addJob(producerParam, verbParam);
 
 //        ProducerFactory factory = (ProducerFactory) MirGlobal.localizer().producers().factories().get(producerParam);
 //        mir.producer.Producer producer = factory.makeProducer(verbParam);