View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.output;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.NotifiableTestStream;
23  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
24  import org.apache.maven.plugin.surefire.report.DefaultReporterFactory;
25  import org.apache.maven.shared.utils.cli.StreamConsumer;
26  import org.apache.maven.surefire.report.ConsoleOutputReceiver;
27  import org.apache.maven.surefire.report.ReportEntry;
28  import org.apache.maven.surefire.report.RunListener;
29  import org.apache.maven.surefire.report.StackTraceWriter;
30  import org.apache.maven.surefire.report.TestSetReportEntry;
31  
32  import java.io.BufferedReader;
33  import java.io.File;
34  import java.io.IOException;
35  import java.io.StringReader;
36  import java.nio.ByteBuffer;
37  import java.util.Collections;
38  import java.util.Map;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.StringTokenizer;
42  import java.util.TreeSet;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  import static java.lang.Integer.decode;
49  import static java.lang.System.currentTimeMillis;
50  import static java.util.Collections.unmodifiableMap;
51  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_BYE;
52  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_CONSOLE;
53  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_DEBUG;
54  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_ERROR;
55  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_NEXT_TEST;
56  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_STDERR;
57  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_STDOUT;
58  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_STOP_ON_NEXT_TEST;
59  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_SYSPROPS;
60  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TESTSET_COMPLETED;
61  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TESTSET_STARTING;
62  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_ASSUMPTIONFAILURE;
63  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_ERROR;
64  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_FAILED;
65  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_SKIPPED;
66  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_STARTING;
67  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_TEST_SUCCEEDED;
68  import static org.apache.maven.surefire.booter.ForkingRunListener.BOOTERCODE_WARNING;
69  import static org.apache.maven.surefire.booter.Shutdown.KILL;
70  import static org.apache.maven.surefire.report.CategorizedReportEntry.reportEntry;
71  import static org.apache.maven.surefire.util.internal.StringUtils.isNotBlank;
72  import static org.apache.maven.surefire.util.internal.StringUtils.unescapeBytes;
73  import static org.apache.maven.surefire.util.internal.StringUtils.unescapeString;
74  
75  // todo move to the same package with ForkStarter
76  
77  /**
78   * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
79   *
80   * @author Kristian Rosenvold
81   */
82  public class ForkClient
83       implements StreamConsumer
84  {
85      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
        // Initial value of testSetStartedAt: no start time has been recorded yet.
86      private static final long START_TIME_ZERO = 0L;
        // Stored into testSetStartedAt by tryToTimeout() once the timeout elapsed; hadTimeout() tests for it.
87      private static final long START_TIME_NEGATIVE_TIMEOUT = -1L;
88  
89      private final DefaultReporterFactory defaultReporterFactory;
90  
        // Key/value pairs collected from BOOTERCODE_SYSPROPS events.
91      private final Map<String, String> testVmSystemProperties = new ConcurrentHashMap<String, String>();
92  
93      private final NotifiableTestStream notifiableTestStream;
94  
        // Source names added on BOOTERCODE_TEST_STARTING, removed on the test result events
        // and cleared on BOOTERCODE_TESTSET_COMPLETED.
95      private final Queue<String> testsInProgress = new ConcurrentLinkedQueue<String>();
96  
97      /**
98       * {@code testSetStartedAt} becomes non-zero once a
99       * {@link org.apache.maven.surefire.booter.ForkingRunListener#BOOTERCODE_TESTSET_STARTING test-set}
         * starting event has been processed; it is set to {@code START_TIME_NEGATIVE_TIMEOUT} after a timeout.
100      */
101     private final AtomicLong testSetStartedAt = new AtomicLong( START_TIME_ZERO );
102 
103     private final ConsoleLogger log;
104 
105     /**
106      * Flag supplied by the caller which ensures that the corrupted-stream warning is logged only once.
107      */
108     private final AtomicBoolean printedErrorStream;
109 
110     /**
111      * Used by single Thread started by {@link ThreadedStreamConsumer} and therefore does not need to be volatile.
         * Created lazily by {@code getTestSetReporter()}.
112      */
113     private RunListener testSetReporter;
114 
115     /**
116      * Written by one Thread and read by another: Main Thread and ForkStarter's Thread.
         * Set to {@code true} when a BOOTERCODE_BYE event is processed.
117      */
118     private volatile boolean saidGoodBye;
119 
        // Assigned when a BOOTERCODE_ERROR event is processed; remains null otherwise.
120     private volatile StackTraceWriter errorInFork;
121 
        // Assigned through setForkNumber(int); used in the corrupted-stream warning and passed to the dump calls.
122     private volatile int forkNumber;
123 
124     public ForkClient( DefaultReporterFactory defaultReporterFactory, NotifiableTestStream notifiableTestStream,
125                        ConsoleLogger log, AtomicBoolean printedErrorStream )
126     {
127         this.defaultReporterFactory = defaultReporterFactory;
128         this.notifiableTestStream = notifiableTestStream;
129         this.log = log;
130         this.printedErrorStream = printedErrorStream;
131     }
132 
133     protected void stopOnNextTest()
134     {
            // Intentionally empty. processLine() calls this for BOOTERCODE_STOP_ON_NEXT_TEST; the method is
            // protected and non-final, so a subclass can override it to react to that event.
135     }
136 
137     public void kill()
138     {
139         if ( !saidGoodBye )
140         {
141             notifiableTestStream.shutdown( KILL );
142         }
143     }
144 
145     /**
146      * Called in concurrent Thread.
147      * Will shutdown if timeout was reached.
148      *
149      * @param currentTimeMillis    current time in millis seconds
150      * @param forkedProcessTimeoutInSeconds timeout in seconds given by MOJO
151      */
152     public final void tryToTimeout( long currentTimeMillis, int forkedProcessTimeoutInSeconds )
153     {
154         if ( forkedProcessTimeoutInSeconds > 0 )
155         {
156             final long forkedProcessTimeoutInMillis = 1000 * forkedProcessTimeoutInSeconds;
157             final long startedAt = testSetStartedAt.get();
158             if ( startedAt > START_TIME_ZERO && currentTimeMillis - startedAt >= forkedProcessTimeoutInMillis )
159             {
160                 testSetStartedAt.set( START_TIME_NEGATIVE_TIMEOUT );
161                 notifiableTestStream.shutdown( KILL );
162             }
163         }
164     }
165 
166     public final DefaultReporterFactory getDefaultReporterFactory()
167     {
168         return defaultReporterFactory;
169     }
170 
171     @Override
172     public final void consumeLine( String s )
173     {
174         if ( isNotBlank( s ) )
175         {
176             processLine( s );
177         }
178     }
179 
180     private void setCurrentStartTime()
181     {
182         if ( testSetStartedAt.get() == START_TIME_ZERO ) // JIT can optimize <= no JNI call
183         {
184             // Not necessary to call JNI library library #currentTimeMillis
185             // which may waste 10 - 30 machine cycles in callback. Callbacks should be fast.
186             testSetStartedAt.compareAndSet( START_TIME_ZERO, currentTimeMillis() );
187         }
188     }
189 
190     public final boolean hadTimeout()
191     {
192         return testSetStartedAt.get() == START_TIME_NEGATIVE_TIMEOUT;
193     }
194 
195     private RunListener getTestSetReporter()
196     {
197         if ( testSetReporter == null )
198         {
199             testSetReporter = defaultReporterFactory.createReporter();
200         }
201         return testSetReporter;
202     }
203 
204     private void processLine( String event )
205     {
206         final OperationalData op;
207         try
208         {
209             op = new OperationalData( event );
210         }
211         catch ( RuntimeException e )
212         {
213             logStreamWarning( e, event );
214             return;
215         }
216         final String remaining = op.getData();
217         switch ( op.getOperationId() )
218         {
219             case BOOTERCODE_TESTSET_STARTING:
220                 getTestSetReporter().testSetStarting( createReportEntry( remaining ) );
221                 setCurrentStartTime();
222                 break;
223             case BOOTERCODE_TESTSET_COMPLETED:
224                 testsInProgress.clear();
225 
226                 getTestSetReporter().testSetCompleted( createReportEntry( remaining, testVmSystemProperties ) );
227                 break;
228             case BOOTERCODE_TEST_STARTING:
229                 ReportEntry reportEntry = createReportEntry( remaining );
230                 testsInProgress.offer( reportEntry.getSourceName() );
231 
232                 getTestSetReporter().testStarting( createReportEntry( remaining ) );
233                 break;
234             case BOOTERCODE_TEST_SUCCEEDED:
235                 reportEntry = createReportEntry( remaining );
236                 testsInProgress.remove( reportEntry.getSourceName() );
237 
238                 getTestSetReporter().testSucceeded( createReportEntry( remaining ) );
239                 break;
240             case BOOTERCODE_TEST_FAILED:
241                 reportEntry = createReportEntry( remaining );
242                 testsInProgress.remove( reportEntry.getSourceName() );
243 
244                 getTestSetReporter().testFailed( createReportEntry( remaining ) );
245                 break;
246             case BOOTERCODE_TEST_SKIPPED:
247                 reportEntry = createReportEntry( remaining );
248                 testsInProgress.remove( reportEntry.getSourceName() );
249 
250                 getTestSetReporter().testSkipped( createReportEntry( remaining ) );
251                 break;
252             case BOOTERCODE_TEST_ERROR:
253                 reportEntry = createReportEntry( remaining );
254                 testsInProgress.remove( reportEntry.getSourceName() );
255 
256                 getTestSetReporter().testError( createReportEntry( remaining ) );
257                 break;
258             case BOOTERCODE_TEST_ASSUMPTIONFAILURE:
259                 reportEntry = createReportEntry( remaining );
260                 testsInProgress.remove( reportEntry.getSourceName() );
261 
262                 getTestSetReporter().testAssumptionFailure( createReportEntry( remaining ) );
263                 break;
264             case BOOTERCODE_SYSPROPS:
265                 int keyEnd = remaining.indexOf( "," );
266                 StringBuilder key = new StringBuilder();
267                 StringBuilder value = new StringBuilder();
268                 unescapeString( key, remaining.substring( 0, keyEnd ) );
269                 unescapeString( value, remaining.substring( keyEnd + 1 ) );
270                 testVmSystemProperties.put( key.toString(), value.toString() );
271                 break;
272             case BOOTERCODE_STDOUT:
273                 writeTestOutput( remaining, true );
274                 break;
275             case BOOTERCODE_STDERR:
276                 writeTestOutput( remaining, false );
277                 break;
278             case BOOTERCODE_CONSOLE:
279                 getOrCreateConsoleLogger()
280                         .info( createConsoleMessage( remaining ) );
281                 break;
282             case BOOTERCODE_NEXT_TEST:
283                 notifiableTestStream.provideNewTest();
284                 break;
285             case BOOTERCODE_ERROR:
286                 errorInFork = deserializeStackTraceWriter( new StringTokenizer( remaining, "," ) );
287                 break;
288             case BOOTERCODE_BYE:
289                 saidGoodBye = true;
290                 notifiableTestStream.acknowledgeByeEventReceived();
291                 break;
292             case BOOTERCODE_STOP_ON_NEXT_TEST:
293                 stopOnNextTest();
294                 break;
295             case BOOTERCODE_DEBUG:
296                 getOrCreateConsoleLogger()
297                         .debug( createConsoleMessage( remaining ) );
298                 break;
299             case BOOTERCODE_WARNING:
300                 getOrCreateConsoleLogger()
301                         .warning( createConsoleMessage( remaining ) );
302                 break;
303             default:
304                 logStreamWarning( event );
305         }
306     }
307 
308     private void logStreamWarning( String event )
309     {
310         logStreamWarning( null, event );
311     }
312 
313     private void logStreamWarning( Throwable e, String event )
314     {
315         if ( event == null || !event.contains( PRINTABLE_JVM_NATIVE_STREAM ) )
316         {
317             String msg = "Corrupted STDOUT by directly writing to native stream in forked JVM " + forkNumber + ".";
318 
319             InPluginProcessDumpSingleton util = InPluginProcessDumpSingleton.getSingleton();
320             File dump =
321                     e == null
322                     ? util.dumpText( msg + " Stream '" + event + "'.", defaultReporterFactory, forkNumber )
323                     : util.dumpException( e, msg + " Stream '" + event + "'.", defaultReporterFactory, forkNumber );
324 
325             if ( printedErrorStream.compareAndSet( false, true ) )
326             {
327                 log.warning( msg + " See FAQ web page and the dump file " + dump.getAbsolutePath() );
328             }
329 
330             if ( log.isDebugEnabled() && event != null )
331             {
332                 log.debug( event );
333             }
334         }
335         else
336         {
337             if ( log.isDebugEnabled() )
338             {
339                 log.debug( event );
340             }
341             else if ( log.isInfoEnabled() )
342             {
343                 log.info( event );
344             }
345             else
346             {
347                 // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
348                 System.out.println( event );
349             }
350         }
351     }
352 
353     private void writeTestOutput( String remaining, boolean isStdout )
354     {
355         int csNameEnd = remaining.indexOf( ',' );
356         String charsetName = remaining.substring( 0, csNameEnd );
357         String byteEncoded = remaining.substring( csNameEnd + 1 );
358         ByteBuffer unescaped = unescapeBytes( byteEncoded, charsetName );
359 
360         if ( unescaped.hasArray() )
361         {
362             byte[] convertedBytes = unescaped.array();
363             getOrCreateConsoleOutputReceiver()
364                 .writeTestOutput( convertedBytes, unescaped.position(), unescaped.remaining(), isStdout );
365         }
366         else
367         {
368             byte[] convertedBytes = new byte[unescaped.remaining()];
369             unescaped.get( convertedBytes, 0, unescaped.remaining() );
370             getOrCreateConsoleOutputReceiver()
371                 .writeTestOutput( convertedBytes, 0, convertedBytes.length, isStdout );
372         }
373     }
374 
375     public final void consumeMultiLineContent( String s )
376             throws IOException
377     {
378         BufferedReader stringReader = new BufferedReader( new StringReader( s ) );
379         for ( String s1 = stringReader.readLine(); s1 != null; s1 = stringReader.readLine() )
380         {
381             consumeLine( s1 );
382         }
383     }
384 
385     private String createConsoleMessage( String remaining )
386     {
387         return unescape( remaining );
388     }
389 
390     private TestSetReportEntry createReportEntry( String untokenized )
391     {
392         return createReportEntry( untokenized, Collections.<String, String>emptyMap() );
393     }
394 
395     private TestSetReportEntry createReportEntry( String untokenized, Map<String, String> systemProperties )
396     {
397         StringTokenizer tokens = new StringTokenizer( untokenized, "," );
398         try
399         {
400             String source = nullableCsv( tokens.nextToken() );
401             String name = nullableCsv( tokens.nextToken() );
402             String group = nullableCsv( tokens.nextToken() );
403             String message = nullableCsv( tokens.nextToken() );
404             String elapsedStr = tokens.nextToken();
405             Integer elapsed = "null".equals( elapsedStr ) ? null : decode( elapsedStr );
406             final StackTraceWriter stackTraceWriter =
407                     tokens.hasMoreTokens() ? deserializeStackTraceWriter( tokens ) : null;
408 
409             return reportEntry( source, name, group, stackTraceWriter, elapsed, message, systemProperties );
410         }
411         catch ( RuntimeException e )
412         {
413             throw new RuntimeException( untokenized, e );
414         }
415     }
416 
417     private StackTraceWriter deserializeStackTraceWriter( StringTokenizer tokens )
418     {
419         String stackTraceMessage = nullableCsv( tokens.nextToken() );
420         String smartStackTrace = nullableCsv( tokens.nextToken() );
421         String stackTrace = tokens.hasMoreTokens() ? nullableCsv( tokens.nextToken() ) : null;
422         boolean hasTrace = stackTrace != null;
423         return hasTrace ? new DeserializedStacktraceWriter( stackTraceMessage, smartStackTrace, stackTrace ) : null;
424     }
425 
426     private String nullableCsv( String source )
427     {
428         return "null".equals( source ) ? null : unescape( source );
429     }
430 
431     private String unescape( String source )
432     {
433         StringBuilder stringBuffer = new StringBuilder( source.length() );
434         unescapeString( stringBuffer, source );
435         return stringBuffer.toString();
436     }
437 
438     public final Map<String, String> getTestVmSystemProperties()
439     {
440         return unmodifiableMap( testVmSystemProperties );
441     }
442 
443     /**
444      * Used when getting reporters on the plugin side of a fork.
445      * Used by testing purposes only. May not be volatile variable.
446      *
447      * @return A mock provider reporter
448      */
449     public final RunListener getReporter()
450     {
451         return getTestSetReporter();
452     }
453 
454     private ConsoleOutputReceiver getOrCreateConsoleOutputReceiver()
455     {
456         return (ConsoleOutputReceiver) getTestSetReporter();
457     }
458 
459     private ConsoleLogger getOrCreateConsoleLogger()
460     {
461         return (ConsoleLogger) getTestSetReporter();
462     }
463 
464     public void close( boolean hadTimeout )
465     {
466         // No-op in this class: the hadTimeout argument is ignored. The method is public and non-final,
            // so a subclass may override it.
467     }
468 
469     public final boolean isSaidGoodBye()
470     {
471         return saidGoodBye;
472     }
473 
474     public final StackTraceWriter getErrorInFork()
475     {
476         return errorInFork;
477     }
478 
479     public final boolean isErrorInFork()
480     {
481         return errorInFork != null;
482     }
483 
484     public Set<String> testsInProgress()
485     {
486         return new TreeSet<String>( testsInProgress );
487     }
488 
489     public boolean hasTestsInProgress()
490     {
491         return !testsInProgress.isEmpty();
492     }
493 
494     public void setForkNumber( int forkNumber )
495     {
            // Checked only when assertions are enabled: the field must still hold its default value 0,
            // i.e. a non-zero fork number must not have been assigned before.
496         assert this.forkNumber == 0;
497         this.forkNumber = forkNumber;
498     }
499 
500     private static final class OperationalData
501     {
502         private final byte operationId;
503         private final String data;
504 
505         OperationalData( String event )
506         {
507             operationId = (byte) event.charAt( 0 );
508             int comma = event.indexOf( ",", 3 );
509             if ( comma < 0 )
510             {
511                 throw new IllegalArgumentException( "Stream stdin corrupted. Expected comma after third character "
512                                                             + "in command '" + event + "'." );
513             }
514             int rest = event.indexOf( ",", comma );
515             data = event.substring( rest + 1 );
516         }
517 
518         byte getOperationId()
519         {
520             return operationId;
521         }
522 
523         String getData()
524         {
525             return data;
526         }
527     }
528 }