1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.lang.Thread.State;
24 import java.util.ArrayDeque;
25 import java.util.Queue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.FutureTask;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.maven.surefire.api.booter.Command;
32 import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
33 import org.apache.maven.surefire.booter.ForkedNodeArg;
34 import org.apache.maven.surefire.booter.spi.CommandChannelDecoder;
35 import org.junit.Test;
36
37 import static java.nio.channels.Channels.newChannel;
38 import static java.nio.charset.StandardCharsets.UTF_8;
39 import static org.apache.maven.surefire.api.booter.Command.TEST_SET_FINISHED;
40 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
41 import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
42 import static org.hamcrest.MatcherAssert.assertThat;
43 import static org.hamcrest.Matchers.is;
44 import static org.hamcrest.Matchers.notNullValue;
45 import static org.hamcrest.Matchers.nullValue;
46 import static org.junit.Assert.assertTrue;
47 import static org.junit.Assert.fail;
48
49
50
51
52
53
54
55 public class TestProvidingInputStreamTest {
56 private static final int WAIT_LOOPS = 100;
57
58 @Test
59 public void closedStreamShouldReturnNullAsEndOfStream() throws IOException {
60 Queue<String> commands = new ArrayDeque<>();
61 TestProvidingInputStream is = new TestProvidingInputStream(commands);
62 is.close();
63 assertThat(is.readNextCommand(), is(nullValue()));
64 }
65
66 @Test
67 public void emptyStreamShouldWaitUntilClosed() throws Exception {
68 Queue<String> commands = new ArrayDeque<>();
69 final TestProvidingInputStream is = new TestProvidingInputStream(commands);
70 final Thread streamThread = Thread.currentThread();
71 FutureTask<State> futureTask = new FutureTask<>(new Callable<State>() {
72 @Override
73 public State call() {
74 sleep(1000L);
75 State state = streamThread.getState();
76 is.close();
77 return state;
78 }
79 });
80 Thread assertionThread = new Thread(futureTask);
81 assertionThread.start();
82 assertThat(is.readNextCommand(), is(nullValue()));
83 State state = futureTask.get();
84 assertThat(state, is(State.WAITING));
85 }
86
87 @Test
88 public void finishedTestsetShouldNotBlock() throws IOException {
89 final TestProvidingInputStream is = new TestProvidingInputStream(new ArrayDeque<String>());
90 is.testSetFinished();
91 new Thread(new Runnable() {
92 @Override
93 public void run() {
94 is.provideNewTest();
95 }
96 })
97 .start();
98
99 for (int i = 0; i < 2; i++) {
100 Command cmd = is.readNextCommand();
101 assertThat(cmd.getData(), is(nullValue()));
102 assertThat(cmd, is(TEST_SET_FINISHED));
103 }
104
105 boolean emptyStream = isInputStreamEmpty(is);
106
107 is.close();
108 assertTrue(emptyStream);
109 assertThat(is.readNextCommand(), is(nullValue()));
110 }
111
112 @Test
113 public void shouldReadTest() throws IOException {
114 Queue<String> commands = new ArrayDeque<>();
115 commands.add("Test");
116 final TestProvidingInputStream is = new TestProvidingInputStream(commands);
117 new Thread(new Runnable() {
118 @Override
119 public void run() {
120 is.provideNewTest();
121 }
122 })
123 .start();
124
125 Command cmd = is.readNextCommand();
126 assertThat(cmd.getData(), is("Test"));
127
128 is.close();
129 }
130
131 @Test
132 public void shouldDecodeTwoCommands() throws IOException {
133 final TestProvidingInputStream pluginIs = new TestProvidingInputStream(new ConcurrentLinkedQueue<String>());
134 InputStream is = new InputStream() {
135 private byte[] buffer;
136 private int idx;
137
138 @Override
139 public int read() throws IOException {
140 if (buffer == null) {
141 idx = 0;
142 Command cmd = pluginIs.readNextCommand();
143 if (cmd != null) {
144 if (cmd.getCommandType() == BYE_ACK) {
145 buffer = ":maven-surefire-command:\u0007:bye-ack:".getBytes(UTF_8);
146 } else if (cmd.getCommandType() == NOOP) {
147 buffer = ":maven-surefire-command:\u0004:noop:".getBytes(UTF_8);
148 } else {
149 fail();
150 }
151 }
152 }
153
154 if (buffer != null) {
155 byte b = buffer[idx++];
156 if (idx == buffer.length) {
157 buffer = null;
158 idx = 0;
159 }
160 return b;
161 }
162 throw new IOException();
163 }
164 };
165 MasterProcessChannelDecoder decoder = new CommandChannelDecoder(newChannel(is), new ForkedNodeArg(1, false));
166 pluginIs.acknowledgeByeEventReceived();
167 pluginIs.noop();
168 Command bye = decoder.decode();
169 assertThat(bye, is(notNullValue()));
170 assertThat(bye.getCommandType(), is(BYE_ACK));
171 Command noop = decoder.decode();
172 assertThat(noop, is(notNullValue()));
173 assertThat(noop.getCommandType(), is(NOOP));
174 }
175
176 private static void sleep(long millis) {
177 try {
178 TimeUnit.MILLISECONDS.sleep(millis);
179 } catch (InterruptedException e) {
180
181 }
182 }
183
184
185
186
187
188
189 private static boolean isInputStreamEmpty(final TestProvidingInputStream is) {
190 Thread t = new Thread(new Runnable() {
191 @Override
192 public void run() {
193 try {
194 is.readNextCommand();
195 } catch (IOException e) {
196 Throwable cause = e.getCause();
197 Throwable err = cause == null ? e : cause;
198 if (!(err instanceof InterruptedException)) {
199 System.err.println(err.toString());
200 }
201 }
202 }
203 });
204 t.start();
205 State state;
206 int loops = 0;
207 do {
208 sleep(100L);
209 state = t.getState();
210 } while (state == State.NEW && loops++ < WAIT_LOOPS);
211 t.interrupt();
212 return state == State.WAITING || state == State.TIMED_WAITING;
213 }
214 }