Coverage Report - org.apache.commons.pipeline.driver.DedicatedThreadStageDriver
 
Classes in this File Line Coverage Branch Coverage Complexity
DedicatedThreadStageDriver
77%
27/35
56%
9/16
0
DedicatedThreadStageDriver$1
80%
8/10
50%
1/2
0
DedicatedThreadStageDriver$2
20%
1/5
N/A
0
DedicatedThreadStageDriver$WorkerThread
76%
28/37
43%
13/30
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  * 
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  * 
 12  
  * Unless required by applicable law or agreed to in writing,
 13  
  * software distributed under the License is distributed on an
 14  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15  
  * KIND, either express or implied.  See the License for the
 16  
  * specific language governing permissions and limitations
 17  
  * under the License.    
 18  
  */ 
 19  
 
 20  
 
 21  
 package org.apache.commons.pipeline.driver;
 22  
 
 23  
 import java.lang.Thread.UncaughtExceptionHandler;
 24  
 import java.util.concurrent.BlockingQueue;
 25  
 import java.util.concurrent.TimeUnit;
 26  
 import org.apache.commons.logging.Log;
 27  
 import org.apache.commons.logging.LogFactory;
 28  
 import org.apache.commons.pipeline.driver.AbstractStageDriver;
 29  
 import org.apache.commons.pipeline.Feeder;
 30  
 import org.apache.commons.pipeline.StageDriver;
 31  
 import org.apache.commons.pipeline.Stage;
 32  
 import org.apache.commons.pipeline.StageContext;
 33  
 import org.apache.commons.pipeline.StageException;
 34  
 import static org.apache.commons.pipeline.StageDriver.State.*;
 35  
 import org.apache.commons.pipeline.StageDriver.State;
 36  
 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
 37  
 
 38  
 /**
 39  
  * This is a very simple implementation of a AbstractStageDriver which spawns
 40  
  * a single thread to process a stage.
 41  
  */
 42  2784
 public class DedicatedThreadStageDriver extends AbstractStageDriver {
 43  15
     private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
 44  
     
 45  
     //poll timeout to ensure deadlock cannot occur on thread termination
 46  
     private long timeout;
 47  
     
 48  
     //thread responsible for stage processing
 49  
     private Thread workerThread;
 50  
     
 51  
     //queue to hold data to be processed
 52  
     private BlockingQueue queue;    
 53  
     
 54  
     //feeder used to feed data to this stage's queue
 55  15
     private final Feeder feeder = new Feeder() {
 56  
         public void feed(Object obj) {
 57  418
             if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
 58  
                     + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
 59  
             try {
 60  418
                 DedicatedThreadStageDriver.this.queue.put(obj);
 61  0
             } catch (InterruptedException e) {
 62  0
                 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
 63  
                         + obj + " in queue for stage " + stage, e);
 64  418
             }
 65  
             
 66  418
             synchronized(DedicatedThreadStageDriver.this) {
 67  418
                 DedicatedThreadStageDriver.this.notifyAll();
 68  418
             }
 69  418
         }
 70  
     };
 71  
     
 72  
     /**
 73  
      * Creates a new DedicatedThreadStageDriver with the specified thread wait
 74  
      * timeout and fault tolerance values.
 75  
      * @param stage The stage that the driver will run
 76  
      * @param context the context in which to run the stage
 77  
      * @param queue The object queue to use for storing objects prior to processing. The
 78  
      * default is {@link LinkedBlockingQueue}
 79  
      * @param timeout The amount of time, in milliseconds, that the worker thread
 80  
      * will wait before checking the processing state if no objects are available
 81  
      * in the thread's queue.
 82  
      * @param faultTolerance Flag determining the behavior of the driver when
 83  
      * an error is encountered in execution of {@link Stage#process(Object)}.
 84  
      * If this is set to false, any exception thrown during {@link Stage#process(Object)}
 85  
      * will cause the worker thread to halt without executing {@link Stage#postprocess()}
 86  
      * ({@link Stage#release()} will be called.)
 87  
      */
 88  
     public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
 89  15
         super(stage, context, faultTolerance);
 90  15
         this.queue = queue;
 91  15
         this.timeout = timeout;
 92  15
     }
 93  
     
 94  
     /**
 95  
      * Return the Feeder used to feed data to the queue of objects to be processed.
 96  
      * @return The feeder for objects processed by this driver's stage.
 97  
      */
 98  
     public Feeder getFeeder() {
 99  20
         return this.feeder;
 100  
     }
 101  
     
 102  
     /**
 103  
      * Start the processing of the stage.
 104  
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
 105  
      */
 106  
     public synchronized void start() throws StageException {
 107  14
         if (this.currentState == STOPPED) {
 108  14
             log.debug("Starting worker thread for stage " + stage + ".");
 109  14
             this.workerThread = new WorkerThread(stage);
 110  14
             this.workerThread.start();
 111  14
             log.debug("Worker thread for stage " + stage + " started.");
 112  
             
 113  
             //wait to ensure that the stage starts up correctly
 114  
             try {
 115  40
                 while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
 116  0
             } catch (InterruptedException e) {
 117  0
                 throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
 118  14
             }
 119  
         } else {
 120  0
             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
 121  
         }
 122  14
     }
 123  
     
 124  
     /**
 125  
      * Causes processing to shut down gracefully.
 126  
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
 127  
      */
 128  
     public synchronized void finish() throws StageException {
 129  14
         if (currentState == STOPPED) {
 130  0
             throw new IllegalStateException("The driver is not currently running.");
 131  
         }
 132  
         
 133  
         try {
 134  14
             while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
 135  
             
 136  
             //ask the worker thread to shut down
 137  14
             testAndSetState(RUNNING, STOP_REQUESTED);
 138  
             
 139  28
             while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
 140  
             
 141  14
             log.debug("Waiting for worker thread stop for stage " + stage + ".");
 142  14
             this.workerThread.join();
 143  14
             log.debug("Worker thread for stage " + stage + " halted");
 144  
             
 145  0
         } catch (InterruptedException e) {
 146  0
             throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
 147  14
         }
 148  
         
 149  14
         setState(STOPPED);
 150  14
     }
 151  
     
 152  
     /*********************************
 153  
      * WORKER THREAD IMPLEMENTATIONS *
 154  
      *********************************/
 155  15
     private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
 156  
         public void uncaughtException(Thread t, Throwable e) {
 157  0
             setState(ERROR);
 158  0
             recordFatalError(e);
 159  0
             log.error("Uncaught exception in stage " + stage, e);
 160  0
         }
 161  
     };
 162  
     
 163  
     /**
 164  
      * This worker thread removes and processes data objects from the incoming                synchronize
 165  
      *
 166  
      * queue. It first calls preprocess(), then begins a loop that calls the process()
 167  
      * method to process data from the queue. This loop runs as long as the
 168  
      * {@link getRunning() running} property is true or the queue is not empty. To break the loop the
 169  
      * calling code must run the writer's finish() method to set the running property to false.
 170  
      * At this point the loop will continue to run until the queue is empty, then the loop will
 171  
      * exit and the postprocess() method is called.<P>
 172  
      *
 173  
      * @throws StageException if an error is encountered during data processing
 174  
      * and faultTolerant is set to false.
 175  
      */
 176  
     private class WorkerThread extends Thread {
 177  
         /** The Stage this thread will work on */
 178  
         private Stage stage;
 179  
         
 180  14
         public WorkerThread(Stage stage) {
 181  14
             this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
 182  14
             this.stage = stage;
 183  14
         }
 184  
         
 185  
         public final void run() {
 186  14
             setState(STARTED);
 187  
             
 188  
             try {
 189  14
                 if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
 190  14
                 stage.preprocess();
 191  14
                 if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
 192  
                 
 193  
                 //do not transition into running state if an error has occurred or a stop requested
 194  14
                 testAndSetState(STARTED, RUNNING);
 195  451
                 running: while (currentState != ERROR) {
 196  
                     try {
 197  451
                         Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
 198  451
                         if (obj == null) {
 199  32
                             if (currentState == STOP_REQUESTED) break running;
 200  
                             //else continue running;
 201  
                         } else {
 202  
                             try {
 203  418
                                 stage.process(obj);
 204  1
                             } catch (StageException e) {
 205  1
                                 recordProcessingException(obj, e);
 206  1
                                 if (faultTolerance == NONE) throw e;
 207  0
                             } catch (RuntimeException e) {
 208  0
                                 recordProcessingException(obj, e);
 209  0
                                 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
 210  418
                             }
 211  
                         }
 212  0
                     } catch (InterruptedException e) {
 213  0
                         throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e);
 214  437
                     }
 215  
                 }
 216  14
                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state.");
 217  
                 
 218  14
                 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
 219  14
                 stage.postprocess();
 220  14
                 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
 221  
                 
 222  0
             } catch (StageException e) {
 223  0
                 log.error("An error occurred in the stage " + stage, e);
 224  0
                 recordFatalError(e);
 225  0
                 setState(ERROR);
 226  
             } finally {
 227  14
                 if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
 228  14
                 stage.release();
 229  14
                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
 230  
             }
 231  
             
 232  
             //do not transition into finished state if an error has occurred
 233  14
             testAndSetState(STOP_REQUESTED, FINISHED);
 234  14
         }
 235  
     }
 236  
     
 237  
     /**
 238  
      * Get the size of the queue used by this StageDriver.
 239  
      * @return the queue capacity
 240  
      */
 241  
     public int getQueueSize() {
 242  0
         return this.queue.size() + this.queue.remainingCapacity();
 243  
     }
 244  
     
 245  
     /**
 246  
      * Get the timeout value (in milliseconds) used by this StageDriver on
 247  
      * thread termination.
 248  
      * @return the timeout setting in milliseconds
 249  
      */
 250  
     public long getTimeout() {
 251  0
         return this.timeout;
 252  
     }
 253  
 }