View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import io.reactivex.rxjava3.core.Flowable;
39  import io.reactivex.rxjava3.core.Notification;
40  import io.reactivex.rxjava3.core.Observable;
41  import io.reactivex.rxjava3.core.Single;
42  import org.apache.hc.core5.http.HttpStreamResetException;
43  import org.apache.hc.core5.http.nio.CapacityChannel;
44  import org.junit.jupiter.api.Assertions;
45  import org.junit.jupiter.api.Test;
46  import org.reactivestreams.Subscriber;
47  import org.reactivestreams.Subscription;
48  
49  public class TestReactiveDataConsumer {
50  
51      @Test
52      public void testStreamThatEndsNormally() throws Exception {
53          final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
54  
55          final List<ByteBuffer> output = Collections.synchronizedList(new ArrayList<>());
56  
57          final CountDownLatch complete = new CountDownLatch(1);
58          Observable.fromPublisher(consumer)
59              .materialize()
60              .forEach(byteBufferNotification -> {
61                  if (byteBufferNotification.isOnComplete()) {
62                      complete.countDown();
63                  } else if (byteBufferNotification.isOnNext()) {
64                      output.add(byteBufferNotification.getValue());
65                  } else {
66                      throw new IllegalArgumentException();
67                  }
68              });
69  
70          consumer.consume(ByteBuffer.wrap(new byte[]{ '1' }));
71          consumer.consume(ByteBuffer.wrap(new byte[]{ '2' }));
72          consumer.consume(ByteBuffer.wrap(new byte[]{ '3' }));
73          consumer.streamEnd(null);
74  
75          Assertions.assertTrue(complete.await(1, TimeUnit.SECONDS), "Stream did not finish before timeout");
76          Assertions.assertEquals(3, output.size());
77          Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '1' }), output.get(0));
78          Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '2' }), output.get(1));
79          Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '3' }), output.get(2));
80      }
81  
82      @Test
83      public void testStreamThatEndsWithError() {
84          final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
85          final Single<List<Notification<ByteBuffer>>> single = Observable.fromPublisher(consumer)
86              .materialize()
87              .toList();
88  
89          final Exception ex = new RuntimeException();
90          consumer.failed(ex);
91  
92          Assertions.assertSame(ex, single.blockingGet().get(0).getError());
93      }
94  
95      @Test
96      public void testCancellation() throws Exception {
97          final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
98          consumer.subscribe(new Subscriber<ByteBuffer>() {
99              @Override
100             public void onSubscribe(final Subscription s) {
101                 s.cancel();
102             }
103 
104             @Override
105             public void onNext(final ByteBuffer byteBuffer) {
106             }
107 
108             @Override
109             public void onError(final Throwable throwable) {
110             }
111 
112             @Override
113             public void onComplete() {
114             }
115         });
116 
117         Assertions.assertThrows(HttpStreamResetException.class, () ->
118             consumer.consume(ByteBuffer.wrap(new byte[1024])));
119     }
120 
121     @Test
122     public void testCapacityIncrements() throws Exception {
123         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
124         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
125 
126         final AtomicInteger lastIncrement = new AtomicInteger(-1);
127         final CapacityChannel channel = lastIncrement::set;
128         consumer.updateCapacity(channel);
129         Assertions.assertEquals(-1, lastIncrement.get(), "CapacityChannel#update should not have been invoked yet");
130 
131         final AtomicInteger received = new AtomicInteger(0);
132         final AtomicReference<Subscription> subscription = new AtomicReference<>();
133         consumer.subscribe(new Subscriber<ByteBuffer>() {
134             @Override
135             public void onSubscribe(final Subscription s) {
136                 subscription.set(s);
137             }
138 
139             @Override
140             public void onNext(final ByteBuffer byteBuffer) {
141                 received.incrementAndGet();
142             }
143 
144             @Override
145             public void onError(final Throwable throwable) {
146             }
147 
148             @Override
149             public void onComplete() {
150             }
151         });
152 
153         consumer.consume(data.duplicate());
154         consumer.consume(data.duplicate());
155         consumer.consume(data.duplicate());
156         consumer.consume(data.duplicate());
157 
158         subscription.get().request(1);
159         Assertions.assertEquals(1024, lastIncrement.get());
160 
161         subscription.get().request(2);
162         Assertions.assertEquals(2 * 1024, lastIncrement.get());
163 
164         subscription.get().request(99);
165         Assertions.assertEquals(1024, lastIncrement.get());
166     }
167 
168     @Test
169     public void testFullResponseBuffering() throws Exception {
170         // Due to inherent race conditions, is possible for the entire response to be buffered and completed before
171         // the Subscriber shows up. This must be handled correctly.
172         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
173         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
174 
175         consumer.consume(data.duplicate());
176         consumer.consume(data.duplicate());
177         consumer.consume(data.duplicate());
178         consumer.streamEnd(null);
179 
180         Assertions.assertEquals(Flowable.fromPublisher(consumer).count().blockingGet().longValue(), 3L);
181     }
182 
183     @Test
184     public void testErrorBuffering() throws Exception {
185         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
186         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
187 
188         final RuntimeException ex = new RuntimeException();
189         consumer.consume(data.duplicate());
190         consumer.consume(data.duplicate());
191         consumer.consume(data.duplicate());
192         consumer.failed(ex);
193 
194         final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
195             .materialize()
196             .singleOrError()
197             .blockingGet();
198         Assertions.assertSame(ex, result.getError());
199     }
200 
201     @Test
202     public void testFailAfterCompletion() {
203         // Calling consumer.failed() after consumer.streamEnd() must be a no-op.
204         // The exception must be discarded, and the subscriber must see that
205         // the stream was successfully completed.
206         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
207 
208         consumer.streamEnd(null);
209 
210         final RuntimeException ex = new RuntimeException();
211         consumer.failed(ex);
212 
213         final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
214                 .materialize()
215                 .singleOrError()
216                 .blockingGet();
217         Assertions.assertFalse(result.isOnError());
218         Assertions.assertTrue(result.isOnComplete());
219     }
220 }