1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.junitcore.pc;
20
21 import java.util.Collection;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.CopyOnWriteArraySet;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
30 import org.junit.runner.Description;
31 import org.junit.runners.model.RunnerScheduler;
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class Scheduler implements RunnerScheduler {
46 private final Balancer balancer;
47
48 private final SchedulingStrategy strategy;
49
50 private final Set<Controller> slaves = new CopyOnWriteArraySet<>();
51
52 private final Description description;
53
54 private final ConsoleLogger logger;
55
56 private volatile boolean shutdown = false;
57
58 private volatile boolean started = false;
59
60 private volatile boolean finished = false;
61
62 private volatile Controller masterController;
63
64
65
66
67
68
69
70
71
72
73
/**
 * Creates a scheduler without a master, delegating to the constructor that takes a
 * concurrency value and passing {@code -1} for it.
 *
 * @param logger      logger used by the {@code logQuietly} methods
 * @param description description reported when this scheduler is stopped
 * @param strategy    strategy that runs the scheduled tasks
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        SchedulingStrategy strategy) {
    this(logger, description, strategy, -1);
}
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
 * Creates a scheduler without a master whose balancer is obtained from
 * {@code BalancerFactory.createBalancer(concurrency)}.
 *
 * @param logger      logger used by the {@code logQuietly} methods
 * @param description description reported when this scheduler is stopped
 * @param strategy    strategy that runs the scheduled tasks
 * @param concurrency value forwarded unchanged to {@code BalancerFactory.createBalancer}
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        SchedulingStrategy strategy,
        int concurrency) {
    this(logger, description, strategy, BalancerFactory.createBalancer(concurrency));
}
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
 * Creates a scheduler without a master that uses the given balancer.
 * <p>
 * A handler from {@link #newShutdownHandler()} is installed on the strategy before any field
 * of this instance is assigned. Because {@code newShutdownHandler()} can be overridden, an
 * override must not rely on this scheduler's fields.
 *
 * @param logger      logger used by the {@code logQuietly} methods
 * @param description description reported when this scheduler is stopped
 * @param strategy    strategy that runs the scheduled tasks; must not be {@code null}
 * @param balancer    balancer that hands out permits for scheduled tasks
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        SchedulingStrategy strategy,
        Balancer balancer) {
    // Kept as the very first action, exactly as before.
    strategy.setDefaultShutdownHandler(newShutdownHandler());

    this.balancer = balancer;
    this.strategy = strategy;
    this.description = description;
    this.logger = logger;
    // masterController keeps its default of null until a master registers this scheduler.
}
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Creates a scheduler with the given balancer and registers it with {@code masterScheduler}.
 * The boolean result of the registration is not inspected.
 *
 * @param logger          logger used by the {@code logQuietly} methods
 * @param description     description reported when this scheduler is stopped
 * @param masterScheduler scheduler this instance registers with; must not be {@code null}
 * @param strategy        strategy that runs the scheduled tasks; must not be {@code null}
 * @param balancer        balancer that hands out permits for scheduled tasks
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        Scheduler masterScheduler,
        SchedulingStrategy strategy,
        Balancer balancer) {
    this(logger, description, strategy, balancer);

    // NOTE(review): the delegated constructor has already installed a handler on the
    // strategy; this second installation looks redundant. Confirm against
    // SchedulingStrategy.setDefaultShutdownHandler before removing it.
    strategy.setDefaultShutdownHandler(newShutdownHandler());

    masterScheduler.register(this);
}
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
 * Creates a scheduler whose balancer is derived from {@code concurrency} and registers it
 * with {@code masterScheduler}. The boolean result of the registration is not inspected.
 *
 * @param logger          logger used by the {@code logQuietly} methods
 * @param description     description reported when this scheduler is stopped
 * @param masterScheduler scheduler this instance registers with; must not be {@code null}
 * @param strategy        strategy that runs the scheduled tasks; must not be {@code null}
 * @param concurrency     value forwarded to the constructor that builds the balancer
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        Scheduler masterScheduler,
        SchedulingStrategy strategy,
        int concurrency) {
    this(logger, description, strategy, concurrency);

    // NOTE(review): the delegated constructor chain has already installed a handler on the
    // strategy; this second installation looks redundant. Confirm against
    // SchedulingStrategy.setDefaultShutdownHandler before removing it.
    strategy.setDefaultShutdownHandler(newShutdownHandler());

    masterScheduler.register(this);
}
163
164
165
166
167
168
169
170
171
172
173
174
/**
 * Creates a scheduler registered with {@code masterScheduler}, delegating to the constructor
 * that takes a concurrency value and passing {@code 0} for it.
 *
 * @param logger          logger used by the {@code logQuietly} methods
 * @param description     description reported when this scheduler is stopped
 * @param masterScheduler scheduler this instance registers with; must not be {@code null}
 * @param strategy        strategy that runs the scheduled tasks; must not be {@code null}
 */
public Scheduler(
        ConsoleLogger logger,
        Description description,
        Scheduler masterScheduler,
        SchedulingStrategy strategy) {
    this(logger, description, masterScheduler, strategy, 0);
}
179
180 private void setController(Controller masterController) {
181 if (masterController == null) {
182 throw new NullPointerException("null ExecutionController");
183 }
184 this.masterController = masterController;
185 }
186
187
188
189
190
191 private boolean register(Scheduler slave) {
192 boolean canRegister = slave != null && slave != this;
193 if (canRegister) {
194 Controller controller = new Controller(slave);
195 canRegister = !slaves.contains(controller);
196 if (canRegister) {
197 slaves.add(controller);
198 slave.setController(controller);
199 }
200 }
201 return canRegister;
202 }
203
204
205
206
207 private boolean canSchedule() {
208 return !shutdown && (masterController == null || masterController.canSchedule());
209 }
210
211 protected void logQuietly(Throwable t) {
212 logger.error(t);
213 }
214
215 protected void logQuietly(String msg) {
216 logger.warning(msg);
217 }
218
219
220
221
222
223
224
225
226
227
228
229 protected ShutdownResult describeStopped(boolean stopNow) {
230 Collection<Description> executedTests = new ConcurrentLinkedQueue<>();
231 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<>();
232 stop(executedTests, incompleteTests, false, stopNow);
233 return new ShutdownResult(executedTests, incompleteTests);
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 private void stop(
253 Collection<Description> executedTests,
254 Collection<Description> incompleteTests,
255 boolean tryCancelFutures,
256 boolean stopNow) {
257 shutdown = true;
258 try {
259 if (started && !ParallelComputerUtil.isUnusedDescription(description)) {
260 if (executedTests != null) {
261 executedTests.add(description);
262 }
263
264 if (incompleteTests != null && !finished) {
265 incompleteTests.add(description);
266 }
267 }
268
269 for (Controller slave : slaves) {
270 slave.stop(executedTests, incompleteTests, tryCancelFutures, stopNow);
271 }
272 } finally {
273 try {
274 balancer.releaseAllPermits();
275 } finally {
276 if (stopNow) {
277 strategy.stopNow();
278 } else if (tryCancelFutures) {
279 strategy.stop();
280 } else {
281 strategy.disable();
282 }
283 }
284 }
285 }
286
287 protected boolean shutdownThreadPoolsAwaitingKilled() {
288 if (masterController == null) {
289 stop(null, null, true, false);
290 boolean isNotInterrupted = true;
291 if (strategy != null) {
292 isNotInterrupted = strategy.destroy();
293 }
294 for (Controller slave : slaves) {
295 isNotInterrupted &= slave.destroy();
296 }
297 return isNotInterrupted;
298 } else {
299 throw new UnsupportedOperationException("cannot call this method if this is not a master scheduler");
300 }
301 }
302
303 protected void beforeExecute() {}
304
305 protected void afterExecute() {}
306
307 @Override
308 public void schedule(Runnable childStatement) {
309 if (childStatement == null) {
310 logQuietly("cannot schedule null");
311 } else if (canSchedule() && strategy.canSchedule()) {
312 try {
313 boolean isNotInterrupted = balancer.acquirePermit();
314 if (isNotInterrupted && !shutdown) {
315 Runnable task = wrapTask(childStatement);
316 strategy.schedule(task);
317 started = true;
318 }
319 } catch (RejectedExecutionException e) {
320 stop(null, null, true, false);
321 } catch (Throwable t) {
322 balancer.releasePermit();
323 logQuietly(t);
324 }
325 }
326 }
327
328 @Override
329 public void finished() {
330 try {
331 strategy.finished();
332 } catch (InterruptedException e) {
333 logQuietly(e);
334 } finally {
335 finished = true;
336 }
337 }
338
339 private Runnable wrapTask(final Runnable task) {
340 return new Runnable() {
341 @Override
342 public void run() {
343 try {
344 beforeExecute();
345 task.run();
346 } finally {
347 try {
348 afterExecute();
349 } finally {
350 balancer.releasePermit();
351 }
352 }
353 }
354 };
355 }
356
357 protected ShutdownHandler newShutdownHandler() {
358 return new ShutdownHandler();
359 }
360
361
362
363
364 private final class Controller {
365 private final Scheduler slave;
366
367 private Controller(Scheduler slave) {
368 this.slave = slave;
369 }
370
371
372
373
374 boolean canSchedule() {
375 return Scheduler.this.canSchedule();
376 }
377
378 void stop(
379 Collection<Description> executedTests,
380 Collection<Description> incompleteTests,
381 boolean tryCancelFutures,
382 boolean shutdownNow) {
383 slave.stop(executedTests, incompleteTests, tryCancelFutures, shutdownNow);
384 }
385
386
387
388
389 boolean destroy() {
390 return slave.strategy.destroy();
391 }
392
393 @Override
394 public int hashCode() {
395 return slave.hashCode();
396 }
397
398 @Override
399 public boolean equals(Object o) {
400 return o == this || (o instanceof Controller) && slave.equals(((Controller) o).slave);
401 }
402 }
403
404
405
406
407
408
409
410
411
412 public class ShutdownHandler implements RejectedExecutionHandler {
413 private volatile RejectedExecutionHandler poolHandler;
414
415 protected ShutdownHandler() {
416 poolHandler = null;
417 }
418
419 public void setRejectedExecutionHandler(RejectedExecutionHandler poolHandler) {
420 this.poolHandler = poolHandler;
421 }
422
423 @Override
424 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
425 if (executor.isShutdown()) {
426 Scheduler.this.stop(null, null, true, false);
427 }
428 final RejectedExecutionHandler poolHandler = this.poolHandler;
429 if (poolHandler != null) {
430 poolHandler.rejectedExecution(r, executor);
431 }
432 }
433 }
434 }