Coverage Report - org.apache.commons.pipeline.Pipeline
 
Classes in this File Line Coverage Branch Coverage Complexity
Pipeline
76%
68/90
53%
31/58
0
Pipeline$1
100%
5/5
100%
2/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *     http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 
 18  
 package org.apache.commons.pipeline;
 19  
 
 20  
 import java.util.ArrayList;
 21  
 import java.util.Collection;
 22  
 import java.util.Collections;
 23  
 import java.util.EventObject;
 24  
 import java.util.HashMap;
 25  
 import java.util.LinkedList;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 import org.apache.commons.pipeline.driver.SynchronousStageDriver;
 29  
 import org.apache.commons.pipeline.validation.PipelineValidator;
 30  
 import org.apache.commons.pipeline.validation.ValidationException;
 31  
 import org.apache.commons.pipeline.validation.ValidationFailure;
 32  
 
 33  
 /**
 34  
  * This class represents a processing system consisting of a number of stages
 35  
  * and branches. Each stage contains a queue and manages one or more threads
 36  
  * that process data in that stage's queue and allow processed data to be
 37  
  * passed along to subsequent stages and onto branches of the pipeline.<P>
 38  
  *
 39  
  * This class allows all stages in the pipeline to be managed collectively
 40  
  * with methods to start and stop processing for all stages, as well as
 41  
  * a simple framework for asynchronous event-based communication between stages.
 42  
  */
 43  326
 public class Pipeline implements Runnable, StageContext {
 44  
     /**
 45  
      * The branch key for the main line of production. This value is reserved
 46  
      * and may not be used as a key for other branch pipelines.
 47  
      */
 48  
     public static final String MAIN_BRANCH = "main";
 49  
     
 50  
     //The logger used for reporting by this pipeline
 51  
     //private final Log log = LogFactory.getLog(Pipeline.class);
 52  
     
 53  
     // List of stages in the pipeline, encapsulated in the drivers
 54  
     // that will be used to onStart them.
 55  
     private final LinkedList<StageDriver> drivers;
 56  
     private final Map<Stage, StageDriver> driverMap;
 57  
     
 58  
     // The list of stages in the pipeline.
 59  
     private final LinkedList<Stage> stages;
 60  
     
 61  
     // Map of pipeline branches where the keys are branch names.
 62  
     private final Map<String,Pipeline> branches;
 63  
     
 64  
     // Used to store a reference to the parent pipeline, if this is a branch
 65  
     private Pipeline parent;
 66  
     
 67  
     // The list of listeners registered with the pipeline.
 68  
     private final List<StageEventListener> listeners;
 69  
     
 70  
     // Holds value of property validator.
 71  
     private PipelineValidator validator;
 72  
     
 73  
     // Feeder used to handle output of final stage
 74  10
     private Feeder terminalFeeder = Feeder.VOID;
 75  
     
 76  
     // Global environment variables
 77  10
     private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>());
 78  
     
 79  
     // List of jobs to be run at defined points in pipeline lifecycle
 80  10
     private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>();
 81  
     
 82  
     /**
 83  
      * Creates and initializes a new Pipeline.
 84  
      */
 85  10
     public Pipeline() {
 86  10
         this.drivers = new LinkedList<StageDriver>();
 87  10
         this.driverMap = new HashMap<Stage, StageDriver>();
 88  10
         this.stages = new LinkedList<Stage>();
 89  10
         this.branches = new HashMap<String,Pipeline>();
 90  10
         this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>());
 91  10
     }
 92  
     
 93  
     /**
 94  
      * {@inheritDoc}
 95  
      */
 96  
     public void registerListener(StageEventListener listener) {
 97  5
         listeners.add(listener);
 98  5
     }
 99  
     
 100  
     /**
 101  
      * {@inheritDoc}
 102  
      */
 103  
     public Collection<StageEventListener> getRegisteredListeners() {
 104  4
         return this.listeners;
 105  
     }
 106  
     
 107  
     /**
 108  
      * Asynchronously notifies each registered listener of an event and propagates
 109  
      * the event to any attached branches and the parent pipeline.
 110  
      *
 111  
      * @param ev The event to be sent to registered listeners
 112  
      */
 113  
     public void raise(final EventObject ev) {
 114  162
         new Thread() {
 115  
             public void run() {
 116  
                 //first, recursively find the root pipeline
 117  162
                 Pipeline root = Pipeline.this;
 118  163
                 while (root.parent != null) root = root.parent;
 119  
                 
 120  
                 //notify the listeners from the root pipeline
 121  162
                 root.notifyListeners(ev);
 122  162
             }
 123  
         }.start();
 124  162
     }
 125  
     
 126  
     /**
 127  
      * Notify all listeners and recursively notify child branches of the
 128  
      * specified event. This method does not propagate events to the
 129  
      * parent pipeline.
 130  
      */
 131  
     private void notifyListeners(EventObject ev) {
 132  164
         for (StageEventListener listener : listeners) listener.notify(ev);
 133  164
         for (Pipeline branch : branches.values()) branch.notifyListeners(ev);
 134  164
     }
 135  
     
 136  
     /**
 137  
      * {@inheritDoc}
 138  
      */
 139  
     public Feeder getDownstreamFeeder(Stage stage) {
 140  8
         if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage.");
 141  8
         if (stage == drivers.getLast().getStage()) {
 142  1
             return this.terminalFeeder;
 143  
         } else {
 144  
             //Iterate backwards over the list until the stage is found, then return
 145  
             //the feeder for the subsequent stage. Comparisons are done using reference
 146  
             //equality.
 147  17
             for (int i = drivers.size() - 2; i >= 0; i--) {
 148  17
                 if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder();
 149  
             }
 150  
             
 151  0
             throw new IllegalStateException("Unable to find stage " + stage + " in pipeline.");
 152  
         }
 153  
     }
 154  
     
 155  
     /**
 156  
      * {@inheritDoc}
 157  
      */
 158  
     public Feeder getBranchFeeder(String branch) {
 159  2
         if (!getBranches().containsKey(branch)) {
 160  0
             throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'");
 161  
         }
 162  
         
 163  2
         return branches.get(branch).getSourceFeeder();
 164  
     }
 165  
     
 166  
     /**
 167  
      * {@inheritDoc}
 168  
      */
 169  
     public Object getEnv(String key) {
 170  2
         return this.env.get(key);
 171  
     }
 172  
     
 173  
     /**
 174  
      * Sets the value corresponding to the specified environment variable key.
 175  
      */
 176  
     public void setEnv(String key, Object value) {
 177  2
         this.env.put(key, value);
 178  2
     }
 179  
     
 180  
     /**
 181  
      * Adds a {@link Stage} object to the end of this Pipeline. If a
 182  
      * {@link PipelineValidator} has been set using {@link #setValidator}, it will
 183  
      * be used to validate that the appended Stage can consume the output of the
 184  
      * previous stage of the pipeline. It does NOT validate the ability or availability
 185  
      * of branches to consume data produced by the appended stage.
 186  
      *
 187  
      * @param stage the stage to be added to the pipeline
 188  
      * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage
 189  
      * @throws ValidationException if there is a non-null validator set for this pipeline and an error is
 190  
      * encountered validating the addition of the stage to the pipeline.
 191  
      */
 192  
     public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException {
 193  9
         if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
 194  
         
 195  9
         if (validator != null) {
 196  0
             List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory);
 197  0
             if (errors != null && !errors.isEmpty()) {
 198  0
                 throw new ValidationException("An error occurred adding stage " + stage.toString(), errors);
 199  
             }
 200  
         }
 201  
         
 202  9
         stage.init(this);
 203  9
         this.stages.add(stage);
 204  
         
 205  9
         StageDriver driver = driverFactory.createStageDriver(stage, this);
 206  9
         this.driverMap.put(stage, driver);
 207  9
         this.drivers.add(driver);
 208  9
     }
 209  
     
 210  
     /**
 211  
      * Returns an unmodifiable list of stages that have been added to this
 212  
      * pipeline.
 213  
      * @return A list of the stages that have been added to the pipeline
 214  
      */
 215  
     public final List<Stage> getStages() {
 216  1
         return Collections.unmodifiableList(this.stages);
 217  
     }
 218  
     
 219  
     /**
 220  
      * Return the StageDriver for the specified Stage.
 221  
      *
 222  
      * @return the StageDriver for the specified Stage.
 223  
      */
 224  
     public final StageDriver getStageDriver(Stage stage) {
 225  2
         return this.driverMap.get(stage);
 226  
     }
 227  
     
 228  
     /**
 229  
      * Returns an unmodifiable list of stage drivers that have been added
 230  
      * to the pipeline.
 231  
      * @return the list of drivers for stages in the pipeline
 232  
      */
 233  
     public final List<StageDriver> getStageDrivers() {
 234  0
         return Collections.unmodifiableList(this.drivers);
 235  
     }
 236  
     
 237  
     /**
 238  
      * Adds a branch to the pipeline.
 239  
      * @param key the string identifier that will be used to refer to the added branch
 240  
      * @param pipeline the branch pipeline
 241  
      * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch
 242  
      * cannot consume the data produced for the branch by stages in the pipeline.
 243  
      */
 244  
     public void addBranch(String key, Pipeline branch) throws ValidationException {
 245  3
         if (key == null)
 246  0
             throw new IllegalArgumentException("Branch key may not be null.");
 247  3
         if (MAIN_BRANCH.equalsIgnoreCase(key))
 248  0
             throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved.");
 249  3
         if (branch == null)
 250  0
             throw new IllegalArgumentException("Illegal attempt to set reference to null branch.");
 251  3
         if (branch == this || branch.hasBranch(this))
 252  0
             throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)");
 253  
         
 254  3
         if (validator != null) {
 255  0
             List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch);
 256  0
             if (errors != null && !errors.isEmpty()) {
 257  0
                 throw new ValidationException("An error occurred adding branch pipeline " + branch, errors);
 258  
             }
 259  
         }
 260  
         
 261  3
         branch.parent = this;
 262  3
         this.branches.put(key, branch);
 263  3
     }
 264  
     
 265  
     /**
 266  
      * Returns an unmodifiable map of branch pipelines, keyed by branch identifier.
 267  
      * @return the map of registered branch pipelines, keyed by branch identifier
 268  
      */
 269  
     public Map<String,Pipeline> getBranches() {
 270  2
         return Collections.unmodifiableMap(branches);
 271  
     }
 272  
     
 273  
     /**
 274  
      * Simple method that recursively checks whether the specified
 275  
      * pipeline is a branch of this pipeline.
 276  
      * @param pipeline the candidate branch
 277  
      * @return true if the specified pipeline is a branch of this pipeline, or recursively
 278  
      * a branch of a branch. Tests are performed using reference equality.
 279  
      */
 280  
     private boolean hasBranch(Pipeline pipeline) {
 281  3
         if (branches.containsValue(pipeline)) return true;
 282  3
         for (Pipeline branch : branches.values()) {
 283  0
             if (branch.hasBranch(pipeline)) return true;
 284  
         }
 285  
         
 286  3
         return false;
 287  
     }
 288  
     
 289  
     /**
 290  
      * Returns a feeder for the first stage if the pipeline is not empty
 291  
      * @return the feeder to feed the first stage of the pipeline
 292  
      */
 293  
     public Feeder getSourceFeeder() {
 294  4
         if (drivers.isEmpty()) return this.terminalFeeder;
 295  3
         return drivers.peek().getFeeder();
 296  
     }
 297  
     
 298  
     /**
 299  
      * Gets the feeder that receives output from the final stage.
 300  
      * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID}
 301  
      */
 302  
     public Feeder getTerminalFeeder() {
 303  1
         return this.terminalFeeder;
 304  
     }
 305  
     
 306  
     /**
 307  
      * Sets the terminal feeder used to handle any output from the final stage.
 308  
      * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage
 309  
      */
 310  
     public void setTerminalFeeder(Feeder terminalFeeder) {
 311  1
         this.terminalFeeder = terminalFeeder;
 312  1
     }
 313  
     
 314  
     /**
 315  
      * Adds a job to be onStart on startup to the pipeline.
 316  
      */
 317  
     public void addLifecycleJob(PipelineLifecycleJob job) {
 318  0
         this.lifecycleJobs.add(job);
 319  0
     }
 320  
     
 321  
     /**
 322  
      * This method iterates over the stages in the pipeline, looking up a
 323  
      * {@link StageDriver} for each stage and using that driver to start the stage.
 324  
      * Startups may occur sequentially or in parallel, depending upon the stage driver
 325  
      * used.  If a the stage has not been configured with a {@link StageDriver},
 326  
      * we will use the default, non-threaded {@link SynchronousStageDriver}.
 327  
      *
 328  
      * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup
 329  
      */
 330  
     public void start() throws StageException {
 331  1
         for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this);
 332  1
         for (StageDriver driver: this.drivers) driver.start();
 333  1
         for (Pipeline branch : branches.values()) branch.start();
 334  1
     }
 335  
     
 336  
     /**
 337  
      * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
 338  
      * for each stage and using that driver to request that the stage finish
 339  
      * execution. The {@link StageDriver#finish(Stage)}
 340  
      * method will block until the stage's queue is exhausted, so this method
 341  
      * may be used to safely finalize all stages without the risk of
 342  
      * losing data in the queues.
 343  
      *
 344  
      * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown
 345  
      */
 346  
     public void finish() throws StageException {
 347  1
         for (StageDriver driver: this.drivers) driver.finish();
 348  1
         for (Pipeline pipeline : branches.values()) pipeline.finish();
 349  1
         for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this);
 350  1
     }
 351  
     
 352  
     /**
 353  
      * Runs the pipeline from start to finish.
 354  
      */
 355  
     public void run() {
 356  
         try {
 357  1
             start();
 358  1
             finish();
 359  0
         } catch (StageException e) {
 360  0
             throw new RuntimeException(e);
 361  1
         }
 362  1
     }
 363  
     
 364  
     /**
 365  
      * Returns the validator being used to validate the pipeline structure,
 366  
      * or null if no validation is being performed..
 367  
      * @return Validator used to validate pipeline structure.
 368  
      */
 369  
     public PipelineValidator getValidator() {
 370  0
         return this.validator;
 371  
     }
 372  
     
 373  
     /**
 374  
      * Sets the validator used to validate the pipeline as it is contstructed.
 375  
      * Setting the validator to null disables validation
 376  
      * @param validator Validator used to validate pipeline structure.
 377  
      */
 378  
     public void setValidator(PipelineValidator validator) {
 379  0
         this.validator = validator;
 380  0
     }
 381  
     
 382  
     /**
 383  
      * Returns the parent of this pipeline, if it is a branch
 384  
      * @return parent Pipeline, or null if this is the main pipeline
 385  
      */
 386  
     public Pipeline getParent() {
 387  0
         return parent;
 388  
     }
 389  
 }