1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.api.booter.Command;
23 import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelDecoder;
24 import org.apache.maven.plugin.surefire.extensions.StreamFeeder;
25 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
26 import org.junit.Test;
27
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.lang.Thread.State;
31 import java.util.ArrayDeque;
32 import java.util.Queue;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.FutureTask;
36 import java.util.concurrent.TimeUnit;
37
38 import static java.nio.channels.Channels.newChannel;
39 import static java.nio.charset.StandardCharsets.US_ASCII;
40 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
41 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
42 import static org.hamcrest.MatcherAssert.assertThat;
43 import static org.hamcrest.Matchers.is;
44 import static org.hamcrest.Matchers.notNullValue;
45 import static org.hamcrest.Matchers.nullValue;
46 import static org.junit.Assert.assertTrue;
47
48
49
50
51
52
53
54 public class TestProvidingInputStreamTest
55 {
56 private static final int WAIT_LOOPS = 100;
57 @Test
58 public void closedStreamShouldReturnNullAsEndOfStream()
59 throws IOException
60 {
61 Queue<String> commands = new ArrayDeque<>();
62 TestProvidingInputStream is = new TestProvidingInputStream( commands );
63 is.close();
64 assertThat( is.readNextCommand(), is( nullValue() ) );
65 }
66
67 @Test
68 public void emptyStreamShouldWaitUntilClosed()
69 throws Exception
70 {
71 Queue<String> commands = new ArrayDeque<>();
72 final TestProvidingInputStream is = new TestProvidingInputStream( commands );
73 final Thread streamThread = Thread.currentThread();
74 FutureTask<State> futureTask = new FutureTask<>( new Callable<State>()
75 {
76 @Override
77 public State call()
78 {
79 sleep( 1000L );
80 State state = streamThread.getState();
81 is.close();
82 return state;
83 }
84 } );
85 Thread assertionThread = new Thread( futureTask );
86 assertionThread.start();
87 assertThat( is.readNextCommand(), is( nullValue() ) );
88 State state = futureTask.get();
89 assertThat( state, is( State.WAITING ) );
90 }
91
92 @Test
93 public void finishedTestsetShouldNotBlock()
94 throws IOException
95 {
96 Queue<String> commands = new ArrayDeque<>();
97 final TestProvidingInputStream is = new TestProvidingInputStream( commands );
98 is.testSetFinished();
99 new Thread( new Runnable()
100 {
101 @Override
102 public void run()
103 {
104 is.provideNewTest();
105 }
106 } ).start();
107
108 Command cmd = is.readNextCommand();
109 assertThat( cmd.getData(), is( nullValue() ) );
110 String stream = new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
111
112 cmd = is.readNextCommand();
113 assertThat( cmd.getData(), is( nullValue() ) );
114 stream += new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
115
116 assertThat( stream,
117 is( ":maven-surefire-command:testset-finished::maven-surefire-command:testset-finished:" ) );
118
119 boolean emptyStream = isInputStreamEmpty( is );
120
121 is.close();
122 assertTrue( emptyStream );
123 assertThat( is.readNextCommand(), is( nullValue() ) );
124 }
125
126 @Test
127 public void shouldReadTest()
128 throws IOException
129 {
130 Queue<String> commands = new ArrayDeque<>();
131 commands.add( "Test" );
132 final TestProvidingInputStream is = new TestProvidingInputStream( commands );
133 new Thread( new Runnable()
134 {
135 @Override
136 public void run()
137 {
138 is.provideNewTest();
139 }
140 } ).start();
141
142 Command cmd = is.readNextCommand();
143 assertThat( cmd.getData(), is( "Test" ) );
144
145 is.close();
146 }
147
148 @Test
149 public void shouldDecodeTwoCommands()
150 throws IOException
151 {
152 final TestProvidingInputStream pluginIs = new TestProvidingInputStream( new ConcurrentLinkedQueue<String>() );
153 InputStream is = new InputStream()
154 {
155 private byte[] buffer;
156 private int idx;
157
158 @Override
159 public int read() throws IOException
160 {
161 if ( buffer == null )
162 {
163 idx = 0;
164 Command cmd = pluginIs.readNextCommand();
165 buffer = cmd == null ? null : StreamFeeder.encode( cmd.getCommandType() );
166 }
167
168 if ( buffer != null )
169 {
170 byte b = buffer[idx++];
171 if ( idx == buffer.length )
172 {
173 buffer = null;
174 idx = 0;
175 }
176 return b;
177 }
178 throw new IOException();
179 }
180 };
181 MasterProcessChannelDecoder decoder = new LegacyMasterProcessChannelDecoder( newChannel( is ) );
182 pluginIs.acknowledgeByeEventReceived();
183 pluginIs.noop();
184 Command bye = decoder.decode();
185 assertThat( bye, is( notNullValue() ) );
186 assertThat( bye.getCommandType(), is( BYE_ACK ) );
187 Command noop = decoder.decode();
188 assertThat( noop, is( notNullValue() ) );
189 assertThat( noop.getCommandType(), is( NOOP ) );
190 }
191
192 private static void sleep( long millis )
193 {
194 try
195 {
196 TimeUnit.MILLISECONDS.sleep( millis );
197 }
198 catch ( InterruptedException e )
199 {
200
201 }
202 }
203
204
205
206
207
208
209 private static boolean isInputStreamEmpty( final TestProvidingInputStream is )
210 {
211 Thread t = new Thread( new Runnable()
212 {
213 @Override
214 public void run()
215 {
216 try
217 {
218 is.readNextCommand();
219 }
220 catch ( IOException e )
221 {
222 Throwable cause = e.getCause();
223 Throwable err = cause == null ? e : cause;
224 if ( !( err instanceof InterruptedException ) )
225 {
226 System.err.println( err.toString() );
227 }
228 }
229 }
230 } );
231 t.start();
232 State state;
233 int loops = 0;
234 do
235 {
236 sleep( 100L );
237 state = t.getState();
238 }
239 while ( state == State.NEW && loops++ < WAIT_LOOPS );
240 t.interrupt();
241 return state == State.WAITING || state == State.TIMED_WAITING;
242 }
243 }