1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
package org.apache.commons.pipeline.driver; |
22 | |
|
23 | |
import java.lang.Thread.UncaughtExceptionHandler; |
24 | |
import java.util.concurrent.BlockingQueue; |
25 | |
import java.util.concurrent.TimeUnit; |
26 | |
import org.apache.commons.logging.Log; |
27 | |
import org.apache.commons.logging.LogFactory; |
28 | |
import org.apache.commons.pipeline.driver.AbstractStageDriver; |
29 | |
import org.apache.commons.pipeline.Feeder; |
30 | |
import org.apache.commons.pipeline.StageDriver; |
31 | |
import org.apache.commons.pipeline.Stage; |
32 | |
import org.apache.commons.pipeline.StageContext; |
33 | |
import org.apache.commons.pipeline.StageException; |
34 | |
import static org.apache.commons.pipeline.StageDriver.State.*; |
35 | |
import org.apache.commons.pipeline.StageDriver.State; |
36 | |
import static org.apache.commons.pipeline.driver.FaultTolerance.*; |
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | 2784 | public class DedicatedThreadStageDriver extends AbstractStageDriver { |
43 | 15 | private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class); |
44 | |
|
45 | |
|
46 | |
private long timeout; |
47 | |
|
48 | |
|
49 | |
private Thread workerThread; |
50 | |
|
51 | |
|
52 | |
private BlockingQueue queue; |
53 | |
|
54 | |
|
55 | 15 | private final Feeder feeder = new Feeder() { |
56 | |
public void feed(Object obj) { |
57 | 418 | if (log.isDebugEnabled()) log.debug(obj + " is being fed to stage " + stage |
58 | |
+ " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)"); |
59 | |
try { |
60 | 418 | DedicatedThreadStageDriver.this.queue.put(obj); |
61 | 0 | } catch (InterruptedException e) { |
62 | 0 | throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " |
63 | |
+ obj + " in queue for stage " + stage, e); |
64 | 418 | } |
65 | |
|
66 | 418 | synchronized(DedicatedThreadStageDriver.this) { |
67 | 418 | DedicatedThreadStageDriver.this.notifyAll(); |
68 | 418 | } |
69 | 418 | } |
70 | |
}; |
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
|
87 | |
|
88 | |
public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) { |
89 | 15 | super(stage, context, faultTolerance); |
90 | 15 | this.queue = queue; |
91 | 15 | this.timeout = timeout; |
92 | 15 | } |
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
public Feeder getFeeder() { |
99 | 20 | return this.feeder; |
100 | |
} |
101 | |
|
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
public synchronized void start() throws StageException { |
107 | 14 | if (this.currentState == STOPPED) { |
108 | 14 | log.debug("Starting worker thread for stage " + stage + "."); |
109 | 14 | this.workerThread = new WorkerThread(stage); |
110 | 14 | this.workerThread.start(); |
111 | 14 | log.debug("Worker thread for stage " + stage + " started."); |
112 | |
|
113 | |
|
114 | |
try { |
115 | 40 | while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); |
116 | 0 | } catch (InterruptedException e) { |
117 | 0 | throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e); |
118 | 14 | } |
119 | |
} else { |
120 | 0 | throw new IllegalStateException("Attempt to start driver in state " + this.currentState); |
121 | |
} |
122 | 14 | } |
123 | |
|
124 | |
|
125 | |
|
126 | |
|
127 | |
|
128 | |
public synchronized void finish() throws StageException { |
129 | 14 | if (currentState == STOPPED) { |
130 | 0 | throw new IllegalStateException("The driver is not currently running."); |
131 | |
} |
132 | |
|
133 | |
try { |
134 | 14 | while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(); |
135 | |
|
136 | |
|
137 | 14 | testAndSetState(RUNNING, STOP_REQUESTED); |
138 | |
|
139 | 28 | while ( !(this.currentState == FINISHED || this.currentState == ERROR) ) this.wait(); |
140 | |
|
141 | 14 | log.debug("Waiting for worker thread stop for stage " + stage + "."); |
142 | 14 | this.workerThread.join(); |
143 | 14 | log.debug("Worker thread for stage " + stage + " halted"); |
144 | |
|
145 | 0 | } catch (InterruptedException e) { |
146 | 0 | throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e); |
147 | 14 | } |
148 | |
|
149 | 14 | setState(STOPPED); |
150 | 14 | } |
151 | |
|
152 | |
|
153 | |
|
154 | |
|
155 | 15 | private UncaughtExceptionHandler workerThreadExceptionHandler = new UncaughtExceptionHandler() { |
156 | |
public void uncaughtException(Thread t, Throwable e) { |
157 | 0 | setState(ERROR); |
158 | 0 | recordFatalError(e); |
159 | 0 | log.error("Uncaught exception in stage " + stage, e); |
160 | 0 | } |
161 | |
}; |
162 | |
|
163 | |
|
164 | |
|
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
|
170 | |
|
171 | |
|
172 | |
|
173 | |
|
174 | |
|
175 | |
|
176 | |
private class WorkerThread extends Thread { |
177 | |
|
178 | |
private Stage stage; |
179 | |
|
180 | 14 | public WorkerThread(Stage stage) { |
181 | 14 | this.setUncaughtExceptionHandler(workerThreadExceptionHandler); |
182 | 14 | this.stage = stage; |
183 | 14 | } |
184 | |
|
185 | |
public final void run() { |
186 | 14 | setState(STARTED); |
187 | |
|
188 | |
try { |
189 | 14 | if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "..."); |
190 | 14 | stage.preprocess(); |
191 | 14 | if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete."); |
192 | |
|
193 | |
|
194 | 14 | testAndSetState(STARTED, RUNNING); |
195 | 451 | running: while (currentState != ERROR) { |
196 | |
try { |
197 | 451 | Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS); |
198 | 451 | if (obj == null) { |
199 | 32 | if (currentState == STOP_REQUESTED) break running; |
200 | |
|
201 | |
} else { |
202 | |
try { |
203 | 418 | stage.process(obj); |
204 | 1 | } catch (StageException e) { |
205 | 1 | recordProcessingException(obj, e); |
206 | 1 | if (faultTolerance == NONE) throw e; |
207 | 0 | } catch (RuntimeException e) { |
208 | 0 | recordProcessingException(obj, e); |
209 | 0 | if (faultTolerance == CHECKED || faultTolerance == NONE) throw e; |
210 | 418 | } |
211 | |
} |
212 | 0 | } catch (InterruptedException e) { |
213 | 0 | throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + stage, e); |
214 | 437 | } |
215 | |
} |
216 | 14 | if (log.isDebugEnabled()) log.debug("Stage " + stage + " exited running state."); |
217 | |
|
218 | 14 | if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "..."); |
219 | 14 | stage.postprocess(); |
220 | 14 | if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete."); |
221 | |
|
222 | 0 | } catch (StageException e) { |
223 | 0 | log.error("An error occurred in the stage " + stage, e); |
224 | 0 | recordFatalError(e); |
225 | 0 | setState(ERROR); |
226 | |
} finally { |
227 | 14 | if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "..."); |
228 | 14 | stage.release(); |
229 | 14 | if (log.isDebugEnabled()) log.debug("Stage " + stage + " released."); |
230 | |
} |
231 | |
|
232 | |
|
233 | 14 | testAndSetState(STOP_REQUESTED, FINISHED); |
234 | 14 | } |
235 | |
} |
236 | |
|
237 | |
|
238 | |
|
239 | |
|
240 | |
|
241 | |
public int getQueueSize() { |
242 | 0 | return this.queue.size() + this.queue.remainingCapacity(); |
243 | |
} |
244 | |
|
245 | |
|
246 | |
|
247 | |
|
248 | |
|
249 | |
|
250 | |
public long getTimeout() { |
251 | 0 | return this.timeout; |
252 | |
} |
253 | |
} |