1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.input;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertThrows;
21 import static org.junit.jupiter.api.Assertions.assertTimeout;
22 import static org.junit.jupiter.api.Assertions.assertTrue;
23
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.nio.charset.StandardCharsets;
30 import java.time.Duration;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.stream.Stream;
37
38 import org.apache.commons.io.IOUtils;
39 import org.apache.commons.io.output.QueueOutputStream;
40 import org.apache.commons.io.output.QueueOutputStreamTest;
41 import org.apache.commons.lang3.StringUtils;
42 import org.junit.jupiter.api.DisplayName;
43 import org.junit.jupiter.api.Test;
44 import org.junit.jupiter.params.ParameterizedTest;
45 import org.junit.jupiter.params.provider.Arguments;
46 import org.junit.jupiter.params.provider.MethodSource;
47
48 import com.google.common.base.Stopwatch;
49
50
51
52
53
54
55 public class QueueInputStreamTest {
56
57 public static Stream<Arguments> inputData() {
58
59 return Stream.of(Arguments.of(""),
60 Arguments.of("1"),
61 Arguments.of("12"),
62 Arguments.of("1234"),
63 Arguments.of("12345678"),
64 Arguments.of(StringUtils.repeat("A", 4095)),
65 Arguments.of(StringUtils.repeat("A", 4096)),
66 Arguments.of(StringUtils.repeat("A", 4097)),
67 Arguments.of(StringUtils.repeat("A", 8191)),
68 Arguments.of(StringUtils.repeat("A", 8192)),
69 Arguments.of(StringUtils.repeat("A", 8193)),
70 Arguments.of(StringUtils.repeat("A", 8192 * 4)));
71
72 }
73
74 private int defaultBufferSize() {
75 return 8192;
76 }
77
78 private String readUnbuffered(final InputStream inputStream) throws IOException {
79 return readUnbuffered(inputStream, Integer.MAX_VALUE);
80 }
81
82 private String readUnbuffered(final InputStream inputStream, final int maxBytes) throws IOException {
83 if (maxBytes == 0) {
84 return "";
85 }
86
87 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
88 int n = -1;
89 while ((n = inputStream.read()) != -1) {
90 byteArrayOutputStream.write(n);
91 if (byteArrayOutputStream.size() >= maxBytes) {
92 break;
93 }
94 }
95 return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
96 }
97
98 @ParameterizedTest(name = "inputData={0}")
99 @MethodSource("inputData")
100 public void testBufferedReads(final String inputData) throws IOException {
101 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
102 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
103 final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
104 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
105 final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
106 assertEquals(inputData, actualData);
107 }
108 }
109
110 @ParameterizedTest(name = "inputData={0}")
111 @MethodSource("inputData")
112 public void testBufferedReadWrite(final String inputData) throws IOException {
113 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
114 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
115 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
116 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
117 outputStream.flush();
118 final String dataCopy = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
119 assertEquals(inputData, dataCopy);
120 }
121 }
122
123 @ParameterizedTest(name = "inputData={0}")
124 @MethodSource("inputData")
125 public void testBufferedWrites(final String inputData) throws IOException {
126 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
127 try (QueueInputStream inputStream = new QueueInputStream(queue);
128 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
129 outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
130 outputStream.flush();
131 final String actualData = readUnbuffered(inputStream);
132 assertEquals(inputData, actualData);
133 }
134 }
135
136 @Test
137 public void testInvalidArguments() {
138 assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
139 assertThrows(IllegalArgumentException.class, () -> QueueInputStream.builder().setTimeout(Duration.ofMillis(-1)).get(), "waitTime must not be negative");
140 }
141
142 @Test
143 public void testResetArguments() throws IOException {
144 try (QueueInputStream queueInputStream = QueueInputStream.builder().setTimeout(null).get()) {
145 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
146 assertEquals(0, queueInputStream.getBlockingQueue().size());
147 }
148 try (QueueInputStream queueInputStream = QueueInputStream.builder().setBlockingQueue(null).get()) {
149 assertEquals(Duration.ZERO, queueInputStream.getTimeout());
150 assertEquals(0, queueInputStream.getBlockingQueue().size());
151 }
152 }
153
154 @Test
155 @DisplayName("If read is interrupted while waiting, then exception is thrown")
156 public void testTimeoutInterrupted() throws Exception {
157 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMinutes(2)).get();
158 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
159
160
161 final AtomicBoolean result = new AtomicBoolean();
162 final CountDownLatch latch = new CountDownLatch(1);
163 final Thread thread = new Thread(() -> {
164
165 assertThrows(IllegalStateException.class, () -> readUnbuffered(inputStream, 3));
166 assertTrue(Thread.currentThread().isInterrupted());
167 result.set(true);
168 latch.countDown();
169 });
170 thread.setDaemon(true);
171 thread.start();
172
173
174 thread.interrupt();
175 latch.await(500, TimeUnit.MILLISECONDS);
176 assertTrue(result.get());
177 }
178 }
179
180 @Test
181 @DisplayName("If data is not available in queue, then read will wait until wait time elapses")
182 public void testTimeoutUnavailableData() throws IOException {
183 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(Duration.ofMillis(500)).get();
184 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
185 final Stopwatch stopwatch = Stopwatch.createStarted();
186 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, 3));
187 stopwatch.stop();
188 assertEquals("", actualData);
189
190 assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) >= 500, () -> stopwatch.toString());
191 }
192 }
193
194 @ParameterizedTest(name = "inputData={0}")
195 @MethodSource("inputData")
196 public void testUnbufferedReadWrite(final String inputData) throws IOException {
197 try (QueueInputStream inputStream = new QueueInputStream();
198 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
199 writeUnbuffered(outputStream, inputData);
200 final String actualData = readUnbuffered(inputStream);
201 assertEquals(inputData, actualData);
202 }
203 }
204
205 @ParameterizedTest(name = "inputData={0}")
206 @MethodSource("inputData")
207 public void testUnbufferedReadWriteWithTimeout(final String inputData) throws IOException {
208 final Duration timeout = Duration.ofMinutes(2);
209 try (QueueInputStream inputStream = QueueInputStream.builder().setTimeout(timeout).get();
210 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
211 assertEquals(timeout, inputStream.getTimeout());
212 writeUnbuffered(outputStream, inputData);
213 final String actualData = assertTimeout(Duration.ofSeconds(1), () -> readUnbuffered(inputStream, inputData.length()));
214 assertEquals(inputData, actualData);
215 }
216 }
217
218 private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws IOException {
219 final byte[] bytes = inputData.getBytes(StandardCharsets.UTF_8);
220 outputStream.write(bytes, 0, bytes.length);
221 }
222 }