import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
+import java.util.ArrayList;
import mir.log.LoggerWrapper;
// important: objects passed as data must not be altered once put into a job
public class JobQueue {
- private Vector jobHandlers;
+ private List jobHandlers;
private Map identifierToJobHandler;
private int nrJobs;
private int jobCleanupTreshold;
public JobQueue(LoggerWrapper aLogger) {
logger = aLogger;
- jobHandlers = new Vector();
+ jobHandlers = new ArrayList();
identifierToJobHandler = new HashMap();
nrJobs = 0;
lastCleanup = 0;
jobCleanupTreshold = 900; // seconds
queueRunner = new JobQueueRunner(logger);
thread = new Thread(queueRunner);
+ thread.setDaemon(true);
thread.start();
}
identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler);
jobHandler.setPending();
+ jobHandlers.notify();
+
return jobHandler.getIdentifier();
}
}
public List getJobsInfo() {
- List result = new Vector();
+ List result = new ArrayList();
synchronized (jobHandlers) {
Iterator i = jobHandlers.iterator();
}
private void cleanupJobList() {
- List toRemove = new Vector();
+ List toRemove = new ArrayList();
+
synchronized (jobHandlers) {
Iterator i = jobHandlers.iterator();
lastCleanup = System.currentTimeMillis();
}
- private JobHandler acquirePendingJob() {
- synchronized (jobHandlers) {
- int priorityFound= 0;
- JobHandler jobFound;
+ /**
+ * Returns when a new producer job has become available
+ */
+ private JobHandler acquirePendingJob() throws InterruptedException {
+ int priorityFound= 0;
+ JobHandler jobFound;
- jobFound = null;
- Iterator i = jobHandlers.iterator();
- while (i.hasNext()) {
- JobHandler job = (JobHandler) i.next();
+ synchronized (jobHandlers) {
+ do {
+ jobFound = null;
+ Iterator i = jobHandlers.iterator();
+ while (i.hasNext()) {
+ JobHandler job = (JobHandler) i.next();
+
+ if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
+ jobFound = job;
+ priorityFound = job.getPriority();
+ }
+ }
- if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
- jobFound = job;
- priorityFound = job.getPriority();
+ if (jobFound==null) {
+ jobHandlers.wait();
}
- }
+
+ } while (jobFound==null);
return jobFound;
}
}
}
+ public void cancelAllJobs() {
+ synchronized (jobHandlers) {
+ Iterator i = jobHandlers.iterator();
+
+ while (i.hasNext()) {
+ ((JobHandler) i.next()).cancelOrAbortJob();
+ }
+ }
+ }
+
public interface Job {
+ /**
+ * This method should cause the run() method to terminate as soon as
+ * possible.
+ */
void abort();
/**
- *
- *
+ * This method should perform the actions associated with the job.
* @return <code>true</code> if terminated normally, <code>false</code> if aborted
*/
boolean run();
try {
while (true) {
- JobHandler job = acquirePendingJob();
- if (job != null) {
+ try {
+ JobHandler job = acquirePendingJob();
+
logger.debug(" starting job ("+job.getIdentifier()+"): " +job.getDescription());
job.runJob();
logger.debug(" finished job ("+job.getIdentifier()+"): " +job.getDescription());
+
+ job=null;
}
- else {
- try {
- Thread.sleep(1500);
- }
- catch (InterruptedException e) {
- }
+ catch (InterruptedException e) {
}
}
}