View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.reactive;
29  
30  import java.nio.ByteBuffer;
31  import java.nio.charset.StandardCharsets;
32  import java.security.MessageDigest;
33  import java.security.NoSuchAlgorithmException;
34  import java.util.Random;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.util.TextUtils;
38  import org.reactivestreams.Publisher;
39  
40  import io.reactivex.Emitter;
41  import io.reactivex.Flowable;
42  import io.reactivex.Single;
43  import io.reactivex.functions.Consumer;
44  
45  /**
46   * @deprecated Use {@link Reactive3TestUtils} and RxJava3
47   */
48  @Deprecated
49  public class ReactiveTestUtils {
50      /** The range from which to generate random data. */
51      private final static byte[] RANGE = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
52              .getBytes(StandardCharsets.US_ASCII);
53  
54      /**
55       * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB.
56       *
57       * @param length the number of bytes in the stream
58       * @return a reactive stream of bytes
59       */
60      public static Flowable<ByteBuffer> produceStream(final long length) {
61          return produceStream(length, null);
62      }
63  
64      /**
65       * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB, while computing the hash of
66       * the random data.
67       *
68       * @param length the number of bytes in the stream
69       * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
70       *             hash will not be computed
71       * @return a reactive stream of bytes
72       */
73      public static Flowable<ByteBuffer> produceStream(final long length, final AtomicReference<String> hash) {
74          return produceStream(length, 128 * 1024, hash);
75      }
76  
77      /**
78       * Produces a deterministic stream of bytes, in randomly sized chunks, while computing the hash of the random data.
79       *
80       * @param length the number of bytes in the stream
81       * @param maximumBlockSize the maximum size of any {@code ByteBuffer in the stream}
82       * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
83       *             hash will not be computed
84       * @return a reactive stream of bytes
85       */
86      public static Flowable<ByteBuffer> produceStream(
87              final long length,
88              final int maximumBlockSize,
89              final AtomicReference<String> hash
90      ) {
91          return Flowable.generate(new Consumer<Emitter<ByteBuffer>>() {
92              final Random random = new Random(length); // Use the length as the random seed for easy reproducibility
93              long bytesEmitted;
94              final MessageDigest md = newMessageDigest();
95  
96              @Override
97              public void accept(final Emitter<ByteBuffer> emitter) {
98                  final long remainingLength = length - bytesEmitted;
99                  if (remainingLength == 0) {
100                     emitter.onComplete();
101                     if (hash != null) {
102                         hash.set(TextUtils.toHexString(md.digest()));
103                     }
104                 } else {
105                     final int bufferLength = (int) Math.min(remainingLength, 1 + random.nextInt(maximumBlockSize));
106                     final byte[] bs = new byte[bufferLength];
107                     for (int i = 0; i < bufferLength; i++) {
108                         final byte b = RANGE[(int) (random.nextDouble() * RANGE.length)];
109                         bs[i] = b;
110                     }
111                     if (hash != null) {
112                         md.update(bs);
113                     }
114                     emitter.onNext(ByteBuffer.wrap(bs));
115                     bytesEmitted += bufferLength;
116                 }
117             }
118         });
119     }
120 
121     /**
122      * Computes the hash of the deterministic stream (as produced by {@link #produceStream(long)}).
123      */
124     public static String getStreamHash(final long length) {
125         return TextUtils.toHexString(consumeStream(produceStream(length)).blockingGet().md.digest());
126     }
127 
128     /**
129      * Consumes the given stream and returns a data structure containing its length and digest.
130      */
131     public static Single<StreamDescription> consumeStream(final Publisher<ByteBuffer> publisher) {
132         final StreamDescription seed = new StreamDescription(0, newMessageDigest());
133         return Flowable.fromPublisher(publisher)
134                 .reduce(seed, (desc, byteBuffer) -> {
135                     final long length = desc.length + byteBuffer.remaining();
136                     desc.md.update(byteBuffer);
137                     return new StreamDescription(length, desc.md);
138                 });
139     }
140 
141     private static MessageDigest newMessageDigest() {
142         try {
143             return MessageDigest.getInstance("MD5");
144         } catch (final NoSuchAlgorithmException ex) {
145             throw new AssertionError(ex);
146         }
147     }
148 
149     public static class StreamDescription {
150         public final long length;
151         public final MessageDigest md;
152 
153         public StreamDescription(final long length, final MessageDigest md) {
154             this.length = length;
155             this.md = md;
156         }
157     }
158 }