Coverage Report - org.apache.commons.pipeline.driver.AbstractStageDriver
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractStageDriver
68%
23/34
30%
3/10
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *     http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 
 18  
 package org.apache.commons.pipeline.driver;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 import java.util.List;
 22  
 import org.apache.commons.pipeline.*;
 23  
 
 24  
 /**
 25  
  * This interface is used to define how processing for a stage is started,
 26  
  * stopped, and run. AbstractStageDriver implementations may run stages in one or
 27  
  * more threads, and use the {@link StageMonitor} interface to provide communication
 28  
  * between the stage, the driver, and the enclosing pipeline.
 29  
  */
 30  
 public abstract class AbstractStageDriver implements StageDriver {
 31  
     
 32  
     /**
 33  
      * The stage to run.
 34  
      */
 35  
     protected Stage stage;
 36  
     
 37  
     /**
 38  
      * The context for the stage being run
 39  
      */
 40  
     protected StageContext context;    
 41  
     
 42  
     /**
 43  
      * The current state of processing. In most drivers, this is used for
 44  
      * thread control.
 45  
      */
 46  33
     protected volatile State currentState = State.STOPPED;    
 47  
             
 48  
     /**
 49  
      * Enumerated value indicating the fault tolerance level of the StageDriver.
 50  
      */
 51  33
     protected FaultTolerance faultTolerance = FaultTolerance.NONE;
 52  
     
 53  
     /**
 54  
      * List of processing failures that have occurred.
 55  
      */
 56  33
     protected List<ProcessingException> processingExceptions = new ArrayList<ProcessingException>();
 57  
 
 58  
     /**
 59  
      * List of errors that have occurred.
 60  
      */
 61  33
     protected List<Throwable> errors = new ArrayList<Throwable>();
 62  
     
 63  
     /**
 64  
      * Creates a StageDriver for the specified stage.
 65  
      * 
 66  
      * @param stage The stage for which the driver will be created
 67  
      * @param context The context in which to run the stage
 68  
      */
 69  
     public AbstractStageDriver(Stage stage, StageContext context) {
 70  0
         this(stage, context, FaultTolerance.NONE);
 71  0
     }
 72  
     
 73  
     /**
 74  
      * Creates a StageDriver for the specified stage.
 75  
      * 
 76  
      * @param stage The stage for which the driver will be created
 77  
      * @param context The context in which to run the stage
 78  
      */
 79  33
     public AbstractStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
 80  33
         if (stage == null) throw new IllegalArgumentException("Stage may not be null.");
 81  33
         if (context == null) throw new IllegalArgumentException("Context may not be null.");
 82  33
         this.stage = stage;
 83  33
         this.context = context;
 84  33
         this.faultTolerance = faultTolerance;
 85  33
     }
 86  
     
 87  
     /**
 88  
      * Returns the Stage being run by this StageDriver.
 89  
      * 
 90  
      * @return The stage being run by this StageDriver instance
 91  
      */
 92  
     public Stage getStage() {
 93  25
         return this.stage;
 94  
     }
 95  
     
 96  
     /**
 97  
      * This method is used to provide a communication channel between the context 
 98  
      * in which the driver is being run and the managed stage.
 99  
      * @return the Feeder used to feed objects to the managed stage for processing.
 100  
      */
 101  
     public abstract Feeder getFeeder();    
 102  
 
 103  
     /**
 104  
      * Return the current state of stage processing.
 105  
      * @return the current state of processing
 106  
      */
 107  
     public State getState() {
 108  13
         return this.currentState;
 109  
     }
 110  
     
 111  
     /**
 112  
      * Atomically tests to determine whether or not the driver is in the one of
 113  
      * the specified states.
 114  
      */
 115  
     protected synchronized boolean isInState(State... states) {
 116  0
         for (State state : states) if (state == currentState) return true;
 117  0
         return false;
 118  
     }
 119  
     
 120  
     /**
 121  
      * Set the current state of stage processing and notify any listeners
 122  
      * that may be waiting on a state change.
 123  
      */
 124  
     protected synchronized void setState(State nextState) {
 125  125
         this.currentState = nextState;
 126  125
         this.notifyAll();
 127  125
     }
 128  
     
 129  
     /**
 130  
      * This method performs an atomic conditional state transition change
 131  
      * to the value specified by the nextState parameter if and only if the
 132  
      * current state is equal to the test state.
 133  
      */
 134  
     protected synchronized boolean testAndSetState(State testState, State nextState) {
 135  84
         if (currentState == testState) {
 136  84
             setState(nextState);
 137  84
             return true;
 138  
         } else {
 139  0
             return false;
 140  
         }
 141  
     }
 142  
     
 143  
     /**
 144  
      * This method is used to start the driver, run the 
 145  
      * {@link Stage#preprocess() preprocess()} method of the attached stage
 146  
      * and to then begin processing any objects fed to this driver's Feeder.
 147  
      *
 148  
      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during stage startup. In most cases, such errors
 149  
      * will be handled internally by the driver.
 150  
      */
 151  
     public abstract void start() throws StageException;
 152  
     
 153  
     /**
 154  
      * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
 155  
      * cleanly and then calls release() to release any resources acquired during processing, if possible.
 156  
      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during driver shutdown. Ordinarily such 
 157  
      * exceptions will be handled internally.
 158  
      */
 159  
     public abstract void finish() throws StageException;
 160  
 
 161  
     /**
 162  
      * Sets the failure tolerance flag for the worker thread. If faultTolerance
 163  
      * is set to CHECKED, {@link StageException StageException}s thrown by
 164  
      * the {@link Stage#process(Object)} method will not interrupt queue
 165  
      * processing, but will simply be logged with a severity of ERROR.
 166  
      * If faultTolerance is set to ALL, runtime exceptions will also be
 167  
      * logged and otherwise ignored.
 168  
      * @param faultTolerance the flag value
 169  
      */
 170  
     public final void setFaultTolerance(FaultTolerance faultTolerance) {
 171  0
         this.faultTolerance = faultTolerance;
 172  0
     }
 173  
     
 174  
     /**
 175  
      * Getter for property faultTolerant.
 176  
      * @return Value of property faultTolerant.
 177  
      */
 178  
     public final FaultTolerance getFaultTolerance() {
 179  0
         return this.faultTolerance;
 180  
     }    
 181  
     
 182  
     /**
 183  
      * Store a fatal error.
 184  
      * @param error The error to be stored for later analysis
 185  
      */
 186  
     protected void recordFatalError(Throwable error) {
 187  0
         this.errors.add(error);
 188  0
     }
 189  
 
 190  
     /**
 191  
      * Returns a list of unrecoverable errors that occurred during stage
 192  
      * processing.
 193  
      * @return A list of unrecoverable errors that occurred during stage processing.
 194  
      */
 195  
     public List<Throwable> getFatalErrors() {
 196  0
         return this.errors;
 197  
     }
 198  
     
 199  
     /**
 200  
      * Store processing failure information for the specified data object.
 201  
      * @param data The data being processed at the time of the error
 202  
      * @param error The error encountered
 203  
      */
 204  
     protected void recordProcessingException(Object data, Throwable error) {
 205  2
         ProcessingException ex = new ProcessingException(this.stage, error, data, this.getState());  
 206  2
         this.processingExceptions.add(ex);
 207  2
     }    
 208  
     
 209  
     /**
 210  
      * Returns a list of errors that occurred while processing data objects,
 211  
      * along with the objects that were being processed when the errors
 212  
      * were generated.
 213  
      * @return The list of non-fatal processing errors.
 214  
      */
 215  
     public List<ProcessingException> getProcessingExceptions() {
 216  2
         return this.processingExceptions;
 217  
     }
 218  
 }