Coverage Report - org.apache.turbine.services.schedule.AbstractSchedulerService
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractSchedulerService
95%
58/61
78%
11/14
1,737
AbstractSchedulerService$MainLoop
81%
13/16
75%
3/4
1,737
 
 1  
 package org.apache.turbine.services.schedule;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.util.List;
 23  
 
 24  
 import org.apache.logging.log4j.LogManager;
 25  
 import org.apache.logging.log4j.Logger;
 26  
 import org.apache.turbine.services.InitializationException;
 27  
 import org.apache.turbine.services.TurbineBaseService;
 28  
 import org.apache.turbine.util.TurbineException;
 29  
 
 30  
 /**
 31  
  * Service for a cron like scheduler.
 32  
  *
 33  
  * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
 34  
  * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
 35  
  * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
 36  
  */
 37  
 public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
 38  
 {
 39  
     /** Logging */
 40  3
     protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
 41  
 
 42  
     /** The queue */
 43  3
     protected JobQueue<JobEntry> scheduleQueue = null;
 44  
 
 45  
     /** Current status of the scheduler */
 46  3
     protected boolean enabled = false;
 47  
 
 48  
     /** The main loop for starting jobs. */
 49  
     protected MainLoop mainLoop;
 50  
 
 51  
     /** The thread used to process commands. */
 52  
     protected Thread thread;
 53  
 
 54  
     /**
 55  
      * Creates a new instance.
 56  
      */
 57  
     public AbstractSchedulerService()
 58  3
     {
 59  3
         mainLoop = null;
 60  3
         thread = null;
 61  3
     }
 62  
 
 63  
     /**
 64  
      * Initializes the SchedulerService.
 65  
      *
 66  
      * @throws InitializationException
 67  
      *             Something went wrong in the init stage
 68  
      */
 69  
     @Override
 70  
     public void init() throws InitializationException
 71  
     {
 72  
         try
 73  
         {
 74  12
             setEnabled(getConfiguration().getBoolean("enabled", true));
 75  12
             scheduleQueue = new JobQueue<JobEntry>();
 76  12
             mainLoop = new MainLoop();
 77  
 
 78  
             @SuppressWarnings("unchecked") // Why is this cast necessary?
 79  12
             List<JobEntry> jobs = (List<JobEntry>)loadJobs();
 80  12
             scheduleQueue.batchLoad(jobs);
 81  12
             restart();
 82  
 
 83  12
             setInit(true);
 84  
         }
 85  0
         catch (Exception e)
 86  
         {
 87  0
             throw new InitializationException("Could not initialize the scheduler service", e);
 88  12
         }
 89  12
     }
 90  
 
 91  
     /**
 92  
      * Load all jobs from configuration storage
 93  
      *
 94  
      * @return the list of pre-configured jobs
 95  
      * @throws TurbineException if jobs could not be loaded
 96  
      */
 97  
     protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
 98  
 
 99  
     /**
 100  
      * Shutdowns the service.
 101  
      *
 102  
      * This methods interrupts the housekeeping thread.
 103  
      */
 104  
     @Override
 105  
     public void shutdown()
 106  
     {
 107  12
         if (getThread() != null)
 108  
         {
 109  11
             getThread().interrupt();
 110  
         }
 111  12
     }
 112  
 
 113  
     /**
 114  
      * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
 115  
      */
 116  
     @Override
 117  
     public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
 118  
 
 119  
     /**
 120  
      * Get a specific Job from Storage.
 121  
      *
 122  
      * @param oid
 123  
      *            The int id for the job.
 124  
      * @return A JobEntry.
 125  
      * @throws TurbineException
 126  
      *                job could not be retrieved.
 127  
      */
 128  
     @Override
 129  
     public abstract JobEntry getJob(int oid) throws TurbineException;
 130  
 
 131  
     /**
 132  
      * Add a new job to the queue.
 133  
      *
 134  
      * @param je
 135  
      *            A JobEntry with the job to add.
 136  
      * @throws TurbineException
 137  
      *             job could not be added
 138  
      */
 139  
     @Override
 140  
     public void addJob(JobEntry je) throws TurbineException
 141  
     {
 142  3
         updateJob(je);
 143  3
     }
 144  
 
 145  
     /**
 146  
      * Remove a job from the queue.
 147  
      *
 148  
      * @param je
 149  
      *            A JobEntry with the job to remove.
 150  
      * @throws TurbineException
 151  
      *                job could not be removed
 152  
      */
 153  
     @Override
 154  
     public abstract void removeJob(JobEntry je) throws TurbineException;
 155  
 
 156  
     /**
 157  
      * Add or update a job.
 158  
      *
 159  
      * @param je
 160  
      *            A JobEntry with the job to modify
 161  
      * @throws TurbineException
 162  
      *             job could not be updated
 163  
      */
 164  
     @Override
 165  
     public abstract void updateJob(JobEntry je) throws TurbineException;
 166  
 
 167  
     /**
 168  
      * List jobs in the queue. This is used by the scheduler UI.
 169  
      *
 170  
      * @return A List of jobs.
 171  
      */
 172  
     @Override
 173  
     public List<JobEntry> listJobs()
 174  
     {
 175  15
         return scheduleQueue.list();
 176  
     }
 177  
 
 178  
     /**
 179  
      * Sets the enabled status of the scheduler
 180  
      *
 181  
      * @param enabled true to enable the scheduler
 182  
      *
 183  
      */
 184  
     protected void setEnabled(boolean enabled)
 185  
     {
 186  15
         this.enabled = enabled;
 187  15
     }
 188  
 
 189  
     /**
 190  
      * Determines if the scheduler service is currently enabled.
 191  
      *
 192  
      * @return Status of the scheduler service.
 193  
      */
 194  
     @Override
 195  
     public boolean isEnabled()
 196  
     {
 197  6
         return enabled;
 198  
     }
 199  
 
 200  
     /**
 201  
      * Starts or restarts the scheduler if not already running.
 202  
      */
 203  
     @Override
 204  
     public synchronized void startScheduler()
 205  
     {
 206  3
         setEnabled(true);
 207  3
         restart();
 208  3
     }
 209  
 
 210  
     /**
 211  
      * Stops the scheduler if it is currently running.
 212  
      */
 213  
     @Override
 214  
     public synchronized void stopScheduler()
 215  
     {
 216  3
         log.info("Stopping job scheduler");
 217  3
         Thread thread = getThread();
 218  3
         if (thread != null)
 219  
         {
 220  3
             thread.interrupt();
 221  
         }
 222  3
         enabled = false;
 223  3
     }
 224  
 
 225  
     /**
 226  
      * Return the thread being used to process commands, or null if there is no
 227  
      * such thread. You can use this to invoke any special methods on the
 228  
      * thread, for example, to interrupt it.
 229  
      *
 230  
      * @return A Thread.
 231  
      */
 232  
     public synchronized Thread getThread()
 233  
     {
 234  26
         return thread;
 235  
     }
 236  
 
 237  
     /**
 238  
      * Set thread to null to indicate termination.
 239  
      */
 240  
     protected synchronized void clearThread()
 241  
     {
 242  12
         thread = null;
 243  12
     }
 244  
 
 245  
     /**
 246  
      * Start (or restart) a thread to process commands, or wake up an existing
 247  
      * thread if one is already running. This method can be invoked if the
 248  
      * background thread crashed due to an unrecoverable exception in an
 249  
      * executed command.
 250  
      */
 251  
     public synchronized void restart()
 252  
     {
 253  21
         if (enabled)
 254  
         {
 255  21
             log.info("Starting job scheduler");
 256  21
             if (thread == null)
 257  
             {
 258  
                 // Create the the housekeeping thread of the scheduler. It will
 259  
                 // wait for the time when the next task needs to be started,
 260  
                 // and then launch a worker thread to execute the task.
 261  12
                 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
 262  
                 // Indicate that this is a system thread. JVM will quit only
 263  
                 // when there are no more enabled user threads. Settings threads
 264  
                 // spawned internally by Turbine as daemons allows commandline
 265  
                 // applications using Turbine to terminate in an orderly manner.
 266  12
                 thread.setDaemon(true);
 267  12
                 thread.start();
 268  
             }
 269  
             else
 270  
             {
 271  9
                 notify();
 272  
             }
 273  
         }
 274  21
     }
 275  
 
 276  
     /**
 277  
      * Return the next Job to execute, or null if thread is interrupted.
 278  
      *
 279  
      * @return A JobEntry.
 280  
      * @throws TurbineException
 281  
      *                a generic exception.
 282  
      */
 283  
     protected synchronized JobEntry nextJob() throws TurbineException
 284  
     {
 285  
         try
 286  
         {
 287  27
             while (!Thread.interrupted())
 288  
             {
 289  
                 // Grab the next job off the queue.
 290  25
                 JobEntry je = scheduleQueue.getNext();
 291  
 
 292  25
                 if (je == null)
 293  
                 {
 294  
                     // Queue must be empty. Wait on it.
 295  0
                     wait();
 296  
                 }
 297  
                 else
 298  
                 {
 299  25
                     long now = System.currentTimeMillis();
 300  25
                     long when = je.getNextRuntime();
 301  
 
 302  25
                     if (when > now)
 303  
                     {
 304  
                         // Wait till next runtime.
 305  22
                         wait(when - now);
 306  
                     }
 307  
                     else
 308  
                     {
 309  
                         // Update the next runtime for the job.
 310  3
                         scheduleQueue.updateQueue(je);
 311  
                         // Return the job to run it.
 312  3
                         return je;
 313  
                     }
 314  
                 }
 315  12
             }
 316  
         }
 317  10
         catch (InterruptedException ex)
 318  
         {
 319  
             // ignore
 320  2
         }
 321  
 
 322  
         // On interrupt.
 323  12
         return null;
 324  
     }
 325  
 
 326  
     /**
 327  
      * Inner class. This is isolated in its own Runnable class just so that the
 328  
      * main class need not implement Runnable, which would allow others to
 329  
      * directly invoke run, which is not supported.
 330  
      */
 331  12
     protected class MainLoop implements Runnable
 332  
     {
 333  
         /**
 334  
          * Method to run the class.
 335  
          */
 336  
         @Override
 337  
         public void run()
 338  
         {
 339  12
             String taskName = null;
 340  
             try
 341  
             {
 342  15
                 while (enabled)
 343  
                 {
 344  15
                     JobEntry je = nextJob();
 345  15
                     if (je != null)
 346  
                     {
 347  3
                         taskName = je.getTask();
 348  
 
 349  
                         // Start the thread to run the job.
 350  3
                         Runnable wt = new WorkerThread(je);
 351  3
                         Thread helper = new Thread(wt);
 352  3
                         helper.start();
 353  
                     }
 354  
                     else
 355  
                     {
 356  
                         break;
 357  
                     }
 358  3
                 }
 359  
             }
 360  0
             catch (Exception e)
 361  
             {
 362  0
                 log.error("Error running a Scheduled Job: {}", taskName, e);
 363  0
                 enabled = false;
 364  
             }
 365  
             finally
 366  
             {
 367  12
                 clearThread();
 368  12
             }
 369  12
         }
 370  
     }
 371  
 }