1 package org.apache.maven.surefire.junitcore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.apache.maven.surefire.util.NestedRuntimeException;
32 import org.junit.runner.Computer;
33 import org.junit.runner.Runner;
34 import org.junit.runners.ParentRunner;
35 import org.junit.runners.model.InitializationError;
36 import org.junit.runners.model.RunnerBuilder;
37 import org.junit.runners.model.RunnerScheduler;
38
39
40
41
42 public class ConfigurableParallelComputer
43 extends Computer
44 {
45 private final boolean fClasses;
46
47 private final boolean fMethods;
48
49 private final boolean fixedPool;
50
51 private final ExecutorService fService;
52
53 private final List<AsynchronousRunner> nonBlockers =
54 Collections.synchronizedList( new ArrayList<AsynchronousRunner>() );
55
56
57 public ConfigurableParallelComputer()
58 {
59 this( true, true, Executors.newCachedThreadPool(), false );
60 }
61
62 public ConfigurableParallelComputer( boolean fClasses, boolean fMethods )
63 {
64 this( fClasses, fMethods, Executors.newCachedThreadPool(), false );
65 }
66
67 public ConfigurableParallelComputer( boolean fClasses, boolean fMethods, Integer numberOfThreads, boolean perCore )
68 {
69 this( fClasses, fMethods, Executors.newFixedThreadPool(
70 numberOfThreads * ( perCore ? Runtime.getRuntime().availableProcessors() : 1 ) ), true );
71 }
72
73 private ConfigurableParallelComputer( boolean fClasses, boolean fMethods, ExecutorService executorService,
74 boolean fixedPool )
75 {
76 this.fClasses = fClasses;
77 this.fMethods = fMethods;
78 fService = executorService;
79 this.fixedPool = fixedPool;
80 }
81
82 @SuppressWarnings( { "UnusedDeclaration" } )
83 public void close()
84 throws ExecutionException
85 {
86 for ( AsynchronousRunner nonBlocker : nonBlockers )
87 {
88 nonBlocker.waitForCompletion();
89 }
90
91 fService.shutdown();
92 try
93 {
94 fService.awaitTermination( 10, java.util.concurrent.TimeUnit.SECONDS );
95 }
96 catch ( InterruptedException e )
97 {
98 throw new NestedRuntimeException( e );
99 }
100 }
101
102 private Runner parallelize( Runner runner, RunnerScheduler runnerInterceptor )
103 {
104 if ( runner instanceof ParentRunner<?> )
105 {
106 ( (ParentRunner<?>) runner ).setScheduler( runnerInterceptor );
107 }
108 return runner;
109 }
110
111 private RunnerScheduler getMethodInterceptor()
112 {
113 if ( fClasses && fMethods )
114 {
115 final AsynchronousRunner blockingAsynchronousRunner = new AsynchronousRunner( fService );
116 nonBlockers.add( blockingAsynchronousRunner );
117 return blockingAsynchronousRunner;
118 }
119 return fMethods ? new AsynchronousRunner( fService ) : new SynchronousRunner();
120 }
121
122 private RunnerScheduler getClassInterceptor()
123 {
124 if ( fClasses )
125 {
126 return fMethods ? new SynchronousRunner() : new AsynchronousRunner( fService );
127 }
128 return new SynchronousRunner();
129 }
130
131 @Override
132 public Runner getSuite( RunnerBuilder builder, java.lang.Class<?>[] classes )
133 throws InitializationError
134 {
135 Runner suite = super.getSuite( builder, classes );
136 return fClasses ? parallelize( suite, getClassInterceptor() ) : suite;
137 }
138
139 @Override
140 protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
141 throws Throwable
142 {
143 Runner runner = super.getRunner( builder, testClass );
144 return fMethods ? parallelize( runner, getMethodInterceptor() ) : runner;
145 }
146
147 @Override
148 public String toString()
149 {
150 return "ConfigurableParallelComputer{" + "classes=" + fClasses + ", methods=" + fMethods + ", fixedPool="
151 + fixedPool + '}';
152 }
153
154 private class SynchronousRunner
155 implements RunnerScheduler
156 {
157 public void schedule( final Runnable childStatement )
158 {
159 childStatement.run();
160 }
161
162 public void finished()
163 {
164 }
165 }
166
167 public class AsynchronousRunner
168 implements RunnerScheduler
169 {
170 private final List<Future<Object>> futures = Collections.synchronizedList( new ArrayList<Future<Object>>() );
171
172 private final ExecutorService fService;
173
174 public AsynchronousRunner( ExecutorService fService )
175 {
176 this.fService = fService;
177 }
178
179 public void schedule( final Runnable childStatement )
180 {
181 final Callable<Object> objectCallable = new Callable<Object>()
182 {
183 public Object call()
184 throws Exception
185 {
186 childStatement.run();
187 return null;
188 }
189 };
190 futures.add( fService.submit( objectCallable ) );
191 }
192
193
194 public void finished()
195 {
196 try
197 {
198 waitForCompletion();
199 }
200 catch ( ExecutionException e )
201 {
202 throw new NestedRuntimeException( e );
203 }
204 }
205
206 public void waitForCompletion()
207 throws ExecutionException
208 {
209 for ( Future<Object> each : futures )
210 {
211 try
212 {
213 each.get();
214 }
215 catch ( InterruptedException e )
216 {
217 throw new NestedRuntimeException( e );
218 }
219 }
220 }
221 }
222 }