X-Git-Url: http://erislabs.net/gitweb/?a=blobdiff_plain;f=source%2Fmircoders%2Fglobal%2FJobQueue.java;h=4fafe7fc71ec8827b228156c065916d4e907997e;hb=3beba5fd39a5d60ba5b87064ddf11dea30b78413;hp=22a6be789e3dbd98d9a35630413e823ce83521af;hpb=5a9e646591ea143c885f34e86ef82a2883013a20;p=mir.git diff --git a/source/mircoders/global/JobQueue.java b/source/mircoders/global/JobQueue.java index 22a6be78..4fafe7fc 100755 --- a/source/mircoders/global/JobQueue.java +++ b/source/mircoders/global/JobQueue.java @@ -37,17 +37,22 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Vector; +import java.util.*; + +import mir.log.*; // important: objects passed as data must not be altered once put into a job public class JobQueue { - private Vector jobs; - private Vector finishedJobs; - private Map dataToJob; - private Map identifierToJob; + private Vector jobHandlers; + private Map identifierToJobHandler; private int nrJobs; + private int jobCleanupTreshold; + private JobQueueRunner queueRunner; + private Thread thread; + private LoggerWrapper logger; + public static final int STATUS_CREATED = -1; public static final int STATUS_PENDING = 0; public static final int STATUS_PROCESSING = 1; public static final int STATUS_PROCESSED = 2; @@ -60,143 +65,191 @@ public class JobQueue { public static final int FINISHEDJOBS_LOGSIZE = 10; - public JobQueue() { - finishedJobs = new Vector(); - jobs = new Vector(); - dataToJob = new HashMap(); - identifierToJob = new HashMap(); + public JobQueue(LoggerWrapper aLogger) { + logger = aLogger; + jobHandlers = new Vector(); + identifierToJobHandler = new HashMap(); nrJobs = 0; + jobCleanupTreshold = 1000; + queueRunner = new JobQueueRunner(logger); + thread = new Thread(queueRunner); + thread.start(); } - public String appendJob(Object aData) { - synchronized (jobs) { - Job job = new Job(aData, Integer.toString(nrJobs)); + public String appendJob(Job aJob, String aDescription) { + synchronized (jobHandlers) { + JobHandler jobHandler = new JobHandler(aJob, Integer.toString(nrJobs), aDescription); nrJobs++; - jobs.add(job); - dataToJob.put(aData, job); - identifierToJob.put(job.getIdentifier(), job); - return job.getIdentifier(); + jobHandlers.add(jobHandler); + identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler); + jobHandler.setPending(); + + return jobHandler.getIdentifier(); } } - public Object acquirePendingJob() { - synchronized (jobs) { - int priorityFound= 0; - Job jobFound; + public List getJobsInfo() { + List result = new Vector(); - do { - jobFound = null; - Iterator i = jobs.iterator(); - while (i.hasNext()) { - Job job = (Job) i.next(); + synchronized (jobHandlers) { + Iterator i = jobHandlers.iterator(); - if (job.isPending() && (jobFound==null || priorityFoundFINISHEDJOBS_LOGSIZE) - finishedJobs.remove(finishedJobs.size()-1); - } + return result; } - public void jobProcessed(Object aData) { - synchronized (jobs) { - Job job = (Job) dataToJob.get(aData); + private void cleanupJobList() { + synchronized (jobHandlers) { + Iterator i = jobHandlers.iterator(); + + Calendar tresholdCalendar = new GregorianCalendar(); + tresholdCalendar.add(Calendar.SECOND, -jobCleanupTreshold); + Date treshold = tresholdCalendar.getTime(); - if (job!=null) { - job.setProcessed(); - finishJob(job); + while (i.hasNext()) { + JobHandler jobHandler = (JobHandler) i.next(); + + synchronized (jobHandler) { + if (jobHandler.isFinished() && jobHandler.getLastChange().before(treshold)) { + jobHandlers.remove(jobHandler); + } + } } } } - public void jobAborted(Object aData) { - synchronized (jobs) { - Job job = (Job) dataToJob.get(aData); + private JobHandler acquirePendingJob() { + synchronized (jobHandlers) { + int priorityFound= 0; + JobHandler jobFound; + + jobFound = null; + Iterator i = jobHandlers.iterator(); + while (i.hasNext()) { + JobHandler job = (JobHandler) i.next(); - if (job!=null) { - job.setAborted(); - finishJob(job); + if (job.isPending() && (jobFound==null || priorityFoundtrue if terminated normally, false if aborted + */ + boolean run(); } - public List makeJobListSnapshot() { - synchronized (jobs) { - return (List) jobs.clone(); + public static class JobInfo { + private String identifier; + private Date lastChange; + private int status; + private long runningTime; + private int priority; + private String description; + + private JobInfo(String aDescription, int aStatus, Date aLastChange, String anIdentifier, long aRunningTime, int aPriority) { + description = aDescription; + lastChange = aLastChange; + status = aStatus; + identifier = anIdentifier; + priority = aPriority; + runningTime = aRunningTime; } - } - public List makeFinishedJobListSnapshot() { - synchronized (jobs) { - return (List) finishedJobs.clone(); + public String getDescription() { + return description; + } + + public int getStatus() { + return status; + } + + public int getPriority() { + return priority; + } + + public Date getLastChange() { + return lastChange; + } + + public String getIdentifier() { + return identifier; + } + + public long getRunningTime() { + return runningTime; } } - public class Job implements Cloneable { - private Object data; + public class JobHandler { + private Job job; + private String identifier; + private String description; + private Date lastChange; private long starttime; private long endtime; - private String identifier; private int status; private int priority; private boolean hasRun; - public Job(Object aData, String anIdentifier, int aStatus, int aPriority, Date aLastChange) { - data = aData; - status = aStatus; + public JobHandler(Job aJob, String anIdentifier, String aDescription, int aPriority) { + job = aJob; + description = aDescription; identifier = anIdentifier; priority = aPriority; - lastChange = aLastChange; - - hasRun = false; + status = STATUS_CREATED; } - public Job(Object aData, String anIdentifier, int aStatus, int aPriority) { - this(aData, anIdentifier, aStatus, aPriority, (new GregorianCalendar()).getTime()); + public JobHandler(Job aJob, String anIdentifier, String aDescription) { + this(aJob, anIdentifier, aDescription, PRIORITY_NORMAL); } - public Job(Object aData, String anIdentifier) { - this(aData, anIdentifier, STATUS_PENDING, PRIORITY_NORMAL); + public JobInfo getJobInfo() { + return new JobInfo(getDescription(), getStatus(), getLastChange(), getIdentifier(), getRunningTime(), priority); } - public Object getData() { - return data; - } + private void runJob() { + if (setProcessing()) { + if (job.run()) + setProcessed(); + else + setAborted(); + } + }; + + private void cancelOrAbortJob() { + synchronized (this) { + if (isPending()) + setCancelled(); + if (isProcessing()) + job.abort(); + } + }; public int getStatus() { synchronized(this) { @@ -204,46 +257,52 @@ public class JobQueue { } } - public Date getLastChange() { - return lastChange; - } - public String getIdentifier() { return identifier; } + public String getDescription() { + return description; + } + public long getRunningTime() { - long result = 0; + synchronized(this) { + long result = 0; - if (hasRun) { - if (isFinished()) - result = endtime; - else - result = System.currentTimeMillis(); + if (hasRun) { + if (isFinished()) + result = endtime; + else + result = System.currentTimeMillis(); - result = result-starttime; - } + result = result - starttime; + } - return result; + return result; + } } public int getPriority() { return priority; } - protected boolean setProcessing() { + private boolean setProcessing() { return setStatus(STATUS_PENDING, STATUS_PROCESSING); } - protected void setProcessed() { + private void setProcessed() { setStatus(STATUS_PROCESSING, STATUS_PROCESSED); } - protected void setAborted() { + private void setAborted() { setStatus(STATUS_PROCESSING, STATUS_ABORTED); } - protected boolean setCancelled() { + private void setPending() { + setStatus(STATUS_CREATED, STATUS_PENDING); + } + + private boolean setCancelled() { return setStatus(STATUS_PENDING, STATUS_CANCELLED); } @@ -271,6 +330,12 @@ public class JobQueue { return getStatus() == STATUS_PENDING; } + public Date getLastChange() { + synchronized (this) { + return lastChange; + } + } + private boolean setStatus(int anOldStatus, int aNewStatus) { synchronized(this) { if (status == anOldStatus) { @@ -291,10 +356,37 @@ public class JobQueue { } } } + } - protected Object clone() { - synchronized(this) { - return new Job(data, identifier, status, priority, lastChange); + private class JobQueueRunner implements Runnable { + private LoggerWrapper logger; + + public JobQueueRunner(LoggerWrapper aLogger) { + logger = aLogger; + } + + public void run() { + logger.debug("starting JobQueueRunner"); + + try { + while (true) { + JobHandler job = acquirePendingJob(); + if (job != null) { + logger.debug(" starting job ("+job.getIdentifier()+"): " +job.getDescription()); + job.runJob(); + logger.debug(" finished job ("+job.getIdentifier()+"): " +job.getDescription()); + } + else { + try { + Thread.sleep(1500); + } + catch (InterruptedException e) { + } + } + } + } + finally { + logger.warn("JobQueueRunner terminated"); } } }