Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Pipeline |
|
| 0.0;0 | ||||
Pipeline$1 |
|
| 0.0;0 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.commons.pipeline; | |
19 | ||
20 | import java.util.ArrayList; | |
21 | import java.util.Collection; | |
22 | import java.util.Collections; | |
23 | import java.util.EventObject; | |
24 | import java.util.HashMap; | |
25 | import java.util.LinkedList; | |
26 | import java.util.List; | |
27 | import java.util.Map; | |
28 | import org.apache.commons.pipeline.driver.SynchronousStageDriver; | |
29 | import org.apache.commons.pipeline.validation.PipelineValidator; | |
30 | import org.apache.commons.pipeline.validation.ValidationException; | |
31 | import org.apache.commons.pipeline.validation.ValidationFailure; | |
32 | ||
33 | /** | |
34 | * This class represents a processing system consisting of a number of stages | |
35 | * and branches. Each stage contains a queue and manages one or more threads | |
36 | * that process data in that stage's queue and allow processed data to be | |
37 | * passed along to subsequent stages and onto branches of the pipeline.<P> | |
38 | * | |
39 | * This class allows all stages in the pipeline to be managed collectively | |
40 | * with methods to start and stop processing for all stages, as well as | |
41 | * a simple framework for asynchronous event-based communication between stages. | |
42 | */ | |
43 | 326 | public class Pipeline implements Runnable, StageContext { |
44 | /** | |
45 | * The branch key for the main line of production. This value is reserved | |
46 | * and may not be used as a key for other branch pipelines. | |
47 | */ | |
48 | public static final String MAIN_BRANCH = "main"; | |
49 | ||
50 | //The logger used for reporting by this pipeline | |
51 | //private final Log log = LogFactory.getLog(Pipeline.class); | |
52 | ||
53 | // List of stages in the pipeline, encapsulated in the drivers | |
54 | // that will be used to onStart them. | |
55 | private final LinkedList<StageDriver> drivers; | |
56 | private final Map<Stage, StageDriver> driverMap; | |
57 | ||
58 | // The list of stages in the pipeline. | |
59 | private final LinkedList<Stage> stages; | |
60 | ||
61 | // Map of pipeline branches where the keys are branch names. | |
62 | private final Map<String,Pipeline> branches; | |
63 | ||
64 | // Used to store a reference to the parent pipeline, if this is a branch | |
65 | private Pipeline parent; | |
66 | ||
67 | // The list of listeners registered with the pipeline. | |
68 | private final List<StageEventListener> listeners; | |
69 | ||
70 | // Holds value of property validator. | |
71 | private PipelineValidator validator; | |
72 | ||
73 | // Feeder used to handle output of final stage | |
74 | 10 | private Feeder terminalFeeder = Feeder.VOID; |
75 | ||
76 | // Global environment variables | |
77 | 10 | private Map<String,Object> env = Collections.synchronizedMap(new HashMap<String,Object>()); |
78 | ||
79 | // List of jobs to be run at defined points in pipeline lifecycle | |
80 | 10 | private Collection<PipelineLifecycleJob> lifecycleJobs = new ArrayList<PipelineLifecycleJob>(); |
81 | ||
82 | /** | |
83 | * Creates and initializes a new Pipeline. | |
84 | */ | |
85 | 10 | public Pipeline() { |
86 | 10 | this.drivers = new LinkedList<StageDriver>(); |
87 | 10 | this.driverMap = new HashMap<Stage, StageDriver>(); |
88 | 10 | this.stages = new LinkedList<Stage>(); |
89 | 10 | this.branches = new HashMap<String,Pipeline>(); |
90 | 10 | this.listeners = Collections.synchronizedList(new ArrayList<StageEventListener>()); |
91 | 10 | } |
92 | ||
93 | /** | |
94 | * {@inheritDoc} | |
95 | */ | |
96 | public void registerListener(StageEventListener listener) { | |
97 | 5 | listeners.add(listener); |
98 | 5 | } |
99 | ||
100 | /** | |
101 | * {@inheritDoc} | |
102 | */ | |
103 | public Collection<StageEventListener> getRegisteredListeners() { | |
104 | 4 | return this.listeners; |
105 | } | |
106 | ||
107 | /** | |
108 | * Asynchronously notifies each registered listener of an event and propagates | |
109 | * the event to any attached branches and the parent pipeline. | |
110 | * | |
111 | * @param ev The event to be sent to registered listeners | |
112 | */ | |
113 | public void raise(final EventObject ev) { | |
114 | 162 | new Thread() { |
115 | public void run() { | |
116 | //first, recursively find the root pipeline | |
117 | 162 | Pipeline root = Pipeline.this; |
118 | 163 | while (root.parent != null) root = root.parent; |
119 | ||
120 | //notify the listeners from the root pipeline | |
121 | 162 | root.notifyListeners(ev); |
122 | 162 | } |
123 | }.start(); | |
124 | 162 | } |
125 | ||
126 | /** | |
127 | * Notify all listeners and recursively notify child branches of the | |
128 | * specified event. This method does not propagate events to the | |
129 | * parent pipeline. | |
130 | */ | |
131 | private void notifyListeners(EventObject ev) { | |
132 | 164 | for (StageEventListener listener : listeners) listener.notify(ev); |
133 | 164 | for (Pipeline branch : branches.values()) branch.notifyListeners(ev); |
134 | 164 | } |
135 | ||
136 | /** | |
137 | * {@inheritDoc} | |
138 | */ | |
139 | public Feeder getDownstreamFeeder(Stage stage) { | |
140 | 8 | if (stage == null) throw new IllegalArgumentException("Unable to look up downstream feeder for null stage."); |
141 | 8 | if (stage == drivers.getLast().getStage()) { |
142 | 1 | return this.terminalFeeder; |
143 | } else { | |
144 | //Iterate backwards over the list until the stage is found, then return | |
145 | //the feeder for the subsequent stage. Comparisons are done using reference | |
146 | //equality. | |
147 | 17 | for (int i = drivers.size() - 2; i >= 0; i--) { |
148 | 17 | if (stage == drivers.get(i).getStage()) return drivers.get(i+1).getFeeder(); |
149 | } | |
150 | ||
151 | 0 | throw new IllegalStateException("Unable to find stage " + stage + " in pipeline."); |
152 | } | |
153 | } | |
154 | ||
155 | /** | |
156 | * {@inheritDoc} | |
157 | */ | |
158 | public Feeder getBranchFeeder(String branch) { | |
159 | 2 | if (!getBranches().containsKey(branch)) { |
160 | 0 | throw new IllegalStateException("Unable to find branch in pipeline: '" + branch + "'"); |
161 | } | |
162 | ||
163 | 2 | return branches.get(branch).getSourceFeeder(); |
164 | } | |
165 | ||
166 | /** | |
167 | * {@inheritDoc} | |
168 | */ | |
169 | public Object getEnv(String key) { | |
170 | 2 | return this.env.get(key); |
171 | } | |
172 | ||
173 | /** | |
174 | * Sets the value corresponding to the specified environment variable key. | |
175 | */ | |
176 | public void setEnv(String key, Object value) { | |
177 | 2 | this.env.put(key, value); |
178 | 2 | } |
179 | ||
180 | /** | |
181 | * Adds a {@link Stage} object to the end of this Pipeline. If a | |
182 | * {@link PipelineValidator} has been set using {@link #setValidator}, it will | |
183 | * be used to validate that the appended Stage can consume the output of the | |
184 | * previous stage of the pipeline. It does NOT validate the ability or availability | |
185 | * of branches to consume data produced by the appended stage. | |
186 | * | |
187 | * @param stage the stage to be added to the pipeline | |
188 | * @param driverFactory the factory that will be used to create a {@link StageDriver} that will run the stage | |
189 | * @throws ValidationException if there is a non-null validator set for this pipeline and an error is | |
190 | * encountered validating the addition of the stage to the pipeline. | |
191 | */ | |
192 | public final void addStage(Stage stage, StageDriverFactory driverFactory) throws ValidationException { | |
193 | 9 | if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null."); |
194 | ||
195 | 9 | if (validator != null) { |
196 | 0 | List<ValidationFailure> errors = validator.validateAddStage(this, stage, driverFactory); |
197 | 0 | if (errors != null && !errors.isEmpty()) { |
198 | 0 | throw new ValidationException("An error occurred adding stage " + stage.toString(), errors); |
199 | } | |
200 | } | |
201 | ||
202 | 9 | stage.init(this); |
203 | 9 | this.stages.add(stage); |
204 | ||
205 | 9 | StageDriver driver = driverFactory.createStageDriver(stage, this); |
206 | 9 | this.driverMap.put(stage, driver); |
207 | 9 | this.drivers.add(driver); |
208 | 9 | } |
209 | ||
210 | /** | |
211 | * Returns an unmodifiable list of stages that have been added to this | |
212 | * pipeline. | |
213 | * @return A list of the stages that have been added to the pipeline | |
214 | */ | |
215 | public final List<Stage> getStages() { | |
216 | 1 | return Collections.unmodifiableList(this.stages); |
217 | } | |
218 | ||
219 | /** | |
220 | * Return the StageDriver for the specified Stage. | |
221 | * | |
222 | * @return the StageDriver for the specified Stage. | |
223 | */ | |
224 | public final StageDriver getStageDriver(Stage stage) { | |
225 | 2 | return this.driverMap.get(stage); |
226 | } | |
227 | ||
228 | /** | |
229 | * Returns an unmodifiable list of stage drivers that have been added | |
230 | * to the pipeline. | |
231 | * @return the list of drivers for stages in the pipeline | |
232 | */ | |
233 | public final List<StageDriver> getStageDrivers() { | |
234 | 0 | return Collections.unmodifiableList(this.drivers); |
235 | } | |
236 | ||
237 | /** | |
238 | * Adds a branch to the pipeline. | |
239 | * @param key the string identifier that will be used to refer to the added branch | |
240 | * @param pipeline the branch pipeline | |
241 | * @throws org.apache.commons.pipeline.validation.ValidationException if the pipeline has a non-null {@link PipelineValidator} and the branch | |
242 | * cannot consume the data produced for the branch by stages in the pipeline. | |
243 | */ | |
244 | public void addBranch(String key, Pipeline branch) throws ValidationException { | |
245 | 3 | if (key == null) |
246 | 0 | throw new IllegalArgumentException("Branch key may not be null."); |
247 | 3 | if (MAIN_BRANCH.equalsIgnoreCase(key)) |
248 | 0 | throw new IllegalArgumentException("Branch key name \"" + MAIN_BRANCH + "\" is reserved."); |
249 | 3 | if (branch == null) |
250 | 0 | throw new IllegalArgumentException("Illegal attempt to set reference to null branch."); |
251 | 3 | if (branch == this || branch.hasBranch(this)) |
252 | 0 | throw new IllegalArgumentException("Illegal attempt to set reference to self as a branch (infinite recursion potential)"); |
253 | ||
254 | 3 | if (validator != null) { |
255 | 0 | List<ValidationFailure> errors = validator.validateAddBranch(this, key, branch); |
256 | 0 | if (errors != null && !errors.isEmpty()) { |
257 | 0 | throw new ValidationException("An error occurred adding branch pipeline " + branch, errors); |
258 | } | |
259 | } | |
260 | ||
261 | 3 | branch.parent = this; |
262 | 3 | this.branches.put(key, branch); |
263 | 3 | } |
264 | ||
265 | /** | |
266 | * Returns an unmodifiable map of branch pipelines, keyed by branch identifier. | |
267 | * @return the map of registered branch pipelines, keyed by branch identifier | |
268 | */ | |
269 | public Map<String,Pipeline> getBranches() { | |
270 | 2 | return Collections.unmodifiableMap(branches); |
271 | } | |
272 | ||
273 | /** | |
274 | * Simple method that recursively checks whether the specified | |
275 | * pipeline is a branch of this pipeline. | |
276 | * @param pipeline the candidate branch | |
277 | * @return true if the specified pipeline is a branch of this pipeline, or recursively | |
278 | * a branch of a branch. Tests are performed using reference equality. | |
279 | */ | |
280 | private boolean hasBranch(Pipeline pipeline) { | |
281 | 3 | if (branches.containsValue(pipeline)) return true; |
282 | 3 | for (Pipeline branch : branches.values()) { |
283 | 0 | if (branch.hasBranch(pipeline)) return true; |
284 | } | |
285 | ||
286 | 3 | return false; |
287 | } | |
288 | ||
289 | /** | |
290 | * Returns a feeder for the first stage if the pipeline is not empty | |
291 | * @return the feeder to feed the first stage of the pipeline | |
292 | */ | |
293 | public Feeder getSourceFeeder() { | |
294 | 4 | if (drivers.isEmpty()) return this.terminalFeeder; |
295 | 3 | return drivers.peek().getFeeder(); |
296 | } | |
297 | ||
298 | /** | |
299 | * Gets the feeder that receives output from the final stage. | |
300 | * @return the terminal feeder being used to handle any output from the final stage. The default is {@link Feeder#VOID} | |
301 | */ | |
302 | public Feeder getTerminalFeeder() { | |
303 | 1 | return this.terminalFeeder; |
304 | } | |
305 | ||
306 | /** | |
307 | * Sets the terminal feeder used to handle any output from the final stage. | |
308 | * @param terminalFeeder the {@link Feeder} that will receive any output from the final stage | |
309 | */ | |
310 | public void setTerminalFeeder(Feeder terminalFeeder) { | |
311 | 1 | this.terminalFeeder = terminalFeeder; |
312 | 1 | } |
313 | ||
314 | /** | |
315 | * Adds a job to be onStart on startup to the pipeline. | |
316 | */ | |
317 | public void addLifecycleJob(PipelineLifecycleJob job) { | |
318 | 0 | this.lifecycleJobs.add(job); |
319 | 0 | } |
320 | ||
321 | /** | |
322 | * This method iterates over the stages in the pipeline, looking up a | |
323 | * {@link StageDriver} for each stage and using that driver to start the stage. | |
324 | * Startups may occur sequentially or in parallel, depending upon the stage driver | |
325 | * used. If a the stage has not been configured with a {@link StageDriver}, | |
326 | * we will use the default, non-threaded {@link SynchronousStageDriver}. | |
327 | * | |
328 | * @throws org.apache.commons.pipeline.StageException Thrown if there is an error during pipeline startup | |
329 | */ | |
330 | public void start() throws StageException { | |
331 | 1 | for (PipelineLifecycleJob job : lifecycleJobs) job.onStart(this); |
332 | 1 | for (StageDriver driver: this.drivers) driver.start(); |
333 | 1 | for (Pipeline branch : branches.values()) branch.start(); |
334 | 1 | } |
335 | ||
336 | /** | |
337 | * This method iterates over the stages in the pipeline, looking up a {@link StageDriver} | |
338 | * for each stage and using that driver to request that the stage finish | |
339 | * execution. The {@link StageDriver#finish(Stage)} | |
340 | * method will block until the stage's queue is exhausted, so this method | |
341 | * may be used to safely finalize all stages without the risk of | |
342 | * losing data in the queues. | |
343 | * | |
344 | * @throws org.apache.commons.pipeline.StageException Thrown if there is an unhandled error during stage shutdown | |
345 | */ | |
346 | public void finish() throws StageException { | |
347 | 1 | for (StageDriver driver: this.drivers) driver.finish(); |
348 | 1 | for (Pipeline pipeline : branches.values()) pipeline.finish(); |
349 | 1 | for (PipelineLifecycleJob job : lifecycleJobs) job.onFinish(this); |
350 | 1 | } |
351 | ||
352 | /** | |
353 | * Runs the pipeline from start to finish. | |
354 | */ | |
355 | public void run() { | |
356 | try { | |
357 | 1 | start(); |
358 | 1 | finish(); |
359 | 0 | } catch (StageException e) { |
360 | 0 | throw new RuntimeException(e); |
361 | 1 | } |
362 | 1 | } |
363 | ||
364 | /** | |
365 | * Returns the validator being used to validate the pipeline structure, | |
366 | * or null if no validation is being performed.. | |
367 | * @return Validator used to validate pipeline structure. | |
368 | */ | |
369 | public PipelineValidator getValidator() { | |
370 | 0 | return this.validator; |
371 | } | |
372 | ||
373 | /** | |
374 | * Sets the validator used to validate the pipeline as it is contstructed. | |
375 | * Setting the validator to null disables validation | |
376 | * @param validator Validator used to validate pipeline structure. | |
377 | */ | |
378 | public void setValidator(PipelineValidator validator) { | |
379 | 0 | this.validator = validator; |
380 | 0 | } |
381 | ||
382 | /** | |
383 | * Returns the parent of this pipeline, if it is a branch | |
384 | * @return parent Pipeline, or null if this is the main pipeline | |
385 | */ | |
386 | public Pipeline getParent() { | |
387 | 0 | return parent; |
388 | } | |
389 | } |