View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.reactive;
29  
30  import java.nio.ByteBuffer;
31  import java.nio.charset.StandardCharsets;
32  import java.security.MessageDigest;
33  import java.security.NoSuchAlgorithmException;
34  import java.util.Random;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.util.TextUtils;
38  import org.reactivestreams.Publisher;
39  
40  import io.reactivex.Emitter;
41  import io.reactivex.Flowable;
42  import io.reactivex.Single;
43  import io.reactivex.functions.BiFunction;
44  import io.reactivex.functions.Consumer;
45  
46  public class ReactiveTestUtils {
47      /** The range from which to generate random data. */
48      private final static byte[] RANGE = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
49              .getBytes(StandardCharsets.US_ASCII);
50  
51      /**
52       * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB.
53       *
54       * @param length the number of bytes in the stream
55       * @return a reactive stream of bytes
56       */
57      public static Flowable<ByteBuffer> produceStream(final long length) {
58          return produceStream(length, null);
59      }
60  
61      /**
62       * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB, while computing the hash of
63       * the random data.
64       *
65       * @param length the number of bytes in the stream
66       * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
67       *             hash will not be computed
68       * @return a reactive stream of bytes
69       */
70      public static Flowable<ByteBuffer> produceStream(final long length, final AtomicReference<String> hash) {
71          return produceStream(length, 128 * 1024, hash);
72      }
73  
74      /**
75       * Produces a deterministic stream of bytes, in randomly sized chunks, while computing the hash of the random data.
76       *
77       * @param length the number of bytes in the stream
78       * @param maximumBlockSize the maximum size of any {@code ByteBuffer in the stream}
79       * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
80       *             hash will not be computed
81       * @return a reactive stream of bytes
82       */
83      public static Flowable<ByteBuffer> produceStream(
84              final long length,
85              final int maximumBlockSize,
86              final AtomicReference<String> hash
87      ) {
88          return Flowable.generate(new Consumer<Emitter<ByteBuffer>>() {
89              final Random random = new Random(length); // Use the length as the random seed for easy reproducibility
90              long bytesEmitted;
91              final MessageDigest md = newMessageDigest();
92  
93              @Override
94              public void accept(final Emitter<ByteBuffer> emitter) {
95                  final long remainingLength = length - bytesEmitted;
96                  if (remainingLength == 0) {
97                      emitter.onComplete();
98                      if (hash != null) {
99                          hash.set(TextUtils.toHexString(md.digest()));
100                     }
101                 } else {
102                     final int bufferLength = (int) Math.min(remainingLength, 1 + random.nextInt(maximumBlockSize));
103                     final byte[] bs = new byte[bufferLength];
104                     for (int i = 0; i < bufferLength; i++) {
105                         final byte b = RANGE[(int) (random.nextDouble() * RANGE.length)];
106                         bs[i] = b;
107                     }
108                     if (hash != null) {
109                         md.update(bs);
110                     }
111                     emitter.onNext(ByteBuffer.wrap(bs));
112                     bytesEmitted += bufferLength;
113                 }
114             }
115         });
116     }
117 
118     /**
119      * Computes the hash of the deterministic stream (as produced by {@link #produceStream(long)}).
120      */
121     public static String getStreamHash(final long length) {
122         return TextUtils.toHexString(consumeStream(produceStream(length)).blockingGet().md.digest());
123     }
124 
125     /**
126      * Consumes the given stream and returns a data structure containing its length and digest.
127      */
128     public static Single<StreamDescription> consumeStream(final Publisher<ByteBuffer> publisher) {
129         final StreamDescription seed = new StreamDescription(0, newMessageDigest());
130         return Flowable.fromPublisher(publisher)
131                 .reduce(seed, new BiFunction<StreamDescription, ByteBuffer, StreamDescription>() {
132                     @Override
133                     public StreamDescription apply(final StreamDescription desc, final ByteBuffer byteBuffer) {
134                         final long length = desc.length + byteBuffer.remaining();
135                         desc.md.update(byteBuffer);
136                         return new StreamDescription(length, desc.md);
137                     }
138                 });
139     }
140 
141     private static MessageDigest newMessageDigest() {
142         try {
143             return MessageDigest.getInstance("MD5");
144         } catch (final NoSuchAlgorithmException ex) {
145             throw new AssertionError(ex);
146         }
147     }
148 
149     public static class StreamDescription {
150         public final long length;
151         public final MessageDigest md;
152 
153         public StreamDescription(final long length, final MessageDigest md) {
154             this.length = length;
155             this.md = md;
156         }
157     }
158 }