From: zapata Date: Tue, 30 Apr 2002 22:09:10 +0000 (+0000) Subject: implemented a simple producer queue X-Git-Tag: prexmlproducerconfig~90 X-Git-Url: http://erislabs.net/gitweb/?p=mir.git;a=commitdiff_plain;h=47a1f9d32e70436a3a92214296430184fabf6576 implemented a simple producer queue --- diff --git a/source/mircoders/global/JobQueue.java b/source/mircoders/global/JobQueue.java new file mode 100755 index 00000000..2d578888 --- /dev/null +++ b/source/mircoders/global/JobQueue.java @@ -0,0 +1,114 @@ +package mircoders.global; + +import java.util.*; + +// important: objects passed as data must not be altered once put into a job + +public class JobQueue { + private List jobs; + private Map dataToJob; + + public static final int STATUS_PENDING = 0; + public static final int STATUS_PROCESSING = 1; + public static final int STATUS_PROCESSED = 2; + + public JobQueue() { + jobs = new Vector(); + dataToJob = new HashMap(); + } + + public void appendJob(Object aData) { + synchronized (jobs) { + Job job = new Job(aData); + jobs.add(job); + dataToJob.put(aData, job); + } + } + + public Object acquirePendingJob() { + synchronized (jobs) { + Iterator i = jobs.iterator(); + + while (i.hasNext()) { + Job job = (Job) i.next(); + + if (job.setProcessing()) { + return job.getData(); + } + } + } + + return null; + } + + public void flagOffJob(Object aData) { + synchronized (jobs) { + Job job = (Job) dataToJob.get(aData); + + if (job!=null) { + job.setProcessed(); + } + } + } + + public void cleanupJobs() { + synchronized (jobs) { + Iterator i = jobs.iterator(); + + while (i.hasNext()) { + Job job = (Job) i.next(); + + if (job.hasBeenProcessed()) { + i.remove(); + } + } + } + } + + public class Job { + private Object data; + private int status; + private int identifier; + + public Job(Object aData) { + data = aData; + status = STATUS_PENDING; + } + + public Object getData() { + return data; + } + + public int getStatus() { + synchronized(this) { + return status; + } + } + + public boolean setProcessing() { + return setStatus(STATUS_PENDING, STATUS_PROCESSING); + } + + public void setProcessed() { + setStatus(STATUS_PROCESSING, STATUS_PROCESSED); + } + + public boolean hasBeenProcessed() { + return getStatus() == STATUS_PROCESSED; + } + + private boolean setStatus(int anOldStatus, int aNewStatus) { + synchronized(this) { + if (status == anOldStatus) { + status = aNewStatus; + return true; + } + else { + return false; + } + } + } + + } +} + diff --git a/source/mircoders/global/ProducerEngine.java b/source/mircoders/global/ProducerEngine.java index 3083837c..33ebb65a 100755 --- a/source/mircoders/global/ProducerEngine.java +++ b/source/mircoders/global/ProducerEngine.java @@ -7,21 +7,35 @@ import mir.util.*; public class ProducerEngine { private Map producers; - private List Queue; + private JobQueue producerJobQueue; private Thread queueThread; + private PrintWriter log; protected ProducerEngine() { producers = MirGlobal.localizer().producers().factories(); - Queue = new Vector(); + producerJobQueue = new JobQueue(); + try { + log = new PrintWriter(new FileWriter(new File("/tmp/producer.log"))); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + + queueThread = new Thread(new ProducerJobQueueThread()); + queueThread.start(); + } -// queueThread = new Thread( + public void addJob(String aProducerFactory, String aVerb) { + producerJobQueue.appendJob(new ProducerJob(aProducerFactory, aVerb)); + log.println(aProducerFactory+"."+aVerb+" added to queue"); + log.flush(); } - public void addTask(String aProducerFactory, String aVerb) { - produceNow(aProducerFactory, aVerb, new PrintWriter(new NullWriter())); + public void printQueueStatus(PrintWriter aWriter) { + } - public void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) { + private void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) { try { long startTime; long endTime; @@ -57,27 +71,67 @@ public class ProducerEngine { } } - private class ProducerQueueItem { - String factory; - String verb; + private class ProducerJob { + private String factoryName; + private String verb; - public ProducerQueueItem(String aFactory, String aVerb) { - factory = aFactory; + public ProducerJob(String aFactory, String aVerb) { + factoryName = aFactory; verb = aVerb; } - public String getVerb() { - return verb; - } + public void execute() { + ProducerFactory factory; + Producer producer; + long startTime; + long endTime; + + startTime = System.currentTimeMillis(); + log.println("Producing job: "+factoryName+"."+verb); + + try { + factory = (ProducerFactory) producers.get(factoryName); + + if (factory!=null) { + synchronized(factory) { + producer = factory.makeProducer(verb); + } + if (producer!=null) { + producer.produce(log); + } + } + } + catch (Throwable t) { + log.println(" exception "+t.getMessage()); + t.printStackTrace(log); + } + log.println("Done producing job: "+factoryName+"."+verb); + endTime = System.currentTimeMillis(); - public String getFactory() { - return factory; + log.println("Time: " + (endTime-startTime) + " ms"); + log.flush(); } } - private class ProducerQueue implements Runnable { + private class ProducerJobQueueThread implements Runnable { public void run() { - while (false) { + log.println("starting ProducerJobQueueThread"); + log.flush(); + + while (true) { + ProducerJob job = (ProducerJob) producerJobQueue.acquirePendingJob(); + if (job!=null) { + job.execute(); + producerJobQueue.flagOffJob(job); + } + else + { + try { + Thread.sleep(1500); + } + catch (InterruptedException e) { + } + } } } } diff --git a/source/mircoders/localizer/basic/MirBasicOpenPostingLocalizer.java b/source/mircoders/localizer/basic/MirBasicOpenPostingLocalizer.java index 1e566d18..ac9854f7 100755 --- a/source/mircoders/localizer/basic/MirBasicOpenPostingLocalizer.java +++ b/source/mircoders/localizer/basic/MirBasicOpenPostingLocalizer.java @@ -6,15 +6,15 @@ import mircoders.global.*; public class MirBasicOpenPostingLocalizer implements MirOpenPostingLocalizer { public void afterContentPosting() { - MirGlobal.producerEngine().addTask("content", "new"); - MirGlobal.producerEngine().addTask("openposting", "new"); - MirGlobal.producerEngine().addTask("startpage", "all"); + MirGlobal.producerEngine().addJob("content", "new"); + MirGlobal.producerEngine().addJob("openposting", "new"); + MirGlobal.producerEngine().addJob("startpage", "all"); - MirGlobal.producerEngine().addTask("synchronization", "run"); + MirGlobal.producerEngine().addJob("synchronization", "run"); } public void afterCommentPosting() { - MirGlobal.producerEngine().addTask("content", "new"); - MirGlobal.producerEngine().addTask("synchronization", "run"); + MirGlobal.producerEngine().addJob("content", "new"); + MirGlobal.producerEngine().addJob("synchronization", "run"); } } diff --git a/source/mircoders/servlet/ServletModuleProducer.java b/source/mircoders/servlet/ServletModuleProducer.java index b30ae48f..7c38a9d0 100755 --- a/source/mircoders/servlet/ServletModuleProducer.java +++ b/source/mircoders/servlet/ServletModuleProducer.java @@ -47,7 +47,7 @@ public class ServletModuleProducer extends ServletModule String producerParam = req.getParameter("producer"); String verbParam = req.getParameter("verb"); - MirGlobal.producerEngine().produceNow( producerParam, verbParam, out ); + MirGlobal.producerEngine().addJob(producerParam, verbParam); // ProducerFactory factory = (ProducerFactory) MirGlobal.localizer().producers().factories().get(producerParam); // mir.producer.Producer producer = factory.makeProducer(verbParam);