1 | package org.apache.continuum.taskqueueexecutor; |
2 | |
3 | /* |
4 | * Licensed to the Apache Software Foundation (ASF) under one |
5 | * or more contributor license agreements. See the NOTICE file |
6 | * distributed with this work for additional information |
7 | * regarding copyright ownership. The ASF licenses this file |
8 | * to you under the Apache License, Version 2.0 (the |
9 | * "License"); you may not use this file except in compliance |
10 | * with the License. You may obtain a copy of the License at |
11 | * |
12 | * http://www.apache.org/licenses/LICENSE-2.0 |
13 | * |
14 | * Unless required by applicable law or agreed to in writing, |
15 | * software distributed under the License is distributed on an |
16 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
17 | * KIND, either express or implied. See the License for the |
18 | * specific language governing permissions and limitations |
19 | * under the License. |
20 | */ |
21 | |
22 | import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; |
23 | import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; |
24 | import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable; |
25 | import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException; |
26 | import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException; |
27 | import org.codehaus.plexus.taskqueue.Task; |
28 | import org.codehaus.plexus.taskqueue.TaskQueue; |
29 | import org.codehaus.plexus.taskqueue.execution.TaskExecutionException; |
30 | import org.codehaus.plexus.taskqueue.execution.TaskExecutor; |
31 | import org.codehaus.plexus.taskqueue.execution.TaskQueueExecutor; |
32 | import org.codehaus.plexus.util.StringUtils; |
33 | import org.slf4j.Logger; |
34 | import org.slf4j.LoggerFactory; |
35 | |
36 | import edu.emory.mathcs.backport.java.util.concurrent.CancellationException; |
37 | import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; |
38 | import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; |
39 | import edu.emory.mathcs.backport.java.util.concurrent.Executors; |
40 | import edu.emory.mathcs.backport.java.util.concurrent.Future; |
41 | import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; |
42 | import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException; |
43 | |
44 | /** |
45 | * Modified plexus ThreadedTaskQueueExecutor |
46 | */ |
47 | public class ParallelBuildsThreadedTaskQueueExecutor |
48 | implements TaskQueueExecutor, Initializable, Startable |
49 | { |
50 | private static final Logger log = LoggerFactory.getLogger( ParallelBuildsThreadedTaskQueueExecutor.class ); |
51 | |
52 | private static final int SHUTDOWN = 1; |
53 | |
54 | private static final int CANCEL_TASK = 2; |
55 | |
56 | /** |
57 | * @requirement |
58 | */ |
59 | private TaskQueue queue; |
60 | |
61 | /** |
62 | * @requirement |
63 | */ |
64 | private TaskExecutor executor; |
65 | |
66 | /** |
67 | * @configuration |
68 | */ |
69 | private String name; |
70 | |
71 | // ---------------------------------------------------------------------- |
72 | // |
73 | // ---------------------------------------------------------------------- |
74 | |
75 | private ExecutorRunnable executorRunnable; |
76 | |
77 | private ExecutorService executorService; |
78 | |
79 | private Task currentTask; |
80 | |
81 | private class ExecutorRunnable |
82 | extends Thread |
83 | { |
84 | private volatile int command; |
85 | |
86 | private boolean done; |
87 | |
88 | public void run() |
89 | { |
90 | while ( command != SHUTDOWN ) |
91 | { |
92 | final Task task; |
93 | |
94 | currentTask = null; |
95 | |
96 | try |
97 | { |
98 | task = queue.poll( 100, TimeUnit.MILLISECONDS ); |
99 | } |
100 | catch ( InterruptedException e ) |
101 | { |
102 | log.info( "Executor thread interrupted, command: " + |
103 | ( command == SHUTDOWN ? "Shutdown" : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) ); |
104 | continue; |
105 | } |
106 | |
107 | if ( task == null ) |
108 | { |
109 | continue; |
110 | } |
111 | |
112 | currentTask = task; |
113 | |
114 | Future future = executorService.submit( new Runnable() |
115 | { |
116 | public void run() |
117 | { |
118 | try |
119 | { |
120 | executor.executeTask( task ); |
121 | } |
122 | catch ( TaskExecutionException e ) |
123 | { |
124 | log.error( "Error executing task", e ); |
125 | } |
126 | } |
127 | } ); |
128 | |
129 | try |
130 | { |
131 | waitForTask( task, future ); |
132 | } |
133 | catch ( ExecutionException e ) |
134 | { |
135 | log.error( "Error executing task", e ); |
136 | } |
137 | } |
138 | |
139 | currentTask = null; |
140 | |
141 | log.info( "Executor thread '" + name + "' exited." ); |
142 | |
143 | done = true; |
144 | |
145 | synchronized ( this ) |
146 | { |
147 | notifyAll(); |
148 | } |
149 | } |
150 | |
151 | private void waitForTask( Task task, Future future ) |
152 | throws ExecutionException |
153 | { |
154 | boolean stop = false; |
155 | |
156 | while ( !stop ) |
157 | { |
158 | try |
159 | { |
160 | if ( task.getMaxExecutionTime() == 0 ) |
161 | { |
162 | log.debug( "Waiting indefinitely for task to complete" ); |
163 | future.get(); |
164 | return; |
165 | } |
166 | else |
167 | { |
168 | log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" ); |
169 | future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS ); |
170 | log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" ); |
171 | return; |
172 | } |
173 | } |
174 | catch ( InterruptedException e ) |
175 | { |
176 | switch ( command ) |
177 | { |
178 | case SHUTDOWN: |
179 | { |
180 | log.info( "Shutdown command received. Cancelling task." ); |
181 | cancel( future ); |
182 | return; |
183 | } |
184 | |
185 | case CANCEL_TASK: |
186 | { |
187 | command = 0; |
188 | log.info( "Cancelling task" ); |
189 | cancel( future ); |
190 | return; |
191 | } |
192 | |
193 | default: |
194 | // when can this thread be interrupted, and should we ignore it if shutdown = false? |
195 | log.warn( "Interrupted while waiting for task to complete; ignoring", e ); |
196 | break; |
197 | } |
198 | } |
199 | catch ( TimeoutException e ) |
200 | { |
201 | log.warn( "Task " + task + " didn't complete within time, cancelling it." ); |
202 | cancel( future ); |
203 | return; |
204 | } |
205 | catch ( CancellationException e ) |
206 | { |
207 | log.warn( "The task was cancelled", e ); |
208 | return; |
209 | } |
210 | } |
211 | } |
212 | |
213 | private void cancel( Future future ) |
214 | { |
215 | if ( !future.cancel( true ) ) |
216 | { |
217 | if ( !future.isDone() && !future.isCancelled() ) |
218 | { |
219 | log.warn( "Unable to cancel task" ); |
220 | } |
221 | else |
222 | { |
223 | log.warn( |
224 | "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() + |
225 | ")" ); |
226 | } |
227 | } |
228 | else |
229 | { |
230 | log.debug( "Task successfully cancelled" ); |
231 | } |
232 | } |
233 | |
234 | public synchronized void shutdown() |
235 | { |
236 | log.debug( "Signalling executor thread to shutdown" ); |
237 | |
238 | command = SHUTDOWN; |
239 | |
240 | interrupt(); |
241 | } |
242 | |
243 | public synchronized boolean cancelTask( Task task ) |
244 | { |
245 | if ( !task.equals( currentTask ) ) |
246 | { |
247 | log.debug( "Not cancelling task - it is not running" ); |
248 | return false; |
249 | } |
250 | |
251 | if ( command != SHUTDOWN ) |
252 | { |
253 | log.debug( "Signalling executor thread to cancel task" ); |
254 | |
255 | command = CANCEL_TASK; |
256 | |
257 | interrupt(); |
258 | } |
259 | else |
260 | { |
261 | log.debug( "Executor thread already stopping; task will be cancelled automatically" ); |
262 | } |
263 | |
264 | return true; |
265 | } |
266 | |
267 | public boolean isDone() |
268 | { |
269 | return done; |
270 | } |
271 | } |
272 | |
273 | // ---------------------------------------------------------------------- |
274 | // Component lifecycle |
275 | // ---------------------------------------------------------------------- |
276 | |
277 | public void initialize() |
278 | throws InitializationException |
279 | { |
280 | if ( StringUtils.isEmpty( name ) ) |
281 | { |
282 | throw new IllegalArgumentException( "'name' must be set." ); |
283 | } |
284 | } |
285 | |
286 | public void start() |
287 | throws StartingException |
288 | { |
289 | log.info( "Starting task executor, thread name '" + name + "'." ); |
290 | |
291 | this.executorService = Executors.newCachedThreadPool(); |
292 | |
293 | executorRunnable = new ExecutorRunnable(); |
294 | |
295 | executorRunnable.setDaemon( true ); |
296 | |
297 | executorRunnable.start(); |
298 | } |
299 | |
300 | public void stop() |
301 | throws StoppingException |
302 | { |
303 | executorRunnable.shutdown(); |
304 | |
305 | int maxSleep = 10 * 1000; // 10 seconds |
306 | |
307 | int interval = 1000; |
308 | |
309 | long endTime = System.currentTimeMillis() + maxSleep; |
310 | |
311 | while ( !executorRunnable.isDone() && executorRunnable.isAlive() ) |
312 | { |
313 | if ( System.currentTimeMillis() > endTime ) |
314 | { |
315 | log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" ); |
316 | break; |
317 | } |
318 | |
319 | log.info( "Waiting until task executor '" + name + "' is idling..." ); |
320 | |
321 | try |
322 | { |
323 | synchronized ( executorRunnable ) |
324 | { |
325 | executorRunnable.wait( interval ); |
326 | } |
327 | } |
328 | catch ( InterruptedException ex ) |
329 | { |
330 | // ignore |
331 | } |
332 | |
333 | // notify again, just in case. |
334 | executorRunnable.shutdown(); |
335 | } |
336 | } |
337 | |
338 | public Task getCurrentTask() |
339 | { |
340 | return currentTask; |
341 | } |
342 | |
343 | public synchronized boolean cancelTask( Task task ) |
344 | { |
345 | return executorRunnable.cancelTask( task ); |
346 | } |
347 | |
348 | public String getName() |
349 | { |
350 | return name; |
351 | } |
352 | |
353 | public void setName( String name ) |
354 | { |
355 | this.name = name; |
356 | } |
357 | |
358 | public TaskQueue getQueue() |
359 | { |
360 | return queue; |
361 | } |
362 | } |