View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.jetspeed.scheduler;
18  
19  import java.util.List;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  
24  /***
25   * Service for a cron like scheduler.
26   *
27   * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
28   * @version $Id: AbstractScheduler.java 516448 2007-03-09 16:25:47Z ate $
29   */
30  public abstract class AbstractScheduler implements Scheduler
31  {
32      private final static Log log = LogFactory.getLog(MemoryBasedScheduler.class);
33      
34      /***
35       * The queue.
36       */
37      protected JobQueue scheduleQueue = null;
38  
39      /***
40       * The main loop for starting jobs.
41       */
42      protected MainLoop mainLoop;
43  
44      /***
45       * The thread used to process commands.
46       */
47      protected Thread thread;
48  
49      /***
50       * Creates a new instance.
51       */
52      public AbstractScheduler()
53      {
54          mainLoop = null;
55          thread = null;
56      }
57  
58      public void start()
59      {
60      }
61      
62      public void stop()
63      {
64          if(getThread() != null)
65          {
66              getThread().interrupt();
67          }
68      }
69  
70      /***
71       * Get a specific Job from Storage.
72       *
73       * @param oid The int id for the job.
74       * @return A JobEntry.
75       * @exception Exception, a generic exception.
76       */
77      public abstract JobEntry getJob(int oid)
78          throws Exception;
79  
80      /***
81       * Add a new job to the queue.  Before adding a job, calculate the runtime 
82       * to make sure the entry will be placed at the right order in the queue.
83       *
84       * @param je A JobEntry with the job to add.
85       * @exception Exception, a generic exception.
86       */
87      public abstract void addJob(JobEntry je)
88          throws Exception; 
89          
90      /***
91       * Remove a job from the queue.
92       *
93       * @param je A JobEntry with the job to remove.
94       * @exception Exception, a generic exception.
95       */
96      public abstract void removeJob(JobEntry je)
97          throws Exception;
98  
99      /***
100      * Modify a Job.
101      *
102      * @param je A JobEntry with the job to modify
103      * @exception Exception, a generic exception.
104      */
105     public abstract void updateJob(JobEntry je)
106         throws Exception;
107         
108     /***
109      * List jobs in the queue.  This is used by the scheduler UI.
110      *
111      * @return A List of jobs.
112      */
113     public List listJobs()
114     {
115         return scheduleQueue.list();
116     }
117 
118     /***
119      * Return the thread being used to process commands, or null if
120      * there is no such thread.  You can use this to invoke any
121      * special methods on the thread, for example, to interrupt it.
122      *
123      * @return A Thread.
124      */
125     public synchronized Thread getThread()
126     {
127         return thread;
128     }
129 
130     /***
131      * Set thread to null to indicate termination.
132      */
133     private synchronized void clearThread()
134     {
135         thread = null;
136     }
137 
138     /***
139      * Start (or restart) a thread to process commands, or wake up an
140      * existing thread if one is already running.  This method can be
141      * invoked if the background thread crashed due to an
142      * unrecoverable exception in an executed command.
143      */
144     public synchronized void restart()
145     {
146         if (thread == null)
147         {
148             // Create the the housekeeping thread of the scheduler. It will wait
149             // for the time when the next task needs to be started, and then
150             // launch a worker thread to execute the task.
151             thread = new Thread(mainLoop, Scheduler.SERVICE_NAME);
152             // Indicate that this is a system thread. JVM will quit only when there
153             // are no more active user threads. Settings threads spawned internally
154             // by CPS as daemons allows commandline applications 
155             // to terminate in an orderly manner.
156             thread.setDaemon(true);
157             thread.start();
158         }
159         else
160         {
161             notify();
162         }
163     }
164 
165     /***
166      *  Return the next Job to execute, or null if thread is
167      *  interrupted.
168      *
169      * @return A JobEntry.
170      * @exception Exception, a generic exception.
171      */
172     private synchronized JobEntry nextJob()
173         throws Exception
174     {
175         try
176         {
177             while ( !Thread.interrupted() )
178             {
179                 // Grab the next job off the queue.
180                 JobEntry je = scheduleQueue.getNext();
181 
182                 if (je == null)
183                 {
184                     // Queue must be empty. Wait on it.
185                     wait();
186                 }
187                 else
188                 {
189                     long now = System.currentTimeMillis();
190                     long when = je.getNextRuntime();
191 
192                     if ( when > now )
193                     {
194                         // Wait till next runtime.
195                         wait(when - now);
196                     }
197                     else
198                     {
199                         // Update the next runtime for the job.
200                         scheduleQueue.updateQueue(je);
201                         // Return the job to run it.
202                         return je;
203                     }
204                 }
205             }
206         }
207         catch (InterruptedException ex)
208         {
209         }
210 
211         // On interrupt.
212         return null;
213     }
214 
215     /***
216      * Inner class.  This is isolated in its own Runnable class just
217      * so that the main class need not implement Runnable, which would
218      * allow others to directly invoke run, which is not supported.
219      */
220     protected class MainLoop
221         implements Runnable
222     {
223         /***
224          * Method to run the class.
225          */
226         public void run()
227         {
228             try
229             {
230                 for(;;)
231                 {
232                     JobEntry je = nextJob();
233                     if ( je != null )
234                     {
235                         // Start the thread to run the job.
236                         Runnable wt = new WorkerThread(je);
237                         Thread helper = new Thread(wt);
238                         helper.start();
239                     }
240                     else
241                     {
242                         break;
243                     }
244                 }
245             }
246             catch(Exception e)
247             {
248                 // Log error.
249                 log.error("Error running a Scheduled Job: " + e);
250             }
251             finally
252             {
253                 clearThread();
254             }
255         }
256     }
257 }