99ffed7730b960fdaf755aa9831db7556566c324
[mir.git] / source / mircoders / global / JobQueue.java
1 /*
2  * Copyright (C) 2001, 2002 The Mir-coders group
3  *
4  * This file is part of Mir.
5  *
6  * Mir is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Mir is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Mir; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  * In addition, as a special exception, The Mir-coders gives permission to link
21  * the code of this program with  any library licensed under the Apache Software License,
22  * The Sun (tm) Java Advanced Imaging library (JAI), The Sun JIMI library
23  * (or with modified versions of the above that use the same license as the above),
24  * and distribute linked combinations including the two.  You must obey the
25  * GNU General Public License in all respects for all of the code used other than
26  * the above mentioned libraries.  If you modify this file, you may extend this
27  * exception to your version of the file, but you are not obligated to do so.
28  * If you do not wish to do so, delete this exception statement from your version.
29  */
30
31 package mircoders.global;
32
33
34 import java.util.ArrayList;
35 import java.util.Calendar;
36 import java.util.Date;
37 import java.util.GregorianCalendar;
38 import java.util.HashMap;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Map;
42
43 import mir.log.LoggerWrapper;
44
45 // important: objects passed as data must not be altered once put into a job
46
47 public class JobQueue {
48   private List jobHandlers;
49   private Map identifierToJobHandler;
50   private int nrJobs;
51   private int jobCleanupTreshold;
52   private JobQueueRunner queueRunner;
53   private Thread thread;
54   private LoggerWrapper logger;
55   private long lastCleanup;
56
57   public static final int STATUS_CREATED = -1;
58   public static final int STATUS_PENDING = 0;
59   public static final int STATUS_PROCESSING = 1;
60   public static final int STATUS_PROCESSED = 2;
61   public static final int STATUS_CANCELLED = 3;
62   public static final int STATUS_ABORTED = 4;
63
64   public static final int PRIORITY_NORMAL = 100;
65   public static final int PRIORITY_LOW = 10;
66   public static final int PRIORITY_HIGH = 1000;
67
68   public static final int FINISHEDJOBS_LOGSIZE = 10;
69
70   public JobQueue(LoggerWrapper aLogger) {
71     logger = aLogger;
72     jobHandlers = new ArrayList();
73     identifierToJobHandler = new HashMap();
74     nrJobs = 0;
75     lastCleanup = 0;
76     jobCleanupTreshold = 900; // seconds
77     queueRunner = new JobQueueRunner(logger);
78     thread = new Thread(queueRunner, "JobQueue");
79     thread.setDaemon(true);
80     thread.start();
81   }
82
83   public String appendJob(Job aJob, String aDescription) {
84     try {
85       if (System.currentTimeMillis() - lastCleanup > 60000) {
86         cleanupJobList();
87       }
88     }
89     catch (Throwable t) {
90       logger.error("error while cleaning up joblist: " + t.toString());
91     }
92
93     synchronized (jobHandlers) {
94       JobHandler jobHandler = new JobHandler(aJob, Integer.toString(nrJobs), aDescription);
95       nrJobs++;
96
97       jobHandlers.add(jobHandler);
98       identifierToJobHandler.put(jobHandler.getIdentifier(), jobHandler);
99
100       jobHandler.setPending();
101
102       jobHandlers.notify();
103
104       return jobHandler.getIdentifier();
105     }
106   }
107
108   public List getJobsInfo() {
109     if (System.currentTimeMillis() - lastCleanup > 60000) {
110       cleanupJobList();
111     }
112
113     List result = new ArrayList();
114
115     synchronized (jobHandlers) {
116       Iterator i = jobHandlers.iterator();
117
118       while (i.hasNext()) {
119         result.add(0, ((JobHandler) i.next()).getJobInfo());
120       }
121     }
122
123     return result;
124   }
125
126   private void cleanupJobList() {
127     List toRemove = new ArrayList();
128
129     synchronized (jobHandlers) {
130       Iterator i = jobHandlers.iterator();
131
132       Calendar tresholdCalendar = new GregorianCalendar();
133       tresholdCalendar.add(Calendar.SECOND, -jobCleanupTreshold);
134       Date treshold = tresholdCalendar.getTime();
135
136       while (i.hasNext()) {
137         JobHandler jobHandler = (JobHandler) i.next();
138
139         synchronized (jobHandler) {
140           if (jobHandler.isFinished() && jobHandler.getLastChange().before(treshold)) {
141             toRemove.add(jobHandler);
142             identifierToJobHandler.remove(jobHandler.getIdentifier());
143           }
144         }
145       }
146
147       jobHandlers.removeAll(toRemove);
148     }
149
150     lastCleanup = System.currentTimeMillis();
151   }
152
153   /**
154    * Returns when a new producer job has become available
155    */
156   private JobHandler acquirePendingJob() throws InterruptedException {
157     int priorityFound= 0;
158     JobHandler jobFound;
159
160     synchronized (jobHandlers) {
161       do {
162         jobFound = null;
163         Iterator i = jobHandlers.iterator();
164         while (i.hasNext()) {
165           JobHandler job = (JobHandler) i.next();
166
167           if (job.isPending() && (jobFound==null || priorityFound<job.getPriority())) {
168             jobFound = job;
169             priorityFound = job.getPriority();
170           }
171         }
172
173         if (jobFound==null) {
174           jobHandlers.wait();
175         }
176
177       } while (jobFound==null);
178
179       return jobFound;
180     }
181   }
182
183   public void cancelJobs(List aJobs) {
184     synchronized (jobHandlers) {
185       Iterator i = aJobs.iterator();
186
187       while (i.hasNext()) {
188         ((JobHandler) identifierToJobHandler.get(i.next())).cancelOrAbortJob();
189       }
190     }
191   }
192
193   public void cancelAllJobs() {
194     synchronized (jobHandlers) {
195       Iterator i = jobHandlers.iterator();
196
197       while (i.hasNext()) {
198         ((JobHandler) i.next()).cancelOrAbortJob();
199       }
200     }
201   }
202
203   public interface Job {
204     /**
205      * This method should cause the run() method to terminate as soon as
206      * possible.
207      */
208     void abort();
209
210     /**
211      * This method should perform the actions associated with the job.
212      * @return <code>true</code> if terminated normally, <code>false</code> if aborted
213      */
214     boolean run();
215   }
216
217   public static class JobInfo {
218     private String identifier;
219     private Date lastChange;
220     private int status;
221     private long runningTime;
222     private int priority;
223     private String description;
224
225     private JobInfo(String aDescription, int aStatus, Date aLastChange, String anIdentifier, long aRunningTime, int aPriority) {
226       description = aDescription;
227       lastChange = aLastChange;
228       status = aStatus;
229       identifier = anIdentifier;
230       priority = aPriority;
231       runningTime = aRunningTime;
232     }
233
234     public String getDescription() {
235       return description;
236     }
237
238     public int getStatus() {
239       return status;
240     }
241
242     public int getPriority() {
243       return priority;
244     }
245
246     public Date getLastChange() {
247       return lastChange;
248     }
249
250     public String getIdentifier() {
251       return identifier;
252     }
253
254     public long getRunningTime() {
255       return runningTime;
256     }
257   }
258
259   public class JobHandler {
260     private Job job;
261     private String identifier;
262     private String description;
263
264     private Date lastChange;
265     private long starttime;
266     private long endtime;
267     private int status;
268     private int priority;
269     private boolean hasRun;
270
271     public JobHandler(Job aJob, String anIdentifier, String aDescription, int aPriority) {
272       job = aJob;
273       description = aDescription;
274       identifier = anIdentifier;
275       priority = aPriority;
276       status = STATUS_CREATED;
277     }
278
279     public JobHandler(Job aJob, String anIdentifier, String aDescription) {
280       this(aJob, anIdentifier, aDescription, PRIORITY_NORMAL);
281     }
282
283     public JobInfo getJobInfo() {
284       return new JobInfo(getDescription(), getStatus(), getLastChange(), getIdentifier(), getRunningTime(), priority);
285     }
286
287     private void runJob() {
288       if (setProcessing()) {
289         if (job.run())
290           setProcessed();
291         else
292           setAborted();
293       }
294     }
295
296     private void cancelOrAbortJob() {
297       synchronized (this) {
298         if (isPending())
299           setCancelled();
300         if (isProcessing())
301           job.abort();
302       }
303     }
304
305     public int getStatus() {
306       synchronized(this) {
307         return status;
308       }
309     }
310
311     public String getIdentifier() {
312       return identifier;
313     }
314
315     public String getDescription() {
316       return description;
317     }
318
319     public long getRunningTime() {
320       synchronized(this) {
321         long result = 0;
322
323         if (hasRun) {
324           if (isFinished())
325             result = endtime;
326           else
327             result = System.currentTimeMillis();
328
329           result = result - starttime;
330         }
331
332         return result;
333       }
334     }
335
336     public int getPriority() {
337       return priority;
338     }
339
340     private boolean setProcessing() {
341       return setStatus(STATUS_PENDING, STATUS_PROCESSING);
342     }
343
344     private void setProcessed() {
345       setStatus(STATUS_PROCESSING, STATUS_PROCESSED);
346     }
347
348     private void setAborted() {
349       setStatus(STATUS_PROCESSING, STATUS_ABORTED);
350     }
351
352     private void setPending() {
353       setStatus(STATUS_CREATED, STATUS_PENDING);
354     }
355
356     private boolean setCancelled() {
357       return setStatus(STATUS_PENDING, STATUS_CANCELLED);
358     }
359
360     public boolean hasBeenProcessed() {
361       return getStatus() == STATUS_PROCESSED;
362     }
363
364     public boolean hasBeenAborted() {
365       return getStatus() == STATUS_ABORTED;
366     }
367
368     public boolean isCancelled() {
369       return getStatus() == STATUS_CANCELLED;
370     }
371
372     public boolean isFinished() {
373       return hasBeenProcessed() || hasBeenAborted() || isCancelled();
374     }
375
376     public boolean isProcessing() {
377       return getStatus() == STATUS_PROCESSING;
378     }
379
380     public boolean isPending() {
381       return getStatus() == STATUS_PENDING;
382     }
383
384     public Date getLastChange() {
385       synchronized (this) {
386         return lastChange;
387       }
388     }
389
390     private boolean setStatus(int anOldStatus, int aNewStatus) {
391       synchronized(this) {
392         if (status == anOldStatus) {
393           status = aNewStatus;
394           lastChange = (new GregorianCalendar()).getTime();
395           if (isProcessing()) {
396             starttime = System.currentTimeMillis();
397             hasRun = true;
398           }
399
400           if (isFinished()) {
401             endtime = System.currentTimeMillis();
402           }
403           return true;
404         }
405                                 return false;
406       }
407     }
408   }
409
410   private class JobQueueRunner implements Runnable {
411     
412     public JobQueueRunner(LoggerWrapper aLogger) {
413       logger = aLogger;
414     }
415
416     public void run() {
417       logger.debug("starting JobQueueRunner");
418
419       try {
420         while (true) {
421           try {
422             JobHandler job = acquirePendingJob();
423
424             logger.debug("  starting job ("+job.getIdentifier()+"): " +job.getDescription());
425             job.runJob();
426             logger.debug("  finished job ("+job.getIdentifier()+"): " +job.getDescription());
427
428             job=null;
429           }
430           catch (InterruptedException e) {
431           }
432         }
433       }
434       finally {
435         logger.warn("JobQueueRunner terminated");
436       }
437     }
438   }
439 }
440