1 package org.apache.maven.surefire.booter;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.surefire.api.booter.BiProperty;
24 import org.apache.maven.surefire.api.booter.Command;
25 import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
26 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
27 import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
28 import org.apache.maven.surefire.api.booter.MasterProcessCommand;
29 import org.apache.maven.surefire.api.booter.Shutdown;
30 import org.apache.maven.surefire.api.provider.CommandChainReader;
31 import org.apache.maven.surefire.api.provider.CommandListener;
32 import org.apache.maven.surefire.api.testset.TestSetFailedException;
33
34 import java.io.EOFException;
35 import java.io.IOException;
36 import java.util.Iterator;
37 import java.util.NoSuchElementException;
38 import java.util.Queue;
39 import java.util.concurrent.ConcurrentLinkedQueue;
40 import java.util.concurrent.CopyOnWriteArrayList;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.Semaphore;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import static java.lang.StrictMath.max;
46 import static java.lang.Thread.State.NEW;
47 import static java.lang.Thread.State.RUNNABLE;
48 import static java.lang.Thread.State.TERMINATED;
49 import static java.util.Objects.requireNonNull;
50 import static org.apache.maven.surefire.api.booter.Command.toShutdown;
51 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
52 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
53 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
54 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
55 import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
56 import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
57 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
58
59
60
61
62
63
64
65 public final class CommandReader implements CommandChainReader
66 {
67 private static final String LAST_TEST_SYMBOL = "";
68
69 private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
70
71 private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
72
73 private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
74
75 private final CountDownLatch startMonitor = new CountDownLatch( 1 );
76
77 private final Semaphore nextCommandNotifier = new Semaphore( 0 );
78
79 private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
80
81 private final MasterProcessChannelDecoder decoder;
82
83 private final Shutdown shutdown;
84
85 private final ConsoleLogger logger;
86
87 private int iteratedCount;
88
89 public CommandReader( MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger )
90 {
91 this.decoder = requireNonNull( decoder, "null decoder" );
92 this.shutdown = requireNonNull( shutdown, "null Shutdown config" );
93 this.logger = requireNonNull( logger, "null logger" );
94 state.set( RUNNABLE );
95 commandThread.start();
96 }
97
98 @Override
99 public boolean awaitStarted()
100 throws TestSetFailedException
101 {
102 if ( state.get() == RUNNABLE )
103 {
104 try
105 {
106 startMonitor.await();
107 return true;
108 }
109 catch ( InterruptedException e )
110 {
111 DumpErrorSingleton.getSingleton().dumpException( e );
112 throw new TestSetFailedException( e.getLocalizedMessage() );
113 }
114 }
115 else
116 {
117 return false;
118 }
119 }
120
121 @Override
122 public void addSkipNextTestsListener( CommandListener listener )
123 {
124 addListener( SKIP_SINCE_NEXT_TEST, listener );
125 }
126
127 @Override
128 public void addShutdownListener( CommandListener listener )
129 {
130 addListener( SHUTDOWN, listener );
131 }
132
133 public void addNoopListener( CommandListener listener )
134 {
135 addListener( NOOP, listener );
136 }
137
138 public void addByeAckListener( CommandListener listener )
139 {
140 addListener( BYE_ACK, listener );
141 }
142
143 private void addListener( MasterProcessCommand cmd, CommandListener listener )
144 {
145 listeners.add( new BiProperty<>( cmd, listener ) );
146 }
147
148
149
150
151
152 Iterator<String> iterated()
153 {
154 return testClasses.subList( 0, iteratedCount ).iterator();
155 }
156
157
158
159
160
161
162
163
164 Iterable<String> getIterableClasses( MasterProcessChannelEncoder eventChannel )
165 {
166 return new ClassesIterable( eventChannel );
167 }
168
169 public void stop()
170 {
171 if ( !isStopped() )
172 {
173 state.set( TERMINATED );
174 makeQueueFull();
175 listeners.clear();
176 commandThread.interrupt();
177 }
178 }
179
180 private boolean isStopped()
181 {
182 return state.get() == TERMINATED;
183 }
184
185
186
187
188 private boolean isQueueFull()
189 {
190
191
192
193
194
195
196
197
198 int searchFrom = max( 0, testClasses.size() - 1 );
199 return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
200 }
201
202 private void makeQueueFull()
203 {
204 testClasses.addIfAbsent( LAST_TEST_SYMBOL );
205 }
206
207 private boolean insertToQueue( String test )
208 {
209 return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
210 }
211
212 private final class ClassesIterable
213 implements Iterable<String>
214 {
215 private final MasterProcessChannelEncoder eventChannel;
216
217 ClassesIterable( MasterProcessChannelEncoder eventChannel )
218 {
219 this.eventChannel = eventChannel;
220 }
221
222 @Override
223 public Iterator<String> iterator()
224 {
225 return new ClassesIterator( eventChannel );
226 }
227 }
228
229 private final class ClassesIterator
230 implements Iterator<String>
231 {
232 private final MasterProcessChannelEncoder eventChannel;
233
234 private String clazz;
235
236 private int nextQueueIndex;
237
238 private ClassesIterator( MasterProcessChannelEncoder eventChannel )
239 {
240 this.eventChannel = eventChannel;
241 }
242
243 @Override
244 public boolean hasNext()
245 {
246 popUnread();
247 return isNotBlank( clazz );
248 }
249
250 @Override
251 public String next()
252 {
253 popUnread();
254 try
255 {
256 if ( isBlank( clazz ) )
257 {
258 throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
259 }
260 else
261 {
262 return clazz;
263 }
264 }
265 finally
266 {
267 clazz = null;
268 }
269 }
270
271 @Override
272 public void remove()
273 {
274 throw new UnsupportedOperationException();
275 }
276
277 private void popUnread()
278 {
279 if ( shouldFinish() )
280 {
281 clazz = null;
282 return;
283 }
284
285 if ( isBlank( clazz ) )
286 {
287 requestNextTest();
288 CommandReader.this.awaitNextTest();
289 if ( shouldFinish() )
290 {
291 clazz = null;
292 return;
293 }
294 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
295 CommandReader.this.iteratedCount = nextQueueIndex;
296 }
297
298 if ( CommandReader.this.isStopped() )
299 {
300 clazz = null;
301 }
302 }
303
304 private void requestNextTest()
305 {
306 eventChannel.acquireNextTest();
307 }
308
309 private boolean shouldFinish()
310 {
311 boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
312 return CommandReader.this.isStopped() || wasLastTestRead;
313 }
314
315 private boolean isEndSymbolAt( int index )
316 {
317 return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
318 }
319 }
320
321 private void awaitNextTest()
322 {
323 nextCommandNotifier.acquireUninterruptibly();
324 }
325
326 private void wakeupIterator()
327 {
328 nextCommandNotifier.release();
329 }
330
331 private final class CommandRunnable
332 implements Runnable
333 {
334 @Override
335 public void run()
336 {
337 CommandReader.this.startMonitor.countDown();
338 boolean isTestSetFinished = false;
339 try
340 {
341 while ( CommandReader.this.state.get() == RUNNABLE )
342 {
343 Command command = CommandReader.this.decoder.decode();
344 switch ( command.getCommandType() )
345 {
346 case RUN_CLASS:
347 String test = command.getData();
348 boolean inserted = CommandReader.this.insertToQueue( test );
349 if ( inserted )
350 {
351 CommandReader.this.wakeupIterator();
352 callListeners( command );
353 }
354 break;
355 case TEST_SET_FINISHED:
356 CommandReader.this.makeQueueFull();
357 isTestSetFinished = true;
358 CommandReader.this.wakeupIterator();
359 callListeners( command );
360 break;
361 case SHUTDOWN:
362 CommandReader.this.makeQueueFull();
363 CommandReader.this.wakeupIterator();
364 callListeners( command );
365 break;
366 case BYE_ACK:
367 callListeners( command );
368
369
370 CommandReader.this.state.set( TERMINATED );
371 break;
372 default:
373 callListeners( command );
374 break;
375 }
376 }
377 }
378 catch ( EOFException e )
379 {
380 CommandReader.this.state.set( TERMINATED );
381 if ( !isTestSetFinished )
382 {
383 String msg = "TestSet has not finished before stream error has appeared >> "
384 + "initializing exit by non-null configuration: "
385 + CommandReader.this.shutdown;
386 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
387
388 exitByConfiguration();
389
390 }
391 }
392 catch ( IOException e )
393 {
394 CommandReader.this.state.set( TERMINATED );
395
396 if ( !( e.getCause() instanceof InterruptedException ) )
397 {
398 String msg = "[SUREFIRE] std/in stream corrupted";
399 DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
400 CommandReader.this.logger.error( msg, e );
401 }
402 }
403 finally
404 {
405
406 if ( !isTestSetFinished )
407 {
408 CommandReader.this.makeQueueFull();
409 }
410 CommandReader.this.wakeupIterator();
411 }
412 }
413
414 private void callListeners( Command cmd )
415 {
416 MasterProcessCommand expectedCommandType = cmd.getCommandType();
417 for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
418 {
419 MasterProcessCommand commandType = listenerWrapper.getP1();
420 CommandListener listener = listenerWrapper.getP2();
421 if ( commandType == null || commandType == expectedCommandType )
422 {
423 listener.update( cmd );
424 }
425 }
426 }
427
428 private void exitByConfiguration()
429 {
430 Shutdown shutdown = CommandReader.this.shutdown;
431 if ( shutdown != null )
432 {
433 CommandReader.this.makeQueueFull();
434 CommandReader.this.wakeupIterator();
435 callListeners( toShutdown( shutdown ) );
436 }
437 }
438 }
439 }