View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.rng;
18  
19  import java.util.Arrays;
20  import java.util.HashSet;
21  import java.util.Spliterator;
22  import java.util.concurrent.Callable;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.ForkJoinPool;
25  import java.util.concurrent.ThreadLocalRandom;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicLong;
28  import java.util.function.Consumer;
29  import java.util.function.DoubleConsumer;
30  import java.util.function.IntConsumer;
31  import java.util.function.LongConsumer;
32  import java.util.stream.LongStream;
33  import java.util.stream.Stream;
34  import org.junit.jupiter.api.Assertions;
35  import org.junit.jupiter.api.Test;
36  import org.junit.jupiter.params.ParameterizedTest;
37  import org.junit.jupiter.params.provider.Arguments;
38  import org.junit.jupiter.params.provider.MethodSource;
39  import org.junit.jupiter.params.provider.ValueSource;
40  
41  /**
42   * Tests for split method implementations in
43   * {@link SplittableUniformRandomProvider}.
44   *
45   * <p>This class verifies all exception conditions for the split methods and the
46   * arguments to the methods to stream RNGs. Exception conditions and sequential
47   * (default) output from the primitive stream methods are tested in
48   * {@link SplittableUniformRandomProviderStreamTest}.
49   *
50   * <p>Parallel streams (RNGs and primitives) are tested using a splittable
51   * generator that outputs a unique sequence using an atomic counter that is
52   * thread-safe.
53   */
54  class SplittableUniformRandomProviderTest {
55      private static final long STREAM_SIZE_ONE = 1;
56      /** The expected characteristics for the spliterator from the splittable stream. */
57      private static final int SPLITERATOR_CHARACTERISTICS =
58          Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
59  
60      /**
61       * Dummy class for checking the behavior of the SplittableUniformRandomProvider.
62       * All generation and split methods throw an exception. This can be used to test
63       * exception conditions for arguments to default stream functions.
64       */
65      private static class DummyGenerator implements SplittableUniformRandomProvider {
66          /** An instance. */
67          static final DummyGenerator INSTANCE = new DummyGenerator();
68  
69          @Override
70          public long nextLong() {
71              throw new UnsupportedOperationException("The nextLong method should not be invoked");
72          }
73  
74          @Override
75          public SplittableUniformRandomProvider split(UniformRandomProvider source) {
76              throw new UnsupportedOperationException("The split(source) method should not be invoked");
77          }
78      }
79  
80      /**
81       * Class for outputting a unique sequence from the nextLong() method even under
82       * recursive splitting. Splitting creates a new instance.
83       */
84      private static class SequenceGenerator implements SplittableUniformRandomProvider {
85          /** The value for nextLong. */
86          private final AtomicLong value;
87  
88          /**
89           * @param seed Sequence seed value.
90           */
91          SequenceGenerator(long seed) {
92              value = new AtomicLong(seed);
93          }
94  
95          /**
96           * @param value The value for nextLong.
97           */
98          SequenceGenerator(AtomicLong value) {
99              this.value = value;
100         }
101 
102         @Override
103         public long nextLong() {
104             return value.getAndIncrement();
105         }
106 
107         @Override
108         public SplittableUniformRandomProvider split(UniformRandomProvider source) {
109             // Ignore the source (use of the source is optional)
110             return new SequenceGenerator(value);
111         }
112     }
113 
114     /**
115      * Class for outputting a fixed value from the nextLong() method even under
116      * recursive splitting. Splitting creates a new instance seeded with the nextLong value
117      * from the source of randomness. This can be used to distinguish self-seeding from
118      * seeding with an alternative source.
119      */
120     private class FixedGenerator implements SplittableUniformRandomProvider {
121         /** The value for nextLong. */
122         private final long value;
123 
124         /**
125          * @param value Fixed value.
126          */
127         FixedGenerator(long value) {
128             this.value = value;
129         }
130 
131         @Override
132         public long nextLong() {
133             return value;
134         }
135 
136         @Override
137         public SplittableUniformRandomProvider split(UniformRandomProvider source) {
138             return new FixedGenerator(source.nextLong());
139         }
140     }
141 
142     /**
143      * Class to track recursive splitting and iterating over a fixed set of values.
144      * Splitting without a source of randomness returns the same instance; with a
145      * source of randomness will throw an exception. All generation methods throw an
146      * exception.
147      *
148      * <p>An atomic counter is maintained to allow concurrent return of unique
149      * values from a fixed array. The values are expected to be maintained in child
150      * classes. Any generation methods that are overridden for tests should
151      * be thread-safe, e.g. returning {@code values[count.getAndIncrement()]}.
152      *
153      * <p>A count of the number of splits is maintained. This is not used for assertions
154      * to avoid test failures that may occur when streams are split differently, or not
155      * at all, by the current JVM. The count can be used to debug splitting behavior
156      * on JVM implementations.
157      */
158     private static class CountingGenerator extends DummyGenerator {
159         /** The split count. Incrementded when the generator is split. */
160         protected final AtomicInteger splitCount = new AtomicInteger();
161         /** The count of returned values. */
162         protected final AtomicInteger count = new AtomicInteger();
163 
164         @Override
165         public SplittableUniformRandomProvider split() {
166             splitCount.getAndIncrement();
167             return this;
168         }
169     }
170 
171     /**
172      * Class to return the same instance when splitting without a source of randomness;
173      * with a source of randomness will throw an exception. All generation methods
174      * throw an exception. Any generation methods that are overridden for tests should
175      * be thread-safe.
176      */
177     private abstract static class SingleInstanceGenerator extends DummyGenerator {
178         @Override
179         public SplittableUniformRandomProvider split() {
180             return this;
181         }
182     }
183 
184     /**
185      * Thread and stream sizes used to test parallel streams.
186      *
187      * @return the arguments
188      */
189     static Stream<Arguments> threadAndStreamSizes() {
190         return Stream.of(
191             Arguments.of(1, 16),
192             Arguments.of(2, 16),
193             Arguments.of(4, 16),
194             Arguments.of(8, 16),
195             Arguments.of(4, 2),
196             Arguments.of(8, 4)
197         );
198     }
199 
200     /**
201      * Execute the task in a ForkJoinPool with the specified level of parallelism. Any
202      * parallel stream executing in the task should be limited to the specified level of
203      * parallelism.
204      *
205      * <p><b>Note</b>
206      *
207      * <p>This is a JDK undocumented feature of streams to use the enclosing ForkJoinPool
208      * in-place of {@link ForkJoinPool#commonPool()}; this behaviour may be subject to
209      * change.
210      *
211      * <p>Here the intention is to force the parallel stream to execute with a varying
212      * number of threads. Note that debugging using the {@link CountingGenerator}
213      * indicates that the number of splits is not influenced by the enclosing pool
214      * parallelism but rather the number of stream elements and possibly the
215      * <em>standard</em> number of available processors. Further testing on Linux using
216      * {@code numactl -C 1} to limit the number of processors returns 1 for
217      * {@link ForkJoinPool#getCommonPoolParallelism()} and
218      * {@link Runtime#availableProcessors()} with no change in the number of splits
219      * performed by parallel streams. This indicates the splitting of parallel streams may
220      * not respect the limits imposed on the executing JVM. However this does mean that
221      * tests using this method do test the splitting of the stream, irrespective of
222      * configured parallelism when executed on a machine that has multiple CPU cores, i.e.
223      * the <em>potential</em> for parallelism.
224      *
225      * <p>It is unknown if the parallel streams will split when executed on a true single-core
226      * JVM such as that provided by a continuous integration build environment running for
227      * example in a virtual machine.
228      *
229      * @param <T> Return type of the task.
230      * @param parallelism Level of parallelism.
231      * @param task Task.
232      * @return the task result
233      * @throws InterruptedException the interrupted exception
234      * @throws ExecutionException the execution exception
235      */
236     private static <T> T execute(int parallelism, Callable<T> task) throws InterruptedException, ExecutionException {
237         final ForkJoinPool threadPool = new ForkJoinPool(parallelism);
238         try {
239             return threadPool.submit(task).get();
240         } finally {
241             threadPool.shutdown();
242         }
243     }
244 
245     /**
246      * Helper method to raise an assertion error inside an action passed to a Spliterator
247      * when the action should not be invoked.
248      *
249      * @see Spliterator#tryAdvance(Consumer)
250      * @see Spliterator#forEachRemaining(Consumer)
251      */
252     private static void failSpliteratorShouldBeEmpty() {
253         Assertions.fail("Spliterator should not have any remaining elements");
254     }
255 
256     @Test
257     void testDefaultSplit() {
258         // Create the split result so we can check the return value
259         final SplittableUniformRandomProvider expected = new DummyGenerator();
260         // Implement split(UniformRandomProvider)
261         final SplittableUniformRandomProvider rng = new DummyGenerator() {
262             @Override
263             public SplittableUniformRandomProvider split(UniformRandomProvider source) {
264                 Assertions.assertSame(this, source, "default split should use itself as the source");
265                 return expected;
266             }
267         };
268         // Test the default split()
269         Assertions.assertSame(expected, rng.split());
270     }
271 
272     // Tests for splitting the stream of splittable RNGs
273 
274     @ParameterizedTest
275     @ValueSource(longs = {-1, -2, Long.MIN_VALUE})
276     void testSplitsInvalidStreamSizeThrows(long size) {
277         final SplittableUniformRandomProvider rng = DummyGenerator.INSTANCE;
278         Assertions.assertThrows(IllegalArgumentException.class, () -> rng.splits(size), "splits(size)");
279         final SplittableUniformRandomProvider source = new SequenceGenerator(42);
280         Assertions.assertThrows(IllegalArgumentException.class, () -> rng.splits(size, source), "splits(size, source)");
281     }
282 
283     @Test
284     void testSplitsUnlimitedStreamSize() {
285         final SplittableUniformRandomProvider rng = DummyGenerator.INSTANCE;
286         assertUnlimitedSpliterator(rng.splits().spliterator(), "splits()");
287         final SplittableUniformRandomProvider source = new SequenceGenerator(42);
288         assertUnlimitedSpliterator(rng.splits(source).spliterator(), "splits(source)");
289     }
290 
291     /**
292      * Assert the spliterator has an unlimited expected size and the characteristics for a sized
293      * non-null immutable stream.
294      *
295      * @param spliterator Spliterator.
296      * @param msg Error message.
297      */
298     private static void assertUnlimitedSpliterator(Spliterator<?> spliterator, String msg) {
299         BaseRandomProviderStreamTest.assertSpliterator(spliterator, Long.MAX_VALUE, SPLITERATOR_CHARACTERISTICS, msg);
300     }
301 
302     @Test
303     void testSplitsNullSourceThrows() {
304         final SplittableUniformRandomProvider rng = DummyGenerator.INSTANCE;
305         final SplittableUniformRandomProvider source = null;
306         Assertions.assertThrows(NullPointerException.class, () -> rng.splits(source));
307         Assertions.assertThrows(NullPointerException.class, () -> rng.splits(STREAM_SIZE_ONE, source));
308     }
309 
310     /**
311      * Test the splits method. The test asserts that a parallel stream of RNGs output a
312      * sequence using a specialised sequence generator that maintains the sequence output
313      * under recursive splitting.
314      */
315     @ParameterizedTest
316     @MethodSource(value = {"threadAndStreamSizes"})
317     void testSplitsParallel(int threads, long streamSize) throws InterruptedException, ExecutionException {
318         final long start = Integer.toUnsignedLong(ThreadLocalRandom.current().nextInt());
319         final long[] actual = execute(threads, (Callable<long[]>) () -> {
320             // The splits method will use itself as the source and the output should be the sequence
321             final SplittableUniformRandomProvider rng = new SequenceGenerator(start);
322             final SplittableUniformRandomProvider[] rngs =
323                     rng.splits(streamSize).parallel().toArray(SplittableUniformRandomProvider[]::new);
324             // Check the instance is a new object of the same type.
325             // These will be hashed using the system identity hash code.
326             final HashSet<SplittableUniformRandomProvider> observed = new HashSet<>();
327             observed.add(rng);
328             Arrays.stream(rngs).forEach(r -> {
329                 Assertions.assertTrue(observed.add(r), "Instance should be unique");
330                 Assertions.assertEquals(SequenceGenerator.class, r.getClass());
331             });
332             // Get output from the unique RNGs: these return from the same atomic sequence
333             return Arrays.stream(rngs).mapToLong(UniformRandomProvider::nextLong).toArray();
334         });
335         // Required to reorder the sequence to ascending
336         Arrays.sort(actual);
337         final long[] expected = LongStream.range(start, start + streamSize).toArray();
338         Assertions.assertArrayEquals(expected, actual);
339     }
340 
341     /**
342      * Test the splits method. The test asserts that a parallel stream of RNGs output a
343      * sequence using a specialised sequence generator that maintains the sequence output
344      * under recursive splitting. The sequence is used to seed a fixed generator. The stream
345      * instances are verified to be the correct class type.
346      */
347     @ParameterizedTest
348     @MethodSource(value = {"threadAndStreamSizes"})
349     void testSplitsParallelWithSource(int threads, long streamSize) throws InterruptedException, ExecutionException {
350         final long start = Integer.toUnsignedLong(ThreadLocalRandom.current().nextInt());
351         final long[] actual = execute(threads, (Callable<long[]>) () -> {
352             // This generator defines the instances created.
353             // It should not be split without a source.
354             // Seed with something not the start value.
355             final SplittableUniformRandomProvider rng = new FixedGenerator(~start) {
356                 @Override
357                 public SplittableUniformRandomProvider split() {
358                     throw new UnsupportedOperationException("The split method should not be invoked");
359                 }
360             };
361             // The splits method will use this to seed each instance.
362             // This generator is split within the spliterator.
363             final SplittableUniformRandomProvider source = new SequenceGenerator(start);
364             final SplittableUniformRandomProvider[] rngs =
365                 rng.splits(streamSize, source).parallel().toArray(SplittableUniformRandomProvider[]::new);
366             // Check the instance is a new object of the same type.
367             // These will be hashed using the system identity hash code.
368             final HashSet<SplittableUniformRandomProvider> observed = new HashSet<>();
369             observed.add(rng);
370             Arrays.stream(rngs).forEach(r -> {
371                 Assertions.assertTrue(observed.add(r), "Instance should be unique");
372                 Assertions.assertEquals(FixedGenerator.class, r.getClass());
373             });
374             // Get output from the unique RNGs: these return from the same atomic sequence
375             return Arrays.stream(rngs).mapToLong(UniformRandomProvider::nextLong).toArray();
376         });
377         // Required to reorder the sequence to ascending
378         Arrays.sort(actual);
379         final long[] expected = LongStream.range(start, start + streamSize).toArray();
380         Assertions.assertArrayEquals(expected, actual);
381     }
382 
383     @Test
384     void testSplitsSpliterator() {
385         final int start = 42;
386         final SplittableUniformRandomProvider rng = new SequenceGenerator(start);
387 
388         // Split a large spliterator into four smaller ones;
389         // each is used to test different functionality
390         final long size = 41;
391         Spliterator<SplittableUniformRandomProvider> s1 = rng.splits(size).spliterator();
392         Assertions.assertEquals(size, s1.estimateSize());
393         final Spliterator<SplittableUniformRandomProvider> s2 = s1.trySplit();
394         final Spliterator<SplittableUniformRandomProvider> s3 = s1.trySplit();
395         final Spliterator<SplittableUniformRandomProvider> s4 = s2.trySplit();
396         Assertions.assertEquals(size, s1.estimateSize() + s2.estimateSize() + s3.estimateSize() + s4.estimateSize());
397 
398         // s1. Test cannot split indefinitely
399         while (s1.estimateSize() > 1) {
400             final long currentSize = s1.estimateSize();
401             final Spliterator<SplittableUniformRandomProvider> other = s1.trySplit();
402             Assertions.assertEquals(currentSize, s1.estimateSize() + other.estimateSize());
403             s1 = other;
404         }
405         Assertions.assertNull(s1.trySplit(), "Cannot split when size <= 1");
406 
407         // The expected value is incremented for each generation call
408         final long[] expected = {start};
409 
410         // s2. Test advance
411         for (long newSize = s2.estimateSize(); newSize-- > 0;) {
412             Assertions.assertTrue(s2.tryAdvance(r -> Assertions.assertEquals(expected[0]++, r.nextLong())));
413             Assertions.assertEquals(newSize, s2.estimateSize(), "s2 size estimate");
414         }
415         Assertions.assertFalse(s2.tryAdvance(r -> failSpliteratorShouldBeEmpty()));
416         s2.forEachRemaining(r -> failSpliteratorShouldBeEmpty());
417 
418         // s3. Test forEachRemaining
419         s3.forEachRemaining(r -> Assertions.assertEquals(expected[0]++, r.nextLong()));
420         Assertions.assertEquals(0, s3.estimateSize());
421         s3.forEachRemaining(r -> failSpliteratorShouldBeEmpty());
422 
423         // s4. Test tryAdvance and forEachRemaining when the action throws an exception
424         final IllegalStateException ex = new IllegalStateException();
425         final Consumer<SplittableUniformRandomProvider> badAction = r -> {
426             throw ex;
427         };
428         final long currentSize = s4.estimateSize();
429         Assertions.assertTrue(currentSize > 1, "Spliterator requires more elements to test advance");
430         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.tryAdvance(badAction)));
431         Assertions.assertEquals(currentSize - 1, s4.estimateSize(), "Spliterator should be advanced even when action throws");
432 
433         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.forEachRemaining(badAction)));
434         Assertions.assertEquals(0, s4.estimateSize(), "Spliterator should be finished even when action throws");
435         s4.forEachRemaining(r -> failSpliteratorShouldBeEmpty());
436     }
437 
438     // Tests for splitting the primitive streams to test support for parallel execution
439 
440     @ParameterizedTest
441     @MethodSource(value = {"threadAndStreamSizes"})
442     void testIntsParallelWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
443         final int[] values = ThreadLocalRandom.current().ints(streamSize).toArray();
444         final CountingGenerator rng = new CountingGenerator() {
445             @Override
446             public int nextInt() {
447                 return values[count.getAndIncrement()];
448             }
449         };
450         final int[] actual = execute(threads, (Callable<int[]>) () ->
451             rng.ints(streamSize).parallel().toArray()
452         );
453         Arrays.sort(values);
454         Arrays.sort(actual);
455         Assertions.assertArrayEquals(values, actual);
456     }
457 
458     @ParameterizedTest
459     @MethodSource(value = {"threadAndStreamSizes"})
460     void testIntsParallelOriginBoundWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
461         final int origin = 13;
462         final int bound = 42;
463         final int[] values = ThreadLocalRandom.current().ints(streamSize, origin, bound).toArray();
464         final CountingGenerator rng = new CountingGenerator() {
465             @Override
466             public int nextInt(int o, int b) {
467                 Assertions.assertEquals(origin, o, "origin");
468                 Assertions.assertEquals(bound, b, "bound");
469                 return values[count.getAndIncrement()];
470             }
471         };
472         final int[] actual = execute(threads, (Callable<int[]>) () ->
473             rng.ints(streamSize, origin, bound).parallel().toArray()
474         );
475         Arrays.sort(values);
476         Arrays.sort(actual);
477         Assertions.assertArrayEquals(values, actual);
478     }
479 
480     @Test
481     void testIntsSpliterator() {
482         final int start = 42;
483         final SplittableUniformRandomProvider rng = new SingleInstanceGenerator() {
484             private final AtomicInteger value = new AtomicInteger(start);
485 
486             @Override
487             public int nextInt() {
488                 return value.getAndIncrement();
489             }
490         };
491 
492         // Split a large spliterator into four smaller ones;
493         // each is used to test different functionality
494         final long size = 41;
495         Spliterator.OfInt s1 = rng.ints(size).spliterator();
496         Assertions.assertEquals(size, s1.estimateSize());
497         final Spliterator.OfInt s2 = s1.trySplit();
498         final Spliterator.OfInt s3 = s1.trySplit();
499         final Spliterator.OfInt s4 = s2.trySplit();
500         Assertions.assertEquals(size, s1.estimateSize() + s2.estimateSize() + s3.estimateSize() + s4.estimateSize());
501 
502         // s1. Test cannot split indefinitely
503         while (s1.estimateSize() > 1) {
504             final long currentSize = s1.estimateSize();
505             final Spliterator.OfInt other = s1.trySplit();
506             Assertions.assertEquals(currentSize, s1.estimateSize() + other.estimateSize());
507             s1 = other;
508         }
509         Assertions.assertNull(s1.trySplit(), "Cannot split when size <= 1");
510 
511         // The expected value is incremented for each generation call
512         final int[] expected = {start};
513 
514         // s2. Test advance
515         for (long newSize = s2.estimateSize(); newSize-- > 0;) {
516             Assertions.assertTrue(s2.tryAdvance((IntConsumer) i -> Assertions.assertEquals(expected[0]++, i)));
517             Assertions.assertEquals(newSize, s2.estimateSize(), "s2 size estimate");
518         }
519         Assertions.assertFalse(s2.tryAdvance((IntConsumer) i -> failSpliteratorShouldBeEmpty()));
520         s2.forEachRemaining((IntConsumer) i -> failSpliteratorShouldBeEmpty());
521 
522         // s3. Test forEachRemaining
523         s3.forEachRemaining((IntConsumer) i -> Assertions.assertEquals(expected[0]++, i));
524         Assertions.assertEquals(0, s3.estimateSize());
525         s3.forEachRemaining((IntConsumer) i -> failSpliteratorShouldBeEmpty());
526 
527         // s4. Test tryAdvance and forEachRemaining when the action throws an exception
528         final IllegalStateException ex = new IllegalStateException();
529         final IntConsumer badAction = i -> {
530             throw ex;
531         };
532         final long currentSize = s4.estimateSize();
533         Assertions.assertTrue(currentSize > 1, "Spliterator requires more elements to test advance");
534         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.tryAdvance(badAction)));
535         Assertions.assertEquals(currentSize - 1, s4.estimateSize(), "Spliterator should be advanced even when action throws");
536 
537         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.forEachRemaining(badAction)));
538         Assertions.assertEquals(0, s4.estimateSize(), "Spliterator should be finished even when action throws");
539         s4.forEachRemaining((IntConsumer) i -> failSpliteratorShouldBeEmpty());
540     }
541 
542     @ParameterizedTest
543     @MethodSource(value = {"threadAndStreamSizes"})
544     void testLongsParallelWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
545         final long[] values = ThreadLocalRandom.current().longs(streamSize).toArray();
546         final CountingGenerator rng = new CountingGenerator() {
547             @Override
548             public long nextLong() {
549                 return values[count.getAndIncrement()];
550             }
551         };
552         final long[] actual = execute(threads, (Callable<long[]>) () ->
553             rng.longs(streamSize).parallel().toArray()
554         );
555         Arrays.sort(values);
556         Arrays.sort(actual);
557         Assertions.assertArrayEquals(values, actual);
558     }
559 
560     @ParameterizedTest
561     @MethodSource(value = {"threadAndStreamSizes"})
562     void testLongsParallelOriginBoundWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
563         final long origin = 195267376168313L;
564         final long bound = 421268681268318L;
565         final long[] values = ThreadLocalRandom.current().longs(streamSize, origin, bound).toArray();
566         final CountingGenerator rng = new CountingGenerator() {
567             @Override
568             public long nextLong(long o, long b) {
569                 Assertions.assertEquals(origin, o, "origin");
570                 Assertions.assertEquals(bound, b, "bound");
571                 return values[count.getAndIncrement()];
572             }
573         };
574         final long[] actual = execute(threads, (Callable<long[]>) () ->
575             rng.longs(streamSize, origin, bound).parallel().toArray()
576         );
577         Arrays.sort(values);
578         Arrays.sort(actual);
579         Assertions.assertArrayEquals(values, actual);
580     }
581 
582     @Test
583     void testLongsSpliterator() {
584         final long start = 42;
585         final SplittableUniformRandomProvider rng = new SingleInstanceGenerator() {
586             private final AtomicLong value = new AtomicLong(start);
587 
588             @Override
589             public long nextLong() {
590                 return value.getAndIncrement();
591             }
592         };
593 
594         // Split a large spliterator into four smaller ones;
595         // each is used to test different functionality
596         final long size = 41;
597         Spliterator.OfLong s1 = rng.longs(size).spliterator();
598         Assertions.assertEquals(size, s1.estimateSize());
599         final Spliterator.OfLong s2 = s1.trySplit();
600         final Spliterator.OfLong s3 = s1.trySplit();
601         final Spliterator.OfLong s4 = s2.trySplit();
602         Assertions.assertEquals(size, s1.estimateSize() + s2.estimateSize() + s3.estimateSize() + s4.estimateSize());
603 
604         // s1. Test cannot split indefinitely
605         while (s1.estimateSize() > 1) {
606             final long currentSize = s1.estimateSize();
607             final Spliterator.OfLong other = s1.trySplit();
608             Assertions.assertEquals(currentSize, s1.estimateSize() + other.estimateSize());
609             s1 = other;
610         }
611         Assertions.assertNull(s1.trySplit(), "Cannot split when size <= 1");
612 
613         // The expected value is incremented for each generation call
614         final long[] expected = {start};
615 
616         // s2. Test advance
617         for (long newSize = s2.estimateSize(); newSize-- > 0;) {
618             Assertions.assertTrue(s2.tryAdvance((LongConsumer) i -> Assertions.assertEquals(expected[0]++, i)));
619             Assertions.assertEquals(newSize, s2.estimateSize(), "s2 size estimate");
620         }
621         Assertions.assertFalse(s2.tryAdvance((LongConsumer) i -> failSpliteratorShouldBeEmpty()));
622         s2.forEachRemaining((LongConsumer) i -> failSpliteratorShouldBeEmpty());
623 
624         // s3. Test forEachRemaining
625         s3.forEachRemaining((LongConsumer) i -> Assertions.assertEquals(expected[0]++, i));
626         Assertions.assertEquals(0, s3.estimateSize());
627         s3.forEachRemaining((LongConsumer) i -> failSpliteratorShouldBeEmpty());
628 
629         // s4. Test tryAdvance and forEachRemaining when the action throws an exception
630         final IllegalStateException ex = new IllegalStateException();
631         final LongConsumer badAction = i -> {
632             throw ex;
633         };
634         final long currentSize = s4.estimateSize();
635         Assertions.assertTrue(currentSize > 1, "Spliterator requires more elements to test advance");
636         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.tryAdvance(badAction)));
637         Assertions.assertEquals(currentSize - 1, s4.estimateSize(), "Spliterator should be advanced even when action throws");
638 
639         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.forEachRemaining(badAction)));
640         Assertions.assertEquals(0, s4.estimateSize(), "Spliterator should be finished even when action throws");
641         s4.forEachRemaining((LongConsumer) i -> failSpliteratorShouldBeEmpty());
642     }
643 
644     @ParameterizedTest
645     @MethodSource(value = {"threadAndStreamSizes"})
646     void testDoublesParallelWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
647         final double[] values = ThreadLocalRandom.current().doubles(streamSize).toArray();
648         final CountingGenerator rng = new CountingGenerator() {
649             @Override
650             public double nextDouble() {
651                 return values[count.getAndIncrement()];
652             }
653         };
654         final double[] actual = execute(threads, (Callable<double[]>) () ->
655             rng.doubles(streamSize).parallel().toArray()
656         );
657         Arrays.sort(values);
658         Arrays.sort(actual);
659         Assertions.assertArrayEquals(values, actual);
660     }
661 
662     @ParameterizedTest
663     @MethodSource(value = {"threadAndStreamSizes"})
664     void testDoublesParallelOriginBoundWithSize(int threads, long streamSize) throws InterruptedException, ExecutionException {
665         final double origin = 0.123;
666         final double bound = 0.789;
667         final double[] values = ThreadLocalRandom.current().doubles(streamSize, origin, bound).toArray();
668         final CountingGenerator rng = new CountingGenerator() {
669             @Override
670             public double nextDouble(double o, double b) {
671                 Assertions.assertEquals(origin, o, "origin");
672                 Assertions.assertEquals(bound, b, "bound");
673                 return values[count.getAndIncrement()];
674             }
675         };
676         final double[] actual = execute(threads, (Callable<double[]>) () ->
677             rng.doubles(streamSize, origin, bound).parallel().toArray()
678         );
679         Arrays.sort(values);
680         Arrays.sort(actual);
681         Assertions.assertArrayEquals(values, actual);
682     }
683 
684     @Test
685     void testDoublesSpliterator() {
686         // Due to lack of an AtomicDouble this uses an AtomicInteger. Any int value can be
687         // represented as a double and the increment operator functions without loss of
688         // precision (the same is not true if using an AtomicLong with >53 bits of precision).
689         final int start = 42;
690         final SplittableUniformRandomProvider rng = new SingleInstanceGenerator() {
691             private final AtomicInteger value = new AtomicInteger(start);
692 
693             @Override
694             public double nextDouble() {
695                 return value.getAndIncrement();
696             }
697         };
698 
699         // Split a large spliterator into four smaller ones;
700         // each is used to test different functionality
701         final long size = 41;
702         Spliterator.OfDouble s1 = rng.doubles(size).spliterator();
703         Assertions.assertEquals(size, s1.estimateSize());
704         final Spliterator.OfDouble s2 = s1.trySplit();
705         final Spliterator.OfDouble s3 = s1.trySplit();
706         final Spliterator.OfDouble s4 = s2.trySplit();
707         Assertions.assertEquals(size, s1.estimateSize() + s2.estimateSize() + s3.estimateSize() + s4.estimateSize());
708 
709         // s1. Test cannot split indefinitely
710         while (s1.estimateSize() > 1) {
711             final double currentSize = s1.estimateSize();
712             final Spliterator.OfDouble other = s1.trySplit();
713             Assertions.assertEquals(currentSize, s1.estimateSize() + other.estimateSize());
714             s1 = other;
715         }
716         Assertions.assertNull(s1.trySplit(), "Cannot split when size <= 1");
717 
718         // The expected value is incremented for each generation call
719         final double[] expected = {start};
720 
721         // s2. Test advance
722         for (double newSize = s2.estimateSize(); newSize-- > 0;) {
723             Assertions.assertTrue(s2.tryAdvance((DoubleConsumer) i -> Assertions.assertEquals(expected[0]++, i)));
724             Assertions.assertEquals(newSize, s2.estimateSize(), "s2 size estimate");
725         }
726         Assertions.assertFalse(s2.tryAdvance((DoubleConsumer) i -> failSpliteratorShouldBeEmpty()));
727         s2.forEachRemaining((DoubleConsumer) i -> failSpliteratorShouldBeEmpty());
728 
729         // s3. Test forEachRemaining
730         s3.forEachRemaining((DoubleConsumer) i -> Assertions.assertEquals(expected[0]++, i));
731         Assertions.assertEquals(0, s3.estimateSize());
732         s3.forEachRemaining((DoubleConsumer) i -> failSpliteratorShouldBeEmpty());
733 
734         // s4. Test tryAdvance and forEachRemaining when the action throws an exception
735         final IllegalStateException ex = new IllegalStateException();
736         final DoubleConsumer badAction = i -> {
737             throw ex;
738         };
739         final double currentSize = s4.estimateSize();
740         Assertions.assertTrue(currentSize > 1, "Spliterator requires more elements to test advance");
741         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.tryAdvance(badAction)));
742         Assertions.assertEquals(currentSize - 1, s4.estimateSize(), "Spliterator should be advanced even when action throws");
743 
744         Assertions.assertSame(ex, Assertions.assertThrows(IllegalStateException.class, () -> s4.forEachRemaining(badAction)));
745         Assertions.assertEquals(0, s4.estimateSize(), "Spliterator should be finished even when action throws");
746         s4.forEachRemaining((DoubleConsumer) i -> failSpliteratorShouldBeEmpty());
747     }
748 }