View Javadoc
1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.api.booter.Command;
23  import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelDecoder;
24  import org.apache.maven.plugin.surefire.extensions.StreamFeeder;
25  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
26  import org.junit.Test;
27  
28  import java.io.IOException;
29  import java.io.InputStream;
30  import java.lang.Thread.State;
31  import java.util.ArrayDeque;
32  import java.util.Queue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.FutureTask;
36  import java.util.concurrent.TimeUnit;
37  
38  import static java.nio.channels.Channels.newChannel;
39  import static java.nio.charset.StandardCharsets.US_ASCII;
40  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
41  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
42  import static org.hamcrest.MatcherAssert.assertThat;
43  import static org.hamcrest.Matchers.is;
44  import static org.hamcrest.Matchers.notNullValue;
45  import static org.hamcrest.Matchers.nullValue;
46  import static org.junit.Assert.assertTrue;
47  
48  /**
49   * Asserts that this stream properly reads bytes from queue.
50   *
51   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
52   * @since 2.19
53   */
54  public class TestProvidingInputStreamTest
55  {
56      private static final int WAIT_LOOPS = 100;
57      @Test
58      public void closedStreamShouldReturnNullAsEndOfStream()
59          throws IOException
60      {
61          Queue<String> commands = new ArrayDeque<>();
62          TestProvidingInputStream is = new TestProvidingInputStream( commands );
63          is.close();
64          assertThat( is.readNextCommand(), is( nullValue() ) );
65      }
66  
67      @Test
68      public void emptyStreamShouldWaitUntilClosed()
69          throws Exception
70      {
71          Queue<String> commands = new ArrayDeque<>();
72          final TestProvidingInputStream is = new TestProvidingInputStream( commands );
73          final Thread streamThread = Thread.currentThread();
74          FutureTask<State> futureTask = new FutureTask<>( new Callable<State>()
75          {
76              @Override
77              public State call()
78              {
79                  sleep( 1000L );
80                  State state = streamThread.getState();
81                  is.close();
82                  return state;
83              }
84          } );
85          Thread assertionThread = new Thread( futureTask );
86          assertionThread.start();
87          assertThat( is.readNextCommand(), is( nullValue() ) );
88          State state = futureTask.get();
89          assertThat( state, is( State.WAITING ) );
90      }
91  
92      @Test
93      public void finishedTestsetShouldNotBlock()
94          throws IOException
95      {
96          Queue<String> commands = new ArrayDeque<>();
97          final TestProvidingInputStream is = new TestProvidingInputStream( commands );
98          is.testSetFinished();
99          new Thread( new Runnable()
100         {
101             @Override
102             public void run()
103             {
104                 is.provideNewTest();
105             }
106         } ).start();
107 
108         Command cmd = is.readNextCommand();
109         assertThat( cmd.getData(), is( nullValue() ) );
110         String stream = new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
111 
112         cmd = is.readNextCommand();
113         assertThat( cmd.getData(), is( nullValue() ) );
114         stream += new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
115 
116         assertThat( stream,
117             is( ":maven-surefire-command:testset-finished::maven-surefire-command:testset-finished:" ) );
118 
119         boolean emptyStream = isInputStreamEmpty( is );
120 
121         is.close();
122         assertTrue( emptyStream );
123         assertThat( is.readNextCommand(), is( nullValue() ) );
124     }
125 
126     @Test
127     public void shouldReadTest()
128         throws IOException
129     {
130         Queue<String> commands = new ArrayDeque<>();
131         commands.add( "Test" );
132         final TestProvidingInputStream is = new TestProvidingInputStream( commands );
133         new Thread( new Runnable()
134         {
135             @Override
136             public void run()
137             {
138                 is.provideNewTest();
139             }
140         } ).start();
141 
142         Command cmd = is.readNextCommand();
143         assertThat( cmd.getData(), is( "Test" ) );
144 
145         is.close();
146     }
147 
148     @Test
149     public void shouldDecodeTwoCommands()
150             throws IOException
151     {
152         final TestProvidingInputStream pluginIs = new TestProvidingInputStream( new ConcurrentLinkedQueue<String>() );
153         InputStream is = new InputStream()
154         {
155             private byte[] buffer;
156             private int idx;
157 
158             @Override
159             public int read() throws IOException
160             {
161                 if ( buffer == null )
162                 {
163                     idx = 0;
164                     Command cmd = pluginIs.readNextCommand();
165                     buffer = cmd == null ? null : StreamFeeder.encode( cmd.getCommandType() );
166                 }
167 
168                 if ( buffer != null )
169                 {
170                     byte b = buffer[idx++];
171                     if ( idx == buffer.length )
172                     {
173                         buffer = null;
174                         idx = 0;
175                     }
176                     return b;
177                 }
178                 throw new IOException();
179             }
180         };
181         MasterProcessChannelDecoder decoder = new LegacyMasterProcessChannelDecoder( newChannel( is ) );
182         pluginIs.acknowledgeByeEventReceived();
183         pluginIs.noop();
184         Command bye = decoder.decode();
185         assertThat( bye, is( notNullValue() ) );
186         assertThat( bye.getCommandType(), is( BYE_ACK ) );
187         Command noop = decoder.decode();
188         assertThat( noop, is( notNullValue() ) );
189         assertThat( noop.getCommandType(), is( NOOP ) );
190     }
191 
192     private static void sleep( long millis )
193     {
194         try
195         {
196             TimeUnit.MILLISECONDS.sleep( millis );
197         }
198         catch ( InterruptedException e )
199         {
200             // do nothing
201         }
202     }
203 
204     /**
205      * Waiting (max of 20 seconds)
206      * @param is examined stream
207      * @return {@code true} if the {@link InputStream#read()} is waiting for a new byte.
208      */
209     private static boolean isInputStreamEmpty( final TestProvidingInputStream is )
210     {
211         Thread t = new Thread( new Runnable()
212         {
213             @Override
214             public void run()
215             {
216                 try
217                 {
218                     is.readNextCommand();
219                 }
220                 catch ( IOException e )
221                 {
222                     Throwable cause = e.getCause();
223                     Throwable err = cause == null ? e : cause;
224                     if ( !( err instanceof InterruptedException ) )
225                     {
226                         System.err.println( err.toString() );
227                     }
228                 }
229             }
230         } );
231         t.start();
232         State state;
233         int loops = 0;
234         do
235         {
236             sleep( 100L );
237             state = t.getState();
238         }
239         while ( state == State.NEW && loops++ < WAIT_LOOPS );
240         t.interrupt();
241         return state == State.WAITING || state == State.TIMED_WAITING;
242     }
243 }