Coverage Report - org.apache.commons.pipeline.driver.ThreadPoolStageDriver
 
Classes in this File Line Coverage Branch Coverage Complexity
ThreadPoolStageDriver
78%
40/51
50%
14/28
0
ThreadPoolStageDriver$1
80%
8/10
50%
1/2
0
ThreadPoolStageDriver$2
20%
1/5
N/A
0
ThreadPoolStageDriver$LatchWorkerThread
63%
22/35
50%
7/14
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing,
 13  
  * software distributed under the License is distributed on an
 14  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15  
  * KIND, either express or implied.  See the License for the
 16  
  * specific language governing permissions and limitations
 17  
  * under the License.
 18  
  */
 19  
 
 20  
 package org.apache.commons.pipeline.driver;
 21  
 
 22  
 import java.lang.Thread.UncaughtExceptionHandler;
 23  
 import java.util.concurrent.BlockingQueue;
 24  
 import java.util.concurrent.CountDownLatch;
 25  
 import java.util.concurrent.TimeUnit;
 26  
 import org.apache.commons.logging.Log;
 27  
 import org.apache.commons.logging.LogFactory;
 28  
 import org.apache.commons.pipeline.Feeder;
 29  
 import org.apache.commons.pipeline.StageDriver;
 30  
 import org.apache.commons.pipeline.Stage;
 31  
 import org.apache.commons.pipeline.StageContext;
 32  
 import org.apache.commons.pipeline.StageException;
 33  
 import org.apache.commons.pipeline.driver.AbstractStageDriver;
 34  
 import org.apache.commons.pipeline.driver.FaultTolerance;
 35  
 
 36  
 import static org.apache.commons.pipeline.StageDriver.State.*;
 37  
 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
 38  
 
 39  
 /**
 40  
  * This {@link StageDriver} implementation uses a pool of threads
 41  
  * to process objects from an input queue.
 42  
  */
 43  188
 public class ThreadPoolStageDriver extends AbstractStageDriver {
 44  
     // logger for the class
 45  9
     private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
 46  
     
 47  
     // wait timeout to ensure deadlock cannot occur on thread termination
 48  
     private long timeout;
 49  
     
 50  
     // signal telling threads to start polling queue
 51  
     private final CountDownLatch startSignal;
 52  
     
 53  
     // signal threads use to tell driver they have finished
 54  
     private final CountDownLatch doneSignal;
 55  
     
 56  
     // number of threads polling queue
 57  
     private final int numThreads;
 58  
     
 59  
     // queue to hold data to be processed
 60  
     private final BlockingQueue queue;
 61  
     
 62  
     //feeder used to feed data to this stage's queue
 63  9
     private final Feeder feeder = new Feeder() {
 64  
         public void feed(Object obj) {
 65  16
             if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage
 66  
                     + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)");
 67  
             
 68  
             try {
 69  16
                 ThreadPoolStageDriver.this.queue.put(obj);
 70  0
             } catch (InterruptedException e) {
 71  0
                 throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object "
 72  
                         + obj + " in queue for stage " + stage, e);
 73  16
             }
 74  
             
 75  16
             synchronized(ThreadPoolStageDriver.this) {
 76  16
                 ThreadPoolStageDriver.this.notifyAll();
 77  16
             }
 78  16
         }
 79  
     };
 80  
     
 81  
     /**
 82  
      * Creates a new ThreadPoolStageDriver.
 83  
      *
 84  
      * @param stage The stage that the driver will run
 85  
      * @param context the context in which to run the stage
 86  
      * @param queue The object queue to use for storing objects prior to processing. The
 87  
      * default is {@link LinkedBlockingQueue}
 88  
      * @param timeout The amount of time, in milliseconds, that the worker thread
 89  
      * will wait before checking the processing state if no objects are available
 90  
      * in the thread's queue.
 91  
      * @param faultTolerance Flag determining the behavior of the driver when
 92  
      * an error is encountered in execution of {@link Stage#process(Object)}.
 93  
      * If this is set to false, any exception thrown during {@link Stage#process(Object)}
 94  
      * will cause the worker thread to halt without executing {@link Stage#postprocess()}
 95  
      * ({@link Stage#release()} will be called.)
 96  
      * @param numThreads Number of threads that will be simultaneously reading from queue
 97  
      */
 98  
     public ThreadPoolStageDriver(Stage stage, StageContext context,
 99  
             BlockingQueue queue,
 100  
             long timeout,
 101  
             FaultTolerance faultTolerance,
 102  
             int numThreads) {
 103  9
         super(stage, context, faultTolerance);
 104  9
         this.numThreads = numThreads;
 105  
         
 106  9
         this.startSignal = new CountDownLatch(1);
 107  9
         this.doneSignal = new CountDownLatch(this.numThreads);
 108  
         
 109  9
         this.queue = queue;
 110  9
         this.timeout = timeout;
 111  9
     }
 112  
     
 113  
     /**
 114  
      * Return the Feeder used to feed data to the queue of objects to be processed.
 115  
      * @return The feeder for objects processed by this driver's stage.
 116  
      */
 117  
     public Feeder getFeeder() {
 118  12
         return this.feeder;
 119  
     }
 120  
     
 121  
     /**
 122  
      * Start the processing of the stage. Creates threads to poll items
 123  
      * from queue.
 124  
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
 125  
      */
 126  
     public synchronized void start() throws StageException {
 127  8
         if (this.currentState == STOPPED) {
 128  8
             setState(STARTED);
 129  
             
 130  8
             if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
 131  8
             stage.preprocess();
 132  8
             if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
 133  
             
 134  8
             log.debug("Starting worker threads for stage " + stage + ".");
 135  
             
 136  20
             for (int i=0;i<this.numThreads;i++) {
 137  12
                 new LatchWorkerThread(i).start();
 138  
             }
 139  
             
 140  
             // let threads know they can start
 141  8
             testAndSetState(STARTED, RUNNING);
 142  8
             startSignal.countDown();
 143  
             
 144  8
             log.debug("Worker threads for stage " + stage + " started.");
 145  
         } else {
 146  0
             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
 147  
         }
 148  8
     }
 149  
     
 150  
     /**
 151  
      * Causes processing to shut down gracefully. Waits until all worker threads
 152  
      * have completed. It is important that this method be called only after
 153  
      * the completion of execution of finish() in the driver for the prior
 154  
      * stage; parallel finish calls can cause the stage to shut down before
 155  
      * all prior stages have finished processing.
 156  
      *
 157  
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
 158  
      */
 159  
     public synchronized void finish() throws StageException {
 160  8
         if (currentState == STOPPED) {
 161  0
             throw new IllegalStateException("The driver is not currently running.");
 162  
         }
 163  
         
 164  
         try {
 165  
             //it may be the case that finish() is called when the driver is still in the process
 166  
             //of starting up, so it is necessary to wait to enter the running state before
 167  
             //a stop can be requested
 168  8
             while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
 169  
             
 170  
             //ask the worker threads to shut down
 171  8
             testAndSetState(RUNNING, STOP_REQUESTED);
 172  
             
 173  8
             if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
 174  8
             doneSignal.await();
 175  8
             if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
 176  
             
 177  
             //transition into finished state (not used internally?)
 178  8
             testAndSetState(STOP_REQUESTED, FINISHED);
 179  
             
 180  
             //do not run postprocessing if the driver is in an error state
 181  8
             if (this.currentState != ERROR) {
 182  8
                 if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
 183  8
                 this.stage.postprocess();
 184  8
                 if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
 185  
             }
 186  
             
 187  
             //the following lines appear to be artifacts of copy-and-paste from
 188  
             //DedicatedThreadStageDriver.
 189  
 //            //do not transition into finished state if an error has occurred
 190  
 //            testAndSetState(STOP_REQUESTED, FINISHED);
 191  
 //
 192  
 //            while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait();
 193  
             
 194  0
         } catch (StageException e) {
 195  0
             log.error("An error occurred during postprocessing of stage " + stage , e);
 196  0
             recordFatalError(e);
 197  0
             setState(ERROR);
 198  0
         } catch (InterruptedException e) {
 199  0
             throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
 200  
         } finally {
 201  8
             if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
 202  8
             stage.release();
 203  8
             if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
 204  
         }
 205  
         
 206  8
         testAndSetState(FINISHED, STOPPED);
 207  8
     }
 208  
     
 209  
     /**
 210  
      * Get the size of the queue used by this StageDriver.
 211  
      * @return the queue capacity
 212  
      */
 213  
     public int getQueueSize() {
 214  0
         return this.queue.size() + this.queue.remainingCapacity();
 215  
     }
 216  
     
 217  
     /**
 218  
      * Get the timeout value (in milliseconds) used by this StageDriver on
 219  
      * thread termination.
 220  
      * @return the timeout setting in milliseconds
 221  
      */
 222  
     public long getTimeout() {
 223  0
         return this.timeout;
 224  
     }
 225  
     
 226  
     /**
 227  
      * Returns the number of threads allocated to the thread pool.
 228  
      */
 229  
     public int getNumThreads() {
 230  0
         return numThreads;
 231  
     }
 232  
     
 233  
     /*********************************
 234  
      * WORKER THREAD IMPLEMENTATIONS *
 235  
      *********************************/
 236  9
     private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() {
 237  
         public void uncaughtException(Thread t, Throwable e) {
 238  0
             setState(ERROR);
 239  0
             recordFatalError(e);
 240  0
             log.error("Uncaught exception in stage " + stage, e);
 241  0
         }
 242  
     };
 243  
     
 244  
     /**
 245  
      * This worker thread removes and processes data objects from the incoming
 246  
      * synchronize queue. It calls the Stage's process() method to process data
 247  
      * from the queue. This loop runs until State has changed to
 248  
      * STOP_REQUESTED. To break the loop the calling code must run the writer's
 249  
      * finish() method to set the running property to false.
 250  
      *
 251  
      * @throws StageException if an error is encountered during data processing
 252  
      * and faultTolerant is set to false.
 253  
      */
 254  
     private class LatchWorkerThread extends Thread {
 255  
         final int threadID;
 256  
         
 257  12
         LatchWorkerThread(int threadID) {
 258  12
             this.setUncaughtExceptionHandler(workerThreadExceptionHandler);
 259  12
             this.threadID = threadID;
 260  12
         }
 261  
         
 262  
         public final void run() {
 263  
             try {
 264  12
                 ThreadPoolStageDriver.this.startSignal.await();
 265  
                 //do not transition into running state if an error has occurred or a stop requested
 266  32
                 running: while (currentState != ERROR) {
 267  
                     try {
 268  32
                         Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
 269  32
                         if (obj == null) {
 270  15
                             if (currentState == STOP_REQUESTED) break running;
 271  
                         } else {
 272  
                             try {
 273  16
                                 stage.process(obj);
 274  1
                             } catch (StageException e) {
 275  1
                                 recordProcessingException(obj, e);
 276  1
                                 if (faultTolerance == NONE) throw e;
 277  0
                             } catch (RuntimeException e) {
 278  0
                                 recordProcessingException(obj, e);
 279  0
                                 if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
 280  16
                             }
 281  
                         }
 282  0
                     } catch (InterruptedException e) {
 283  0
                         throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + stage, e);
 284  20
                     }
 285  
                 }
 286  11
                 if (log.isDebugEnabled()) log.debug("Stage " + stage + " (threadID: " + this.threadID + ") exited running state.");
 287  
                 
 288  0
             } catch (StageException e) {
 289  0
                 log.error("An error occurred in the stage " + stage + " (threadID: " + this.threadID + ")", e);
 290  0
                 recordFatalError(e);
 291  0
                 setState(ERROR);
 292  0
             } catch (InterruptedException e) {
 293  0
                 log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
 294  0
                 recordFatalError(e);
 295  0
                 setState(ERROR);
 296  
             } finally {
 297  12
                 doneSignal.countDown();
 298  12
                 synchronized (ThreadPoolStageDriver.this) {
 299  12
                     ThreadPoolStageDriver.this.notifyAll();
 300  12
                 }
 301  12
             }
 302  12
         }
 303  
     }    
 304  
 }