View Javadoc
1   package org.apache.maven.surefire.booter;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.surefire.api.booter.BiProperty;
24  import org.apache.maven.surefire.api.booter.Command;
25  import org.apache.maven.surefire.api.booter.DumpErrorSingleton;
26  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
27  import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
28  import org.apache.maven.surefire.api.booter.MasterProcessCommand;
29  import org.apache.maven.surefire.api.booter.Shutdown;
30  import org.apache.maven.surefire.api.provider.CommandChainReader;
31  import org.apache.maven.surefire.api.provider.CommandListener;
32  import org.apache.maven.surefire.api.testset.TestSetFailedException;
33  
34  import java.io.EOFException;
35  import java.io.IOException;
36  import java.util.Iterator;
37  import java.util.NoSuchElementException;
38  import java.util.Queue;
39  import java.util.concurrent.ConcurrentLinkedQueue;
40  import java.util.concurrent.CopyOnWriteArrayList;
41  import java.util.concurrent.CountDownLatch;
42  import java.util.concurrent.Semaphore;
43  import java.util.concurrent.atomic.AtomicReference;
44  
45  import static java.lang.StrictMath.max;
46  import static java.lang.Thread.State.NEW;
47  import static java.lang.Thread.State.RUNNABLE;
48  import static java.lang.Thread.State.TERMINATED;
49  import static java.util.Objects.requireNonNull;
50  import static org.apache.maven.surefire.api.booter.Command.toShutdown;
51  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
52  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
53  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SHUTDOWN;
54  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.SKIP_SINCE_NEXT_TEST;
55  import static org.apache.maven.surefire.shared.utils.StringUtils.isBlank;
56  import static org.apache.maven.surefire.shared.utils.StringUtils.isNotBlank;
57  import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
58  
59  /**
60   * Reader of commands coming from plugin(master) process.
61   *
62   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
63   * @since 2.19
64   */
65  public final class CommandReader implements CommandChainReader
66  {
67      private static final String LAST_TEST_SYMBOL = "";
68  
69      private final Queue<BiProperty<MasterProcessCommand, CommandListener>> listeners = new ConcurrentLinkedQueue<>();
70  
71      private final Thread commandThread = newDaemonThread( new CommandRunnable(), "surefire-forkedjvm-command-thread" );
72  
73      private final AtomicReference<Thread.State> state = new AtomicReference<>( NEW );
74  
75      private final CountDownLatch startMonitor = new CountDownLatch( 1 );
76  
77      private final Semaphore nextCommandNotifier = new Semaphore( 0 );
78  
79      private final CopyOnWriteArrayList<String> testClasses = new CopyOnWriteArrayList<>();
80  
81      private final MasterProcessChannelDecoder decoder;
82  
83      private final Shutdown shutdown;
84  
85      private final ConsoleLogger logger;
86  
87      private int iteratedCount;
88  
89      public CommandReader( MasterProcessChannelDecoder decoder, Shutdown shutdown, ConsoleLogger logger )
90      {
91          this.decoder = requireNonNull( decoder, "null decoder" );
92          this.shutdown = requireNonNull( shutdown, "null Shutdown config" );
93          this.logger = requireNonNull( logger, "null logger" );
94          state.set( RUNNABLE );
95          commandThread.start();
96      }
97  
98      @Override
99      public boolean awaitStarted()
100         throws TestSetFailedException
101     {
102         if ( state.get() == RUNNABLE )
103         {
104             try
105             {
106                 startMonitor.await();
107                 return true;
108             }
109             catch ( InterruptedException e )
110             {
111                 DumpErrorSingleton.getSingleton().dumpException( e );
112                 throw new TestSetFailedException( e.getLocalizedMessage() );
113             }
114         }
115         else
116         {
117             return false;
118         }
119     }
120 
121     @Override
122     public void addSkipNextTestsListener( CommandListener listener )
123     {
124         addListener( SKIP_SINCE_NEXT_TEST, listener );
125     }
126 
127     @Override
128     public void addShutdownListener( CommandListener listener )
129     {
130         addListener( SHUTDOWN, listener );
131     }
132 
133     public void addNoopListener( CommandListener listener )
134     {
135         addListener( NOOP, listener );
136     }
137 
138     public void addByeAckListener( CommandListener listener )
139     {
140         addListener( BYE_ACK, listener );
141     }
142 
143     private void addListener( MasterProcessCommand cmd, CommandListener listener )
144     {
145         listeners.add( new BiProperty<>( cmd, listener ) );
146     }
147 
148     /**
149      * @return test classes which have been retrieved by
150      * {@link CommandReader#getIterableClasses(MasterProcessChannelEncoder)}.
151      */
152     Iterator<String> iterated()
153     {
154         return testClasses.subList( 0, iteratedCount ).iterator();
155     }
156 
157     /**
158      * The iterator can be used only in one Thread.
159      * Two simultaneous instances are not allowed for sake of only one {@link #nextCommandNotifier}.
160      *
161      * @param eventChannel original stream in current JVM process
162      * @return Iterator with test classes lazily loaded as commands from the main process
163      */
164     Iterable<String> getIterableClasses( MasterProcessChannelEncoder eventChannel )
165     {
166         return new ClassesIterable( eventChannel );
167     }
168 
169     public void stop()
170     {
171         if ( !isStopped() )
172         {
173             state.set( TERMINATED );
174             makeQueueFull();
175             listeners.clear();
176             commandThread.interrupt();
177         }
178     }
179 
180     private boolean isStopped()
181     {
182         return state.get() == TERMINATED;
183     }
184 
185     /**
186      * @return {@code true} if {@link #LAST_TEST_SYMBOL} found at the last index in {@link #testClasses}.
187      */
188     private boolean isQueueFull()
189     {
190         // The problem with COWAL is that such collection doe not have operation getLast, however it has get(int)
191         // and we need both atomic.
192         //
193         // Both lines can be Java Concurrent, but the last operation is atomic with optimized search.
194         // Searching index of LAST_TEST_SYMBOL in the only last few (concurrently) inserted strings.
195         // The insert operation is concurrent with this method.
196         // Prerequisite: The strings are added but never removed and the method insertToQueue() does not
197         // allow adding a string after LAST_TEST_SYMBOL.
198         int searchFrom = max( 0, testClasses.size() - 1 );
199         return testClasses.indexOf( LAST_TEST_SYMBOL, searchFrom ) != -1;
200     }
201 
202     private void makeQueueFull()
203     {
204         testClasses.addIfAbsent( LAST_TEST_SYMBOL );
205     }
206 
207     private boolean insertToQueue( String test )
208     {
209         return isNotBlank( test ) && !isQueueFull() && testClasses.add( test );
210     }
211 
212     private final class ClassesIterable
213         implements Iterable<String>
214     {
215         private final MasterProcessChannelEncoder eventChannel;
216 
217         ClassesIterable( MasterProcessChannelEncoder eventChannel )
218         {
219             this.eventChannel = eventChannel;
220         }
221 
222         @Override
223         public Iterator<String> iterator()
224         {
225             return new ClassesIterator( eventChannel );
226         }
227     }
228 
229     private final class ClassesIterator
230         implements Iterator<String>
231     {
232         private final MasterProcessChannelEncoder eventChannel;
233 
234         private String clazz;
235 
236         private int nextQueueIndex;
237 
238         private ClassesIterator( MasterProcessChannelEncoder eventChannel )
239         {
240             this.eventChannel = eventChannel;
241         }
242 
243         @Override
244         public boolean hasNext()
245         {
246             popUnread();
247             return isNotBlank( clazz );
248         }
249 
250         @Override
251         public String next()
252         {
253             popUnread();
254             try
255             {
256                 if ( isBlank( clazz ) )
257                 {
258                     throw new NoSuchElementException( CommandReader.this.isStopped() ? "stream was stopped" : "" );
259                 }
260                 else
261                 {
262                     return clazz;
263                 }
264             }
265             finally
266             {
267                 clazz = null;
268             }
269         }
270 
271         @Override
272         public void remove()
273         {
274             throw new UnsupportedOperationException();
275         }
276 
277         private void popUnread()
278         {
279             if ( shouldFinish() )
280             {
281                 clazz = null;
282                 return;
283             }
284 
285             if ( isBlank( clazz ) )
286             {
287                 requestNextTest();
288                 CommandReader.this.awaitNextTest();
289                 if ( shouldFinish() )
290                 {
291                     clazz = null;
292                     return;
293                 }
294                 clazz = CommandReader.this.testClasses.get( nextQueueIndex++ );
295                 CommandReader.this.iteratedCount = nextQueueIndex;
296             }
297 
298             if ( CommandReader.this.isStopped() )
299             {
300                 clazz = null;
301             }
302         }
303 
304         private void requestNextTest()
305         {
306             eventChannel.acquireNextTest();
307         }
308 
309         private boolean shouldFinish()
310         {
311             boolean wasLastTestRead = isEndSymbolAt( nextQueueIndex );
312             return CommandReader.this.isStopped() || wasLastTestRead;
313         }
314 
315         private boolean isEndSymbolAt( int index )
316         {
317             return CommandReader.this.isQueueFull() && 1 + index == CommandReader.this.testClasses.size();
318         }
319     }
320 
321     private void awaitNextTest()
322     {
323         nextCommandNotifier.acquireUninterruptibly();
324     }
325 
326     private void wakeupIterator()
327     {
328         nextCommandNotifier.release();
329     }
330 
331     private final class CommandRunnable
332         implements Runnable
333     {
334         @Override
335         public void run()
336         {
337             CommandReader.this.startMonitor.countDown();
338             boolean isTestSetFinished = false;
339             try
340             {
341                 while ( CommandReader.this.state.get() == RUNNABLE )
342                 {
343                     Command command = CommandReader.this.decoder.decode();
344                     switch ( command.getCommandType() )
345                     {
346                         case RUN_CLASS:
347                             String test = command.getData();
348                             boolean inserted = CommandReader.this.insertToQueue( test );
349                             if ( inserted )
350                             {
351                                 CommandReader.this.wakeupIterator();
352                                 callListeners( command );
353                             }
354                             break;
355                         case TEST_SET_FINISHED:
356                             CommandReader.this.makeQueueFull();
357                             isTestSetFinished = true;
358                             CommandReader.this.wakeupIterator();
359                             callListeners( command );
360                             break;
361                         case SHUTDOWN:
362                             CommandReader.this.makeQueueFull();
363                             CommandReader.this.wakeupIterator();
364                             callListeners( command );
365                                 break;
366                         case BYE_ACK:
367                             callListeners( command );
368                             // After SHUTDOWN no more commands can come.
369                             // Hence, do NOT go back to blocking in I/O.
370                             CommandReader.this.state.set( TERMINATED );
371                             break;
372                         default:
373                             callListeners( command );
374                             break;
375                     }
376                 }
377             }
378             catch ( EOFException e )
379             {
380                 CommandReader.this.state.set( TERMINATED );
381                 if ( !isTestSetFinished )
382                 {
383                     String msg = "TestSet has not finished before stream error has appeared >> "
384                                          + "initializing exit by non-null configuration: "
385                                          + CommandReader.this.shutdown;
386                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
387 
388                     exitByConfiguration();
389                     // does not go to finally for non-default config: Shutdown.EXIT or Shutdown.KILL
390                 }
391             }
392             catch ( IOException e )
393             {
394                 CommandReader.this.state.set( TERMINATED );
395                 // If #stop() method is called, reader thread is interrupted and cause is InterruptedException.
396                 if ( !( e.getCause() instanceof InterruptedException ) )
397                 {
398                     String msg = "[SUREFIRE] std/in stream corrupted";
399                     DumpErrorSingleton.getSingleton().dumpStreamException( e, msg );
400                     CommandReader.this.logger.error( msg, e );
401                 }
402             }
403             finally
404             {
405                 // ensure fail-safe iterator as well as safe to finish in for-each loop using ClassesIterator
406                 if ( !isTestSetFinished )
407                 {
408                     CommandReader.this.makeQueueFull();
409                 }
410                 CommandReader.this.wakeupIterator();
411             }
412         }
413 
414         private void callListeners( Command cmd )
415         {
416             MasterProcessCommand expectedCommandType = cmd.getCommandType();
417             for ( BiProperty<MasterProcessCommand, CommandListener> listenerWrapper : CommandReader.this.listeners )
418             {
419                 MasterProcessCommand commandType = listenerWrapper.getP1();
420                 CommandListener listener = listenerWrapper.getP2();
421                 if ( commandType == null || commandType == expectedCommandType )
422                 {
423                     listener.update( cmd );
424                 }
425             }
426         }
427 
428         private void exitByConfiguration()
429         {
430             Shutdown shutdown = CommandReader.this.shutdown; // won't read inconsistent changes through the stack
431             if ( shutdown != null )
432             {
433                 CommandReader.this.makeQueueFull();
434                 CommandReader.this.wakeupIterator();
435                 callListeners( toShutdown( shutdown ) );
436             }
437         }
438     }
439 }