Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Scheduler |
|
| 1.8571428571428572;1,857 | ||||
Scheduler$1 |
|
| 1.8571428571428572;1,857 | ||||
Scheduler$Controller |
|
| 1.8571428571428572;1,857 | ||||
Scheduler$ShutdownHandler |
|
| 1.8571428571428572;1,857 |
1 | package org.apache.maven.surefire.junitcore.pc; | |
2 | ||
3 | /* | |
4 | * Licensed to the Apache Software Foundation (ASF) under one | |
5 | * or more contributor license agreements. See the NOTICE file | |
6 | * distributed with this work for additional information | |
7 | * regarding copyright ownership. The ASF licenses this file | |
8 | * to you under the Apache License, Version 2.0 (the | |
9 | * "License"); you may not use this file except in compliance | |
10 | * with the License. You may obtain a copy of the License at | |
11 | * | |
12 | * http://www.apache.org/licenses/LICENSE-2.0 | |
13 | * | |
14 | * Unless required by applicable law or agreed to in writing, | |
15 | * software distributed under the License is distributed on an | |
16 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
17 | * KIND, either express or implied. See the License for the | |
18 | * specific language governing permissions and limitations | |
19 | * under the License. | |
20 | */ | |
21 | ||
22 | import org.junit.runner.Description; | |
23 | import org.junit.runners.model.RunnerScheduler; | |
24 | ||
25 | import java.util.ArrayList; | |
26 | import java.util.Collection; | |
27 | import java.util.Set; | |
28 | import java.util.concurrent.CopyOnWriteArraySet; | |
29 | import java.util.concurrent.RejectedExecutionException; | |
30 | import java.util.concurrent.RejectedExecutionHandler; | |
31 | import java.util.concurrent.ThreadPoolExecutor; | |
32 | ||
33 | /** | |
34 | * | |
35 | * Schedules tests, controls thread resources, awaits completion of tests and of other schedulers, and | |
36 | * lets a master scheduler shut down its slaves. | |
37 | * <p> | |
38 | * The scheduler objects should be first created (and wired) and set in runners | |
39 | * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}. | |
40 | * <p> | |
41 | * A new instance of scheduling strategy should be passed to the constructor of this scheduler. | |
42 | * | |
43 | * @author Tibor Digana (tibor17) | |
44 | * @since 2.16 | |
45 | */ | |
46 | 526 | public class Scheduler implements RunnerScheduler { |
47 | private final Balancer balancer; | |
48 | private final SchedulingStrategy strategy; | |
49 | 166 | private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>(); |
50 | private final Description description; | |
51 | 166 | private volatile boolean shutdown = false; |
52 | 166 | private volatile boolean started = false; |
53 | private volatile Controller masterController; | |
54 | ||
55 | /** | |
56 | * Use e.g. parallel classes have own non-shared thread pool, and methods another pool. | |
57 | * <p> | |
58 | * You can use it with one infinite thread pool shared in strategies across all | |
59 | * suites, class runners, etc. | |
60 | */ | |
61 | public Scheduler(Description description, SchedulingStrategy strategy) { | |
62 | 29 | this(description, strategy, -1); |
63 | 29 | } |
64 | ||
65 | /** | |
66 | * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool. | |
67 | * <p> | |
68 | * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference | |
69 | * {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy)} | |
70 | * or {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}. | |
71 | * | |
72 | * @param description description of current runner | |
73 | * @param strategy scheduling strategy with a shared thread pool | |
74 | * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)} | |
75 | * @throws NullPointerException if null <tt>strategy</tt> | |
76 | */ | |
77 | public Scheduler(Description description, SchedulingStrategy strategy, int concurrency) { | |
78 | 35 | this(description, strategy, BalancerFactory.createBalancer(concurrency)); |
79 | 35 | } |
80 | ||
81 | /** | |
82 | * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt> | |
83 | * against other groups of schedulers. The schedulers share one pool. | |
84 | * <p> | |
85 | * Unlike in {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} which was limiting | |
86 | * the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em> <tt>balancer</tt> | |
87 | * is limiting the concurrency of all children in runners having schedulers created by this constructor. | |
88 | * | |
89 | * @param description description of current runner | |
90 | * @param strategy scheduling strategy which may share threads with other strategy | |
91 | * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)} | |
92 | * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt> | |
93 | */ | |
94 | 166 | public Scheduler(Description description, SchedulingStrategy strategy, Balancer balancer) { |
95 | 166 | strategy.setDefaultShutdownHandler(newShutdownHandler()); |
96 | 166 | this.description = description; |
97 | 166 | this.strategy = strategy; |
98 | 166 | this.balancer = balancer; |
99 | 166 | masterController = null; |
100 | 166 | } |
101 | /** | |
102 | * Can be used by e.g. a runner having parallel classes in use case with parallel | |
103 | * suites, classes and methods sharing the same thread pool. | |
104 | * | |
105 | * @param description description of current runner | |
106 | * @param masterScheduler scheduler sharing own threads with this slave | |
107 | * @param strategy scheduling strategy for this scheduler | |
108 | * @param balancer determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)} | |
109 | * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt> | |
110 | */ | |
111 | public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy, Balancer balancer) { | |
112 | 131 | this(description, strategy, balancer); |
113 | 131 | strategy.setDefaultShutdownHandler(newShutdownHandler()); |
114 | 131 | masterScheduler.register(this); |
115 | 131 | } |
116 | ||
117 | /** | |
118 | * @param masterScheduler a reference to {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} | |
119 | * or {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy)} | |
120 | * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy) | |
121 | * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy, int) | |
122 | */ | |
123 | public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy, int concurrency) { | |
124 | 6 | this(description, strategy, concurrency); |
125 | 6 | strategy.setDefaultShutdownHandler(newShutdownHandler()); |
126 | 6 | masterScheduler.register(this); |
127 | 6 | } |
128 | ||
129 | /** | |
130 | * Should be used with individual pools on suites, classes and methods, see | |
131 | * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}. | |
132 | * <p> | |
133 | * Cached thread pool is infinite and can be always shared. | |
134 | */ | |
135 | public Scheduler(Description description, Scheduler masterScheduler, SchedulingStrategy strategy) { | |
136 | 6 | this(description, masterScheduler, strategy, 0); |
137 | 6 | } |
138 | ||
139 | private void setController(Controller masterController) { | |
140 | 137 | if (masterController == null) { |
141 | 0 | throw new NullPointerException("null ExecutionController"); |
142 | } | |
143 | 137 | this.masterController = masterController; |
144 | 137 | } |
145 | ||
146 | /** | |
147 | * @param slave a slave scheduler to register | |
148 | * @return <tt>true</tt> if successfully registered the <tt>slave</tt>. | |
149 | */ | |
150 | private boolean register(Scheduler slave) { | |
151 | 137 | boolean canRegister = slave != null && slave != this; |
152 | 137 | if (canRegister) { |
153 | 137 | Controller controller = new Controller(slave); |
154 | 137 | canRegister = !slaves.contains(controller); |
155 | 137 | if (canRegister) { |
156 | 137 | slaves.add(controller); |
157 | 137 | slave.setController(controller); |
158 | } | |
159 | } | |
160 | 137 | return canRegister; |
161 | } | |
162 | ||
163 | /** | |
164 | * @return <tt>true</tt> if new tasks can be scheduled. | |
165 | */ | |
166 | private boolean canSchedule() { | |
167 | 534 | return !shutdown && (masterController == null || masterController.canSchedule()); |
168 | } | |
169 | ||
170 | protected void logQuietly(Throwable t) { | |
171 | 0 | t.printStackTrace(System.err); |
172 | 0 | } |
173 | ||
174 | protected void logQuietly(String msg) { | |
175 | 0 | System.err.println(msg); |
176 | 0 | } |
177 | ||
178 | /** | |
179 | * Attempts to stop all actively executing tasks and immediately returns a collection | |
180 | * of descriptions of those tasks which have started prior to this call. | |
181 | * <p> | |
182 | * This scheduler and other registered schedulers will shutdown, see {@link #register(Scheduler)}. | |
183 | * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}. | |
184 | * | |
185 | * @param shutdownNow if <tt>true</tt> interrupts waiting methods | |
186 | * @return collection of descriptions started before shutting down | |
187 | */ | |
188 | public Collection<Description> shutdown(boolean shutdownNow) { | |
189 | 16 | shutdown = true; |
190 | 16 | ArrayList<Description> activeChildren = new ArrayList<Description>(); |
191 | ||
192 | 16 | if (started && description != null) { |
193 | 8 | activeChildren.add(description); |
194 | } | |
195 | ||
196 | 16 | for (Controller slave : slaves) { |
197 | try { | |
198 | 14 | activeChildren.addAll(slave.shutdown(shutdownNow)); |
199 | 0 | } catch (Throwable t) { |
200 | 0 | logQuietly(t); |
201 | 14 | } |
202 | 14 | } |
203 | ||
204 | try { | |
205 | 16 | balancer.releaseAllPermits(); |
206 | } finally { | |
207 | 16 | if (shutdownNow) { |
208 | 8 | strategy.stopNow(); |
209 | } else { | |
210 | 8 | strategy.stop(); |
211 | } | |
212 | 8 | } |
213 | ||
214 | 16 | return activeChildren; |
215 | } | |
216 | ||
217 | protected void beforeExecute() { | |
218 | 281 | } |
219 | ||
220 | protected void afterExecute() { | |
221 | 281 | } |
222 | ||
223 | public void schedule(Runnable childStatement) { | |
224 | 289 | if (childStatement == null) { |
225 | 0 | logQuietly("cannot schedule null"); |
226 | 289 | } else if (canSchedule() && strategy.canSchedule()) { |
227 | try { | |
228 | 285 | balancer.acquirePermit(); |
229 | 285 | Runnable task = wrapTask(childStatement); |
230 | 285 | strategy.schedule(task); |
231 | 285 | started = true; |
232 | 0 | } catch (RejectedExecutionException e) { |
233 | 0 | shutdown(false); |
234 | 0 | } catch (Throwable t) { |
235 | 0 | balancer.releasePermit(); |
236 | 0 | logQuietly(t); |
237 | 285 | } |
238 | } | |
239 | 289 | } |
240 | ||
241 | public void finished() { | |
242 | try { | |
243 | 302 | strategy.finished(); |
244 | 0 | } catch (InterruptedException e) { |
245 | 0 | logQuietly(e); |
246 | } finally { | |
247 | 302 | for (Controller slave : slaves) { |
248 | 137 | slave.awaitFinishedQuietly(); |
249 | 137 | } |
250 | 302 | } |
251 | 302 | } |
252 | ||
253 | private Runnable wrapTask(final Runnable task) { | |
254 | 285 | return new Runnable() { |
255 | public void run() { | |
256 | try { | |
257 | 281 | beforeExecute(); |
258 | 281 | task.run(); |
259 | } finally { | |
260 | 0 | try { |
261 | 281 | afterExecute(); |
262 | } finally { | |
263 | 281 | balancer.releasePermit(); |
264 | 281 | } |
265 | 281 | } |
266 | 281 | } |
267 | }; | |
268 | } | |
269 | ||
270 | protected ShutdownHandler newShutdownHandler() { | |
271 | 303 | return new ShutdownHandler(); |
272 | } | |
273 | ||
274 | /** | |
275 | * If this is a master scheduler, the slaves can stop scheduling by the master through the controller. | |
276 | */ | |
277 | 137 | private final class Controller { |
278 | private final Scheduler slave; | |
279 | ||
280 | 137 | private Controller(Scheduler slave) { |
281 | 137 | this.slave = slave; |
282 | 137 | } |
283 | ||
284 | /** | |
285 | * @return <tt>true</tt> if new children can be scheduled. | |
286 | */ | |
287 | boolean canSchedule() { | |
288 | 245 | return Scheduler.this.canSchedule(); |
289 | } | |
290 | ||
291 | void awaitFinishedQuietly() { | |
292 | try { | |
293 | 137 | slave.finished(); |
294 | 0 | } catch(Throwable t) { |
295 | 0 | slave.logQuietly(t); |
296 | 137 | } |
297 | 137 | } |
298 | ||
299 | Collection<Description> shutdown(boolean shutdownNow) { | |
300 | 14 | return slave.shutdown(shutdownNow); |
301 | } | |
302 | ||
303 | @Override | |
304 | public int hashCode() { | |
305 | 0 | return slave.hashCode(); |
306 | } | |
307 | ||
308 | @Override | |
309 | public boolean equals(Object o) { | |
310 | 674 | return o == this || (o instanceof Controller) && slave.equals(((Controller) o).slave); |
311 | } | |
312 | } | |
313 | ||
314 | public class ShutdownHandler implements RejectedExecutionHandler { | |
315 | private volatile RejectedExecutionHandler poolHandler; | |
316 | ||
317 | 303 | protected ShutdownHandler() { |
318 | 303 | poolHandler = null; |
319 | 303 | } |
320 | ||
321 | public void setRejectedExecutionHandler(RejectedExecutionHandler poolHandler) { | |
322 | 186 | this.poolHandler = poolHandler; |
323 | 186 | } |
324 | ||
325 | public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | |
326 | 0 | if (executor.isShutdown()) { |
327 | 0 | shutdown(false); |
328 | } | |
329 | 0 | final RejectedExecutionHandler poolHandler = this.poolHandler; |
330 | 0 | if (poolHandler != null) { |
331 | 0 | poolHandler.rejectedExecution(r, executor); |
332 | } | |
333 | 0 | } |
334 | } | |
335 | } |