private PrintWriter log;
protected ProducerEngine() {
-// producers = MirGlobal.localizer().producers().factories();
producerJobQueue = new JobQueue();
try {
RandomAccessFile raFile = (new RandomAccessFile(MirGlobal.getConfigProperty("Home") + "/" + MirGlobal.getConfigProperty("Producer.Logfile"), "rw"));
log = new PrintWriter(new FileWriter( raFile.getFD()));
}
catch (Exception e) {
-// throw new ProducerEngineRuntimeExc("Creating PrintWriter log failed",e);
log = new PrintWriter(new NullWriter());
}
queueThread = new Thread(new ProducerJobQueueThread());
}
public void addJob(String aProducerFactory, String aVerb) {
-// ML: TODO: should check if a similar job is already pending
producerJobQueue.appendJob(new ProducerJob(aProducerFactory, aVerb));
log.println(aProducerFactory+"."+aVerb+" added to queue");
log.flush();
}
- public void printQueueStatus(PrintWriter aWriter) {
- Iterator iterator = producerJobQueue.makeJobListSnapshot().iterator();
- producerJobQueue.cleanupJobs();
- JobQueue.Job job;
-
- while (iterator.hasNext()) {
- job = (JobQueue.Job) iterator.next();
- ProducerJob producerJob = (ProducerJob) job.getData();
+ public void addTask(ProducerTask aTask) {
+ addJob(aTask.getProducer(), aTask.getVerb());
+ }
- aWriter.print(producerJob.factoryName + "." + producerJob.verb);
- if (job.hasBeenProcessed())
- aWriter.print(" processed");
- else if (job.isProcessing())
- aWriter.print(" processing");
- else if (job.isPending())
- aWriter.print(" pending");
- else
- aWriter.print(" unknown status");
+ public void addTasks(List aTasks) {
+ Iterator i = aTasks.iterator();
- aWriter.println("<br/>");
+ while (i.hasNext()) {
+ addTask((ProducerTask) i.next());
}
}
- private void produceNow(String aProducerFactory, String aVerb, PrintWriter aLogger) {
- try {
- long startTime;
- long endTime;
- Map startingMap = new HashMap();
+ private String convertStatus(JobQueue.Job aJob) {
+ if (aJob.hasBeenProcessed())
+ return "processed";
+ if (aJob.isProcessing())
+ return "processing";
+ if (aJob.isPending())
+ return "pending";
+ if (aJob.isCancelled())
+ return "cancelled";
+ if (aJob.hasBeenAborted())
+ return "aborted";
- startTime = System.currentTimeMillis();
+ return "unknown";
+ }
- aLogger.println("Producing (" + aProducerFactory + "," + aVerb + ")");
+ private Map convertJob(JobQueue.Job aJob) {
+ Map result = new HashMap();
+ ProducerJob producerJob = (ProducerJob) aJob.getData();
- ProducerFactory factory = (ProducerFactory) MirGlobal.localizer().producers().factories().get(aProducerFactory);
+ result.put("identifier", aJob.getIdentifier());
+ result.put("factory", producerJob.getFactoryName());
+ result.put("verb", producerJob.getVerb());
+ result.put("priority", new Integer(aJob.getPriority()));
+ result.put("status", convertStatus(aJob));
+ result.put("lastchange", new DateToMapAdapter(aJob.getLastChange()));
- if (factory == null )
- throw new Exception("No producer factory '"+aProducerFactory+"' present.");
+ return result;
+ }
- MirGlobal.localizer().producerAssistant().initializeGenerationValueSet(startingMap);
- Producer producer = factory.makeProducer(aVerb, startingMap);
+ private void convertJobList(List aSourceJobList, List aDestination) {
+ Iterator i = aSourceJobList.iterator();
- producer.produce(aLogger);
+ while (i.hasNext())
+ aDestination.add(convertJob((JobQueue.Job) i.next()));
+ }
- endTime = System.currentTimeMillis();
+ public List getQueueStatus() {
+ List result = new Vector();
+ List pendingJobs = new Vector();
+ List finishedJobs = new Vector();
- aLogger.println("Time: " + (endTime-startTime) + " ms<br>");
- }
- catch (Throwable e) {
- try {
- aLogger.println("exception occurred:<br>");
- aLogger.println(e.getMessage());
- e.printStackTrace(aLogger);
- }
- catch (Throwable f) {
- }
- }
+ producerJobQueue.makeJobListSnapshots(pendingJobs, finishedJobs);
+
+ convertJobList(pendingJobs, result);
+ convertJobList(finishedJobs, result);
+
+ return result;
}
- private class ProducerJob {
+private class ProducerJob {
private String factoryName;
private String verb;
+ private Producer producer;
public ProducerJob(String aFactory, String aVerb) {
factoryName = aFactory;
verb = aVerb;
+ producer=null;
+ }
+
+ public String getFactoryName() {
+ return factoryName;
+ }
+
+ public String getVerb() {
+ return verb;
+ }
+
+ public void abort() {
+ if (producer!=null) {
+ producer.abort();
+ }
}
public void execute() {
ProducerFactory factory;
- Producer producer;
long startTime;
long endTime;
Map startingMap = new HashMap();
log.println("Producing job: "+factoryName+"."+verb);
try {
- factory = (ProducerFactory) MirGlobal.localizer().producers().factories().get( factoryName );
+ factory = MirGlobal.localizer().producers().getFactoryForName( factoryName );
if (factory!=null) {
MirGlobal.localizer().producerAssistant().initializeGenerationValueSet(startingMap);
t.printStackTrace(log);
}
log.println("Done producing job: "+factoryName+"."+verb);
- endTime = System.currentTimeMillis();
+ endTime = System.currentTimeMillis();
+ log.println("Time: " + (endTime-startTime) + " ms");
+ log.flush();
+ }
- log.println("Time: " + (endTime-startTime) + " ms");
- log.flush();
+ boolean isAborted() {
+ return false;
}
}
ProducerJob job = (ProducerJob) producerJobQueue.acquirePendingJob();
if (job!=null) {
job.execute();
- producerJobQueue.flagOffJob(job);
+ if (job.isAborted())
+ producerJobQueue.jobAborted(job);
+ else
+ producerJobQueue.jobProcessed(job);
}
else
{
}
}
+ public static class ProducerEngineExc extends Exc {
+ public ProducerEngineExc(String aMessage) {
+ super(aMessage);
+ }
+ }
public static class ProducerEngineRuntimeExc extends Failure {
public ProducerEngineRuntimeExc(String msg, Exception cause){
}
}
+ public static class ProducerTask {
+ private String producer;
+ private String verb;
+
+ public ProducerTask(String aProducer, String aVerb) {
+ producer = aProducer;
+ verb = aVerb;
+ }
+
+ public String getVerb() {
+ return verb;
+ }
+
+ public String getProducer() {
+ return producer;
+ }
+
+ public static List parseProducerTaskList(String aList) throws ProducerEngineExc {
+ Iterator i;
+ List result = new Vector();
+
+ i = StringRoutines.splitString(aList, ";").iterator();
+ while (i.hasNext()) {
+ String taskExpression = (String) i.next();
+ List parts = StringRoutines.splitString(taskExpression, ".");
+
+ if (parts.size()!=2)
+ throw new ProducerEngineExc("Invalid producer expression: '" + taskExpression + "'");
+ else
+ result.add(new ProducerEngine.ProducerTask((String) parts.get(0), (String) parts.get(1)));
+ }
+
+ return result;
+ }
+ }
}
\ No newline at end of file