Coverage Report - org.apache.commons.pipeline.driver.SynchronousStageDriver
 
Classes in this File Line Coverage Branch Coverage Complexity
SynchronousStageDriver
56%
19/34
50%
4/8
0
SynchronousStageDriver$1
62%
8/13
33%
2/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *     http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 
 18  
 package org.apache.commons.pipeline.driver;
 19  
 
 20  
 import java.util.LinkedList;
 21  
 import java.util.Queue;
 22  
 import org.apache.commons.logging.Log;
 23  
 import org.apache.commons.logging.LogFactory;
 24  
 import org.apache.commons.pipeline.Feeder;
 25  
 import org.apache.commons.pipeline.Stage;
 26  
 import org.apache.commons.pipeline.StageException;
 27  
 import org.apache.commons.pipeline.StageContext;
 28  
 import static org.apache.commons.pipeline.StageDriver.State.*;
 29  
 import org.apache.commons.pipeline.StageDriver.State;
 30  
 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
 31  
 
 32  
 /**
 33  
  * This is a non-threaded version of the AbstractStageDriver.
 34  
  */
 35  0
 public class SynchronousStageDriver extends AbstractStageDriver {
 36  9
     private final Log log = LogFactory.getLog(SynchronousStageDriver.class);
 37  
     
 38  
     //queue of objects to be processed that are fed to the driver
 39  
     //when it is not in a running state
 40  9
     private Queue<Object> queue = new LinkedList<Object>();
 41  
     
 42  
     //Feeder used to feed objects to this stage
 43  9
     private final Feeder feeder = new Feeder() {
 44  
         public void feed(Object obj) {
 45  8
             synchronized (SynchronousStageDriver.this) {
 46  8
                 if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state.");
 47  8
                 if (currentState != RUNNING) { //enqueue objects if stage has not been started
 48  0
                     queue.add(obj);
 49  0
                     return;
 50  
                 }
 51  8
             }
 52  
             
 53  
             try {
 54  8
                 stage.process(obj);
 55  0
             } catch (StageException e) {
 56  0
                 recordProcessingException(obj, e);
 57  0
                 if (faultTolerance == NONE) throw fatalError(e);
 58  8
             }
 59  8
         }
 60  
     };
 61  
     
 62  
     /**
 63  
      * Creates a new instance of SimpleStageDriver
 64  
      * @param stage The stage to be run
 65  
      * @param context The context in which the stage will be run
 66  
      */
 67  
     public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
 68  9
         super(stage, context, faultTolerance);
 69  9
     }
 70  
     
 71  
     /**
 72  
      * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver
 73  
      * is designed to run the stage in the main thread of execution, calls
 74  
      * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing
 75  
      * of the object fed to the stage.
 76  
      * @return The Feeder instance for the stage.
 77  
      */
 78  
     public Feeder getFeeder() {
 79  11
         return this.feeder;
 80  
     }
 81  
     
 82  
     /**
 83  
      * Performs preprocessing and updates the driver state.
 84  
      * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs
 85  
      * during preprocessing.
 86  
      */
 87  
     public synchronized void start() throws StageException {
 88  5
         if (this.currentState == STOPPED) {
 89  
             try {
 90  5
                 stage.preprocess();
 91  5
                 setState(RUNNING);
 92  0
             } catch (StageException e) {
 93  0
                 throw fatalError(e);
 94  5
             }
 95  
             
 96  
             // feed any queued values before returning control
 97  5
             while (!queue.isEmpty()) this.getFeeder().feed(queue.remove());
 98  
         } else {
 99  0
             throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState);
 100  
         }
 101  5
     }
 102  
     
 103  
     /**
 104  
      * Performs postprocessing and releases stage resources, and updates the driver
 105  
      * state accordingly.
 106  
      * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing
 107  
      */
 108  
     public synchronized void finish() throws StageException {
 109  5
         if (this.currentState == RUNNING) {            
 110  
             try {
 111  5
                 testAndSetState(RUNNING, STOP_REQUESTED);
 112  5
                 if (this.currentState == STOP_REQUESTED) stage.postprocess();
 113  0
             } catch (StageException e) {
 114  0
                 throw fatalError(e);
 115  
             } finally {
 116  5
                 stage.release();
 117  5
                 testAndSetState(STOP_REQUESTED, STOPPED);
 118  5
             }            
 119  
         } else {
 120  0
             throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")");
 121  
         }
 122  5
     }
 123  
     
 124  
     /**
 125  
      * This method obtains a lock to set the current state of processing
 126  
      * to error, records the error and returns a RuntimeException encapsulating
 127  
      * the specified throwable.
 128  
      */
 129  
     private RuntimeException fatalError(Throwable t) {
 130  
         try {
 131  0
             setState(ERROR);
 132  0
             this.recordFatalError(t);
 133  0
             stage.release();
 134  0
             this.notifyAll();
 135  0
         } catch (Exception e) {
 136  0
             this.recordFatalError(e);
 137  0
         }
 138  
         
 139  0
         return new RuntimeException("Fatal error halted processing of stage: " + stage);
 140  
     }
 141  
 }