Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
SynchronousStageDriver |
|
| 0.0;0 | ||||
SynchronousStageDriver$1 |
|
| 0.0;0 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.commons.pipeline.driver; | |
19 | ||
20 | import java.util.LinkedList; | |
21 | import java.util.Queue; | |
22 | import org.apache.commons.logging.Log; | |
23 | import org.apache.commons.logging.LogFactory; | |
24 | import org.apache.commons.pipeline.Feeder; | |
25 | import org.apache.commons.pipeline.Stage; | |
26 | import org.apache.commons.pipeline.StageException; | |
27 | import org.apache.commons.pipeline.StageContext; | |
28 | import static org.apache.commons.pipeline.StageDriver.State.*; | |
29 | import org.apache.commons.pipeline.StageDriver.State; | |
30 | import static org.apache.commons.pipeline.driver.FaultTolerance.*; | |
31 | ||
32 | /** | |
33 | * This is a non-threaded version of the AbstractStageDriver. | |
34 | */ | |
35 | 0 | public class SynchronousStageDriver extends AbstractStageDriver { |
36 | 9 | private final Log log = LogFactory.getLog(SynchronousStageDriver.class); |
37 | ||
38 | //queue of objects to be processed that are fed to the driver | |
39 | //when it is not in a running state | |
40 | 9 | private Queue<Object> queue = new LinkedList<Object>(); |
41 | ||
42 | //Feeder used to feed objects to this stage | |
43 | 9 | private final Feeder feeder = new Feeder() { |
44 | public void feed(Object obj) { | |
45 | 8 | synchronized (SynchronousStageDriver.this) { |
46 | 8 | if (currentState == ERROR) throw new IllegalStateException("Unable to process data: driver in fatal error state."); |
47 | 8 | if (currentState != RUNNING) { //enqueue objects if stage has not been started |
48 | 0 | queue.add(obj); |
49 | 0 | return; |
50 | } | |
51 | 8 | } |
52 | ||
53 | try { | |
54 | 8 | stage.process(obj); |
55 | 0 | } catch (StageException e) { |
56 | 0 | recordProcessingException(obj, e); |
57 | 0 | if (faultTolerance == NONE) throw fatalError(e); |
58 | 8 | } |
59 | 8 | } |
60 | }; | |
61 | ||
62 | /** | |
63 | * Creates a new instance of SimpleStageDriver | |
64 | * @param stage The stage to be run | |
65 | * @param context The context in which the stage will be run | |
66 | */ | |
67 | public SynchronousStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) { | |
68 | 9 | super(stage, context, faultTolerance); |
69 | 9 | } |
70 | ||
71 | /** | |
72 | * Get the feeder for the encapsulated stage. Since the SynchronousStageDriver | |
73 | * is designed to run the stage in the main thread of execution, calls | |
74 | * to {@link Feeder#feed(Object)} on the returned feeder will trigger processing | |
75 | * of the object fed to the stage. | |
76 | * @return The Feeder instance for the stage. | |
77 | */ | |
78 | public Feeder getFeeder() { | |
79 | 11 | return this.feeder; |
80 | } | |
81 | ||
82 | /** | |
83 | * Performs preprocessing and updates the driver state. | |
84 | * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state to be started or an error occurs | |
85 | * during preprocessing. | |
86 | */ | |
87 | public synchronized void start() throws StageException { | |
88 | 5 | if (this.currentState == STOPPED) { |
89 | try { | |
90 | 5 | stage.preprocess(); |
91 | 5 | setState(RUNNING); |
92 | 0 | } catch (StageException e) { |
93 | 0 | throw fatalError(e); |
94 | 5 | } |
95 | ||
96 | // feed any queued values before returning control | |
97 | 5 | while (!queue.isEmpty()) this.getFeeder().feed(queue.remove()); |
98 | } else { | |
99 | 0 | throw new IllegalStateException("Illegal attempt to start driver from state: " + this.currentState); |
100 | } | |
101 | 5 | } |
102 | ||
103 | /** | |
104 | * Performs postprocessing and releases stage resources, and updates the driver | |
105 | * state accordingly. | |
106 | * @throws org.apache.commons.pipeline.StageException Thrown if an error occurs during postprocessing | |
107 | */ | |
108 | public synchronized void finish() throws StageException { | |
109 | 5 | if (this.currentState == RUNNING) { |
110 | try { | |
111 | 5 | testAndSetState(RUNNING, STOP_REQUESTED); |
112 | 5 | if (this.currentState == STOP_REQUESTED) stage.postprocess(); |
113 | 0 | } catch (StageException e) { |
114 | 0 | throw fatalError(e); |
115 | } finally { | |
116 | 5 | stage.release(); |
117 | 5 | testAndSetState(STOP_REQUESTED, STOPPED); |
118 | 5 | } |
119 | } else { | |
120 | 0 | throw new IllegalStateException("Driver is not running (current state: " + this.currentState + ")"); |
121 | } | |
122 | 5 | } |
123 | ||
124 | /** | |
125 | * This method obtains a lock to set the current state of processing | |
126 | * to error, records the error and returns a RuntimeException encapsulating | |
127 | * the specified throwable. | |
128 | */ | |
129 | private RuntimeException fatalError(Throwable t) { | |
130 | try { | |
131 | 0 | setState(ERROR); |
132 | 0 | this.recordFatalError(t); |
133 | 0 | stage.release(); |
134 | 0 | this.notifyAll(); |
135 | 0 | } catch (Exception e) { |
136 | 0 | this.recordFatalError(e); |
137 | 0 | } |
138 | ||
139 | 0 | return new RuntimeException("Fatal error halted processing of stage: " + stage); |
140 | } | |
141 | } |