1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.io.function;
19
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertFalse;
22 import static org.junit.jupiter.api.Assertions.assertNotNull;
23 import static org.junit.jupiter.api.Assertions.assertSame;
24 import static org.junit.jupiter.api.Assertions.assertThrows;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26
27 import java.io.IOException;
28 import java.nio.file.Path;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.stream.BaseStream;
32 import java.util.stream.Stream;
33
34 import org.junit.jupiter.api.AfterEach;
35 import org.junit.jupiter.api.BeforeEach;
36 import org.junit.jupiter.api.Test;
37
38
39
40
41 public class IOBaseStreamTest {
42
43
44
45
46 private static class IOBaseStreamFixture<T, S extends IOBaseStreamFixture<T, S, B>, B extends BaseStream<T, B>> implements IOBaseStream<T, S, B> {
47
48 private final B baseStream;
49
50 private IOBaseStreamFixture(final B baseStream) {
51 this.baseStream = baseStream;
52 }
53
54 @Override
55 public B unwrap() {
56 return baseStream;
57 }
58
59 @SuppressWarnings("unchecked")
60 @Override
61 public S wrap(final B delegate) {
62 return delegate == baseStream ? (S) this : (S) new IOBaseStreamFixture<T, S, B>(delegate);
63 }
64
65 }
66
67
68
69
70 private static final class IOBaseStreamPathFixture<B extends BaseStream<Path, B>> extends IOBaseStreamFixture<Path, IOBaseStreamPathFixture<B>, B> {
71
72 private IOBaseStreamPathFixture(final B baseStream) {
73 super(baseStream);
74 }
75
76 @Override
77 public IOBaseStreamPathFixture<B> wrap(final B delegate) {
78 return delegate == unwrap() ? this : new IOBaseStreamPathFixture<>(delegate);
79 }
80
81 }
82
83 private static final class MyRuntimeException extends RuntimeException {
84
85 private static final long serialVersionUID = 1L;
86
87 public MyRuntimeException(final String message) {
88 super(message);
89 }
90
91 }
92
93
94 private BaseStream<Path, ? extends BaseStream<Path, ?>> baseStream;
95
96
97 private IOBaseStreamFixture<Path, ? extends IOBaseStreamFixture<Path, ?, ?>, ?> ioBaseStream;
98
99
100 private IOBaseStreamPathFixture<? extends BaseStream<Path, ?>> ioBaseStreamPath;
101
102
103 private IOStream<Path> ioBaseStreamAdapter;
104
105 @BeforeEach
106 public void beforeEach() {
107 baseStream = createStreamOfPaths();
108 ioBaseStream = createIOBaseStream();
109 ioBaseStreamPath = createIOBaseStreamPath();
110 ioBaseStreamAdapter = createIOBaseStreamAdapter();
111 }
112
113 private IOBaseStreamFixture<Path, ?, Stream<Path>> createIOBaseStream() {
114 return new IOBaseStreamFixture<>(createStreamOfPaths());
115 }
116
117 private IOStream<Path> createIOBaseStreamAdapter() {
118 return IOStreamAdapter.adapt(createStreamOfPaths());
119 }
120
121 private IOBaseStreamPathFixture<Stream<Path>> createIOBaseStreamPath() {
122 return new IOBaseStreamPathFixture<>(createStreamOfPaths());
123 }
124
125 private Stream<Path> createStreamOfPaths() {
126 return Stream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B);
127 }
128
129 @Test
130 @AfterEach
131 public void testClose() {
132 baseStream.close();
133 ioBaseStream.close();
134 ioBaseStreamPath.close();
135 ioBaseStream.asBaseStream().close();
136 ioBaseStreamPath.asBaseStream().close();
137 }
138
139 @SuppressWarnings("resource")
140 @Test
141 public void testIsParallel() {
142 assertFalse(baseStream.isParallel());
143 assertFalse(ioBaseStream.isParallel());
144 assertFalse(ioBaseStream.asBaseStream().isParallel());
145 assertFalse(ioBaseStreamPath.asBaseStream().isParallel());
146 assertFalse(ioBaseStreamPath.isParallel());
147 }
148
149 @SuppressWarnings("resource")
150 @Test
151 public void testIteratorPathIO() throws IOException {
152 final AtomicReference<Path> ref = new AtomicReference<>();
153 ioBaseStream.iterator().forEachRemaining(e -> ref.set(e.toRealPath()));
154 assertEquals(TestConstants.ABS_PATH_B.toRealPath(), ref.get());
155
156 ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.set(e.getFileName()));
157 assertEquals(TestConstants.ABS_PATH_B.getFileName(), ref.get());
158 }
159
160 @SuppressWarnings("resource")
161 @Test
162 public void testIteratorSimple() throws IOException {
163 final AtomicInteger ref = new AtomicInteger();
164 baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
165 assertEquals(2, ref.get());
166 ioBaseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
167 assertEquals(4, ref.get());
168 ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
169 assertEquals(6, ref.get());
170 }
171
172 @SuppressWarnings("resource")
173 @Test
174 public void testOnClose() {
175
176 testOnClose(baseStream);
177 testOnClose(ioBaseStream.asBaseStream());
178 testOnClose(ioBaseStreamPath.asBaseStream());
179 }
180
181 @SuppressWarnings("resource")
182 private <T, S extends BaseStream<T, S>> void testOnClose(final BaseStream<T, S> stream) {
183 final AtomicReference<String> refA = new AtomicReference<>();
184 final AtomicReference<String> refB = new AtomicReference<>();
185 stream.onClose(() -> refA.set("A"));
186 stream.onClose(() -> {
187 throw new MyRuntimeException("B");
188 });
189 stream.onClose(() -> {
190 throw new MyRuntimeException("C");
191 });
192 stream.onClose(() -> refB.set("D"));
193 final MyRuntimeException e = assertThrows(MyRuntimeException.class, stream::close);
194 assertEquals("A", refA.get());
195 assertEquals("D", refB.get());
196 assertEquals("B", e.getMessage());
197 final Throwable[] suppressed = e.getSuppressed();
198 assertNotNull(suppressed);
199 assertEquals(1, suppressed.length);
200 assertEquals("C", suppressed[0].getMessage());
201 }
202
203 @SuppressWarnings("resource")
204 @Test
205 public void testParallel() throws IOException {
206 final AtomicInteger ref = new AtomicInteger();
207 baseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
208 assertEquals(2, ref.get());
209 ioBaseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
210 assertEquals(4, ref.get());
211 final BaseStream<Path, ?> parallel = ioBaseStreamPath.asBaseStream().parallel();
212 parallel.iterator().forEachRemaining(e -> ref.incrementAndGet());
213 assertEquals(6, ref.get());
214 assertTrue(parallel.isParallel());
215 }
216
217 @SuppressWarnings("resource")
218 @Test
219 public void testParallelParallel() {
220 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
221 testParallelParallel(stream);
222 }
223 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
224 testParallelParallel(stream);
225 }
226 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
227 testParallelParallel(stream);
228 }
229 try (final IOBaseStreamFixture<Path, ?, Stream<Path>> stream = createIOBaseStream()) {
230 testParallelParallel(stream.asBaseStream());
231 }
232 }
233
234 @SuppressWarnings("resource")
235 private void testParallelParallel(final BaseStream<?, ?> stream) {
236 final BaseStream<?, ?> seq = stream.sequential();
237 assertFalse(seq.isParallel());
238 final BaseStream<?, ?> p1 = seq.parallel();
239 assertTrue(p1.isParallel());
240 final BaseStream<?, ?> p2 = p1.parallel();
241 assertTrue(p1.isParallel());
242 assertSame(p1, p2);
243 }
244
245 @SuppressWarnings("resource")
246 private void testParallelParallel(final IOBaseStream<?, ?, ?> stream) {
247 final IOBaseStream<?, ?, ?> seq = stream.sequential();
248 assertFalse(seq.isParallel());
249 final IOBaseStream<?, ?, ?> p1 = seq.parallel();
250 assertTrue(p1.isParallel());
251 final IOBaseStream<?, ?, ?> p2 = p1.parallel();
252 assertTrue(p1.isParallel());
253 assertSame(p1, p2);
254 }
255
256 @SuppressWarnings("resource")
257 @Test
258 public void testSequential() throws IOException {
259 final AtomicInteger ref = new AtomicInteger();
260 baseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
261 assertEquals(2, ref.get());
262 ioBaseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
263 assertEquals(4, ref.get());
264 ioBaseStreamPath.asBaseStream().sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
265 assertEquals(6, ref.get());
266 }
267
268 @SuppressWarnings("resource")
269 @Test
270 public void testSequentialSequential() {
271 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
272 testSequentialSequential(stream);
273 }
274 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
275 testSequentialSequential(stream);
276 }
277 try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
278 testSequentialSequential(stream.asBaseStream());
279 }
280 }
281
282 @SuppressWarnings("resource")
283 private void testSequentialSequential(final BaseStream<?, ?> stream) {
284 final BaseStream<?, ?> p = stream.parallel();
285 assertTrue(p.isParallel());
286 final BaseStream<?, ?> seq1 = p.sequential();
287 assertFalse(seq1.isParallel());
288 final BaseStream<?, ?> seq2 = seq1.sequential();
289 assertFalse(seq1.isParallel());
290 assertSame(seq1, seq2);
291 }
292
293 @SuppressWarnings("resource")
294 private void testSequentialSequential(final IOBaseStream<?, ?, ?> stream) {
295 final IOBaseStream<?, ?, ?> p = stream.parallel();
296 assertTrue(p.isParallel());
297 final IOBaseStream<?, ?, ?> seq1 = p.sequential();
298 assertFalse(seq1.isParallel());
299 final IOBaseStream<?, ?, ?> seq2 = seq1.sequential();
300 assertFalse(seq1.isParallel());
301 assertSame(seq1, seq2);
302 }
303
304 @SuppressWarnings("resource")
305 @Test
306 public void testSpliterator() {
307 final AtomicInteger ref = new AtomicInteger();
308 baseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
309 assertEquals(2, ref.get());
310 ioBaseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
311 assertEquals(4, ref.get());
312 ioBaseStreamPath.asBaseStream().spliterator().forEachRemaining(e -> ref.incrementAndGet());
313 assertEquals(6, ref.get());
314 }
315
316 @SuppressWarnings("resource")
317 @Test
318 public void testUnordered() throws IOException {
319 final AtomicInteger ref = new AtomicInteger();
320 baseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
321 assertEquals(2, ref.get());
322 ioBaseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
323 assertEquals(4, ref.get());
324 ioBaseStreamPath.asBaseStream().unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
325 assertEquals(6, ref.get());
326 }
327
328 @SuppressWarnings("resource")
329 @Test
330 public void testUnwrap() {
331 final AtomicInteger ref = new AtomicInteger();
332 baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
333 assertEquals(2, ref.get());
334 ioBaseStream.unwrap().iterator().forEachRemaining(e -> ref.incrementAndGet());
335 assertEquals(4, ref.get());
336 ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
337 assertEquals(6, ref.get());
338 }
339
340 @Test
341 public void testWrap() {
342 final Stream<Path> stream = createStreamOfPaths();
343 @SuppressWarnings("resource")
344 final IOStream<Path> wrap = ioBaseStreamAdapter.wrap(stream);
345 assertNotNull(wrap);
346 assertEquals(stream, wrap.unwrap());
347 }
348
349 }