View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.io.function;
19  
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertFalse;
22  import static org.junit.jupiter.api.Assertions.assertNotNull;
23  import static org.junit.jupiter.api.Assertions.assertSame;
24  import static org.junit.jupiter.api.Assertions.assertThrows;
25  import static org.junit.jupiter.api.Assertions.assertTrue;
26  
27  import java.io.IOException;
28  import java.nio.file.Path;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.atomic.AtomicReference;
31  import java.util.stream.BaseStream;
32  import java.util.stream.Stream;
33  
34  import org.junit.jupiter.api.AfterEach;
35  import org.junit.jupiter.api.BeforeEach;
36  import org.junit.jupiter.api.Test;
37  
38  /**
39   * Tests {@link IOBaseStream}.
40   */
41  public class IOBaseStreamTest {
42  
43      /**
44       * Implements IOBaseStream with generics.
45       */
46      private static class IOBaseStreamFixture<T, S extends IOBaseStreamFixture<T, S, B>, B extends BaseStream<T, B>> implements IOBaseStream<T, S, B> {
47  
48          private final B baseStream;
49  
50          private IOBaseStreamFixture(final B baseStream) {
51              this.baseStream = baseStream;
52          }
53  
54          @Override
55          public B unwrap() {
56              return baseStream;
57          }
58  
59          @SuppressWarnings("unchecked") // We are this here
60          @Override
61          public S wrap(final B delegate) {
62              return delegate == baseStream ? (S) this : (S) new IOBaseStreamFixture<T, S, B>(delegate);
63          }
64  
65      }
66  
67      /**
68       * Implements IOBaseStream with a concrete type.
69       */
70      private static final class IOBaseStreamPathFixture<B extends BaseStream<Path, B>> extends IOBaseStreamFixture<Path, IOBaseStreamPathFixture<B>, B> {
71  
72          private IOBaseStreamPathFixture(final B baseStream) {
73              super(baseStream);
74          }
75  
76          @Override
77          public IOBaseStreamPathFixture<B> wrap(final B delegate) {
78              return delegate == unwrap() ? this : new IOBaseStreamPathFixture<>(delegate);
79          }
80  
81      }
82  
83      private static final class MyRuntimeException extends RuntimeException {
84  
85          private static final long serialVersionUID = 1L;
86  
87          public MyRuntimeException(final String message) {
88              super(message);
89          }
90  
91      }
92  
93      /** Sanity check */
94      private BaseStream<Path, ? extends BaseStream<Path, ?>> baseStream;
95  
96      /** Generic version */
97      private IOBaseStreamFixture<Path, ? extends IOBaseStreamFixture<Path, ?, ?>, ?> ioBaseStream;
98  
99      /** Concrete version */
100     private IOBaseStreamPathFixture<? extends BaseStream<Path, ?>> ioBaseStreamPath;
101 
102     /** Adapter version */
103     private IOStream<Path> ioBaseStreamAdapter;
104 
105     @BeforeEach
106     public void beforeEach() {
107         baseStream = createStreamOfPaths();
108         ioBaseStream = createIOBaseStream();
109         ioBaseStreamPath = createIOBaseStreamPath();
110         ioBaseStreamAdapter = createIOBaseStreamAdapter();
111     }
112 
113     private IOBaseStreamFixture<Path, ?, Stream<Path>> createIOBaseStream() {
114         return new IOBaseStreamFixture<>(createStreamOfPaths());
115     }
116 
117     private IOStream<Path> createIOBaseStreamAdapter() {
118         return IOStreamAdapter.adapt(createStreamOfPaths());
119     }
120 
121     private IOBaseStreamPathFixture<Stream<Path>> createIOBaseStreamPath() {
122         return new IOBaseStreamPathFixture<>(createStreamOfPaths());
123     }
124 
125     private Stream<Path> createStreamOfPaths() {
126         return Stream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B);
127     }
128 
129     @Test
130     @AfterEach
131     public void testClose() {
132         baseStream.close();
133         ioBaseStream.close();
134         ioBaseStreamPath.close();
135         ioBaseStream.asBaseStream().close();
136         ioBaseStreamPath.asBaseStream().close();
137     }
138 
139     @SuppressWarnings("resource") // @AfterEach
140     @Test
141     public void testIsParallel() {
142         assertFalse(baseStream.isParallel());
143         assertFalse(ioBaseStream.isParallel());
144         assertFalse(ioBaseStream.asBaseStream().isParallel());
145         assertFalse(ioBaseStreamPath.asBaseStream().isParallel());
146         assertFalse(ioBaseStreamPath.isParallel());
147     }
148 
149     @SuppressWarnings("resource") // @AfterEach
150     @Test
151     public void testIteratorPathIO() throws IOException {
152         final AtomicReference<Path> ref = new AtomicReference<>();
153         ioBaseStream.iterator().forEachRemaining(e -> ref.set(e.toRealPath()));
154         assertEquals(TestConstants.ABS_PATH_B.toRealPath(), ref.get());
155         //
156         ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.set(e.getFileName()));
157         assertEquals(TestConstants.ABS_PATH_B.getFileName(), ref.get());
158     }
159 
160     @SuppressWarnings("resource") // @AfterEach
161     @Test
162     public void testIteratorSimple() throws IOException {
163         final AtomicInteger ref = new AtomicInteger();
164         baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
165         assertEquals(2, ref.get());
166         ioBaseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
167         assertEquals(4, ref.get());
168         ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
169         assertEquals(6, ref.get());
170     }
171 
172     @SuppressWarnings("resource")
173     @Test
174     public void testOnClose() {
175         // Stream
176         testOnClose(baseStream);
177         testOnClose(ioBaseStream.asBaseStream());
178         testOnClose(ioBaseStreamPath.asBaseStream());
179     }
180 
181     @SuppressWarnings("resource")
182     private <T, S extends BaseStream<T, S>> void testOnClose(final BaseStream<T, S> stream) {
183         final AtomicReference<String> refA = new AtomicReference<>();
184         final AtomicReference<String> refB = new AtomicReference<>();
185         stream.onClose(() -> refA.set("A"));
186         stream.onClose(() -> {
187             throw new MyRuntimeException("B");
188         });
189         stream.onClose(() -> {
190             throw new MyRuntimeException("C");
191         });
192         stream.onClose(() -> refB.set("D"));
193         final MyRuntimeException e = assertThrows(MyRuntimeException.class, stream::close);
194         assertEquals("A", refA.get());
195         assertEquals("D", refB.get());
196         assertEquals("B", e.getMessage());
197         final Throwable[] suppressed = e.getSuppressed();
198         assertNotNull(suppressed);
199         assertEquals(1, suppressed.length);
200         assertEquals("C", suppressed[0].getMessage());
201     }
202 
203     @SuppressWarnings("resource")
204     @Test
205     public void testParallel() throws IOException {
206         final AtomicInteger ref = new AtomicInteger();
207         baseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
208         assertEquals(2, ref.get());
209         ioBaseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet());
210         assertEquals(4, ref.get());
211         final BaseStream<Path, ?> parallel = ioBaseStreamPath.asBaseStream().parallel();
212         parallel.iterator().forEachRemaining(e -> ref.incrementAndGet());
213         assertEquals(6, ref.get());
214         assertTrue(parallel.isParallel());
215     }
216 
217     @SuppressWarnings("resource") // @AfterEach
218     @Test
219     public void testParallelParallel() {
220         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
221             testParallelParallel(stream);
222         }
223         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
224             testParallelParallel(stream);
225         }
226         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
227             testParallelParallel(stream);
228         }
229         try (final IOBaseStreamFixture<Path, ?, Stream<Path>> stream = createIOBaseStream()) {
230             testParallelParallel(stream.asBaseStream());
231         }
232     }
233 
234     @SuppressWarnings("resource")
235     private void testParallelParallel(final BaseStream<?, ?> stream) {
236         final BaseStream<?, ?> seq = stream.sequential();
237         assertFalse(seq.isParallel());
238         final BaseStream<?, ?> p1 = seq.parallel();
239         assertTrue(p1.isParallel());
240         final BaseStream<?, ?> p2 = p1.parallel();
241         assertTrue(p1.isParallel());
242         assertSame(p1, p2);
243     }
244 
245     @SuppressWarnings("resource")
246     private void testParallelParallel(final IOBaseStream<?, ?, ?> stream) {
247         final IOBaseStream<?, ?, ?> seq = stream.sequential();
248         assertFalse(seq.isParallel());
249         final IOBaseStream<?, ?, ?> p1 = seq.parallel();
250         assertTrue(p1.isParallel());
251         final IOBaseStream<?, ?, ?> p2 = p1.parallel();
252         assertTrue(p1.isParallel());
253         assertSame(p1, p2);
254     }
255 
256     @SuppressWarnings("resource")
257     @Test
258     public void testSequential() throws IOException {
259         final AtomicInteger ref = new AtomicInteger();
260         baseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
261         assertEquals(2, ref.get());
262         ioBaseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
263         assertEquals(4, ref.get());
264         ioBaseStreamPath.asBaseStream().sequential().iterator().forEachRemaining(e -> ref.incrementAndGet());
265         assertEquals(6, ref.get());
266     }
267 
268     @SuppressWarnings("resource") // @AfterEach
269     @Test
270     public void testSequentialSequential() {
271         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
272             testSequentialSequential(stream);
273         }
274         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) {
275             testSequentialSequential(stream);
276         }
277         try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) {
278             testSequentialSequential(stream.asBaseStream());
279         }
280     }
281 
282     @SuppressWarnings("resource")
283     private void testSequentialSequential(final BaseStream<?, ?> stream) {
284         final BaseStream<?, ?> p = stream.parallel();
285         assertTrue(p.isParallel());
286         final BaseStream<?, ?> seq1 = p.sequential();
287         assertFalse(seq1.isParallel());
288         final BaseStream<?, ?> seq2 = seq1.sequential();
289         assertFalse(seq1.isParallel());
290         assertSame(seq1, seq2);
291     }
292 
293     @SuppressWarnings("resource")
294     private void testSequentialSequential(final IOBaseStream<?, ?, ?> stream) {
295         final IOBaseStream<?, ?, ?> p = stream.parallel();
296         assertTrue(p.isParallel());
297         final IOBaseStream<?, ?, ?> seq1 = p.sequential();
298         assertFalse(seq1.isParallel());
299         final IOBaseStream<?, ?, ?> seq2 = seq1.sequential();
300         assertFalse(seq1.isParallel());
301         assertSame(seq1, seq2);
302     }
303 
304     @SuppressWarnings("resource") // @AfterEach
305     @Test
306     public void testSpliterator() {
307         final AtomicInteger ref = new AtomicInteger();
308         baseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
309         assertEquals(2, ref.get());
310         ioBaseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet());
311         assertEquals(4, ref.get());
312         ioBaseStreamPath.asBaseStream().spliterator().forEachRemaining(e -> ref.incrementAndGet());
313         assertEquals(6, ref.get());
314     }
315 
316     @SuppressWarnings("resource")
317     @Test
318     public void testUnordered() throws IOException {
319         final AtomicInteger ref = new AtomicInteger();
320         baseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
321         assertEquals(2, ref.get());
322         ioBaseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
323         assertEquals(4, ref.get());
324         ioBaseStreamPath.asBaseStream().unordered().iterator().forEachRemaining(e -> ref.incrementAndGet());
325         assertEquals(6, ref.get());
326     }
327 
328     @SuppressWarnings("resource")
329     @Test
330     public void testUnwrap() {
331         final AtomicInteger ref = new AtomicInteger();
332         baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet());
333         assertEquals(2, ref.get());
334         ioBaseStream.unwrap().iterator().forEachRemaining(e -> ref.incrementAndGet());
335         assertEquals(4, ref.get());
336         ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet());
337         assertEquals(6, ref.get());
338     }
339 
340     @Test
341     public void testWrap() {
342         final Stream<Path> stream = createStreamOfPaths();
343         @SuppressWarnings("resource")
344         final IOStream<Path> wrap = ioBaseStreamAdapter.wrap(stream);
345         assertNotNull(wrap);
346         assertEquals(stream, wrap.unwrap());
347     }
348 
349 }