--- /dev/null
+package mircoders.global;
+
+import java.util.*;
+
+// important: objects passed as data must not be altered once put into a job
+
+public class JobQueue {
+ private List jobs;
+ private Map dataToJob;
+
+ public static final int STATUS_PENDING = 0;
+ public static final int STATUS_PROCESSING = 1;
+ public static final int STATUS_PROCESSED = 2;
+
+ public JobQueue() {
+ jobs = new Vector();
+ dataToJob = new HashMap();
+ }
+
+ public void appendJob(Object aData) {
+ synchronized (jobs) {
+ Job job = new Job(aData);
+ jobs.add(job);
+ dataToJob.put(aData, job);
+ }
+ }
+
+ public Object acquirePendingJob() {
+ synchronized (jobs) {
+ Iterator i = jobs.iterator();
+
+ while (i.hasNext()) {
+ Job job = (Job) i.next();
+
+ if (job.setProcessing()) {
+ return job.getData();
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public void flagOffJob(Object aData) {
+ synchronized (jobs) {
+ Job job = (Job) dataToJob.get(aData);
+
+ if (job!=null) {
+ job.setProcessed();
+ }
+ }
+ }
+
+ public void cleanupJobs() {
+ synchronized (jobs) {
+ Iterator i = jobs.iterator();
+
+ while (i.hasNext()) {
+ Job job = (Job) i.next();
+
+ if (job.hasBeenProcessed()) {
+ i.remove();
+ }
+ }
+ }
+ }
+
+ public class Job {
+ private Object data;
+ private int status;
+ private int identifier;
+
+ public Job(Object aData) {
+ data = aData;
+ status = STATUS_PENDING;
+ }
+
+ public Object getData() {
+ return data;
+ }
+
+ public int getStatus() {
+ synchronized(this) {
+ return status;
+ }
+ }
+
+ public boolean setProcessing() {
+ return setStatus(STATUS_PENDING, STATUS_PROCESSING);
+ }
+
+ public void setProcessed() {
+ setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
+ }
+
+ public boolean hasBeenProcessed() {
+ return getStatus() == STATUS_PROCESSED;
+ }
+
+ private boolean setStatus(int anOldStatus, int aNewStatus) {
+ synchronized(this) {
+ if (status == anOldStatus) {
+ status = aNewStatus;
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ }
+
+ }
+}
+
public class ProducerEngine {
private Map producers;
- private List Queue;
+ private JobQueue producerJobQueue;
private Thread queueThread;
+ private PrintWriter log;
protected ProducerEngine() {
producers = MirGlobal.localizer().producers().factories();
- Queue = new Vector();
+ producerJobQueue = new JobQueue();
+ try {
+ log = new PrintWriter(new FileWriter(new File("/tmp/producer.log")));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+
+ queueThread = new Thread(new ProducerJobQueueThread());
+ queueThread.start();
+ }
-// queueThread = new Thread(
+ public void addJob(String aProducerFactory, String aVerb) {
+ producerJobQueue.appendJob(new ProducerJob(aProducerFactory, aVerb));
+ log.println(aProducerFactory+"."+aVerb+" added to queue");
+ log.flush();
}
- public void addTask(String aProducerFactory, String aVerb) {
- produceNow(aProducerFactory, aVerb, new PrintWriter(new NullWriter()));
+ public void printQueueStatus(PrintWriter aWriter) {
+
}
- public void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) {
+ private void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) {
try {
long startTime;
long endTime;
}
}
- private class ProducerQueueItem {
- String factory;
- String verb;
+ private class ProducerJob {
+ private String factoryName;
+ private String verb;
- public ProducerQueueItem(String aFactory, String aVerb) {
- factory = aFactory;
+ public ProducerJob(String aFactory, String aVerb) {
+ factoryName = aFactory;
verb = aVerb;
}
- public String getVerb() {
- return verb;
- }
+ public void execute() {
+ ProducerFactory factory;
+ Producer producer;
+ long startTime;
+ long endTime;
+
+ startTime = System.currentTimeMillis();
+ log.println("Producing job: "+factoryName+"."+verb);
+
+ try {
+ factory = (ProducerFactory) producers.get(factoryName);
+
+ if (factory!=null) {
+ synchronized(factory) {
+ producer = factory.makeProducer(verb);
+ }
+ if (producer!=null) {
+ producer.produce(log);
+ }
+ }
+ }
+ catch (Throwable t) {
+ log.println(" exception "+t.getMessage());
+ t.printStackTrace(log);
+ }
+ log.println("Done producing job: "+factoryName+"."+verb);
+ endTime = System.currentTimeMillis();
- public String getFactory() {
- return factory;
+ log.println("Time: " + (endTime-startTime) + " ms");
+ log.flush();
}
}
- private class ProducerQueue implements Runnable {
+ private class ProducerJobQueueThread implements Runnable {
public void run() {
- while (false) {
+ log.println("starting ProducerJobQueueThread");
+ log.flush();
+
+ while (true) {
+ ProducerJob job = (ProducerJob) producerJobQueue.acquirePendingJob();
+ if (job!=null) {
+ job.execute();
+ producerJobQueue.flagOffJob(job);
+ }
+ else
+ {
+ try {
+ Thread.sleep(1500);
+ }
+ catch (InterruptedException e) {
+ }
+ }
}
}
}
public class MirBasicOpenPostingLocalizer implements MirOpenPostingLocalizer {
public void afterContentPosting() {
- MirGlobal.producerEngine().addTask("content", "new");
- MirGlobal.producerEngine().addTask("openposting", "new");
- MirGlobal.producerEngine().addTask("startpage", "all");
+ MirGlobal.producerEngine().addJob("content", "new");
+ MirGlobal.producerEngine().addJob("openposting", "new");
+ MirGlobal.producerEngine().addJob("startpage", "all");
- MirGlobal.producerEngine().addTask("synchronization", "run");
+ MirGlobal.producerEngine().addJob("synchronization", "run");
}
public void afterCommentPosting() {
- MirGlobal.producerEngine().addTask("content", "new");
- MirGlobal.producerEngine().addTask("synchronization", "run");
+ MirGlobal.producerEngine().addJob("content", "new");
+ MirGlobal.producerEngine().addJob("synchronization", "run");
}
}
String producerParam = req.getParameter("producer");
String verbParam = req.getParameter("verb");
- MirGlobal.producerEngine().produceNow( producerParam, verbParam, out );
+ MirGlobal.producerEngine().addJob(producerParam, verbParam);
// ProducerFactory factory = (ProducerFactory) MirGlobal.localizer().producers().factories().get(producerParam);
// mir.producer.Producer producer = factory.makeProducer(verbParam);