View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.core5.http.HttpStreamResetException;
39  import org.apache.hc.core5.http.nio.CapacityChannel;
40  import org.junit.Assert;
41  import org.junit.Test;
42  import org.reactivestreams.Subscriber;
43  import org.reactivestreams.Subscription;
44  
45  import io.reactivex.Flowable;
46  import io.reactivex.Notification;
47  import io.reactivex.Observable;
48  import io.reactivex.Single;
49  import io.reactivex.functions.Consumer;
50  
51  public class TestReactiveDataConsumer {
52  
53      @Test
54      public void testStreamThatEndsNormally() throws Exception {
55          final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
56  
57          final List<ByteBuffer> output = Collections.synchronizedList(new ArrayList<ByteBuffer>());
58  
59          final CountDownLatch complete = new CountDownLatch(1);
60          Observable.fromPublisher(consumer)
61              .materialize()
62              .forEach(new Consumer<Notification<ByteBuffer>>() {
63                  @Override
64                  public void accept(final Notification<ByteBuffer> byteBufferNotification) throws Exception {
65                      if (byteBufferNotification.isOnComplete()) {
66                          complete.countDown();
67                      } else if (byteBufferNotification.isOnNext()) {
68                          output.add(byteBufferNotification.getValue());
69                      } else {
70                          throw new IllegalArgumentException();
71                      }
72                  }
73              });
74  
75          consumer.consume(ByteBuffer.wrap(new byte[]{ '1' }));
76          consumer.consume(ByteBuffer.wrap(new byte[]{ '2' }));
77          consumer.consume(ByteBuffer.wrap(new byte[]{ '3' }));
78          consumer.streamEnd(null);
79  
80          Assert.assertTrue("Stream did not finish before timeout", complete.await(1, TimeUnit.SECONDS));
81          Assert.assertEquals(3, output.size());
82          Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '1' }), output.get(0));
83          Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '2' }), output.get(1));
84          Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '3' }), output.get(2));
85      }
86  
87      @Test
88      public void testStreamThatEndsWithError() {
89          final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
90          final Single<List<Notification<ByteBuffer>>> single = Observable.fromPublisher(consumer)
91              .materialize()
92              .toList();
93  
94          final Exception ex = new RuntimeException();
95          consumer.failed(ex);
96  
97          Assert.assertSame(ex, single.blockingGet().get(0).getError());
98      }
99  
100     @Test(expected = HttpStreamResetException.class)
101     public void testCancellation() throws Exception {
102         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
103         consumer.subscribe(new Subscriber<ByteBuffer>() {
104             @Override
105             public void onSubscribe(final Subscription s) {
106                 s.cancel();
107             }
108 
109             @Override
110             public void onNext(final ByteBuffer byteBuffer) {
111             }
112 
113             @Override
114             public void onError(final Throwable throwable) {
115             }
116 
117             @Override
118             public void onComplete() {
119             }
120         });
121 
122         consumer.consume(ByteBuffer.wrap(new byte[1024]));
123     }
124 
125     @Test
126     public void testCapacityIncrements() throws Exception {
127         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
128         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
129 
130         final AtomicInteger lastIncrement = new AtomicInteger(-1);
131         final CapacityChannel channel = new CapacityChannel() {
132             @Override
133             public void update(final int increment) {
134                 lastIncrement.set(increment);
135             }
136         };
137         consumer.updateCapacity(channel);
138         Assert.assertEquals("CapacityChannel#update should not have been invoked yet", -1, lastIncrement.get());
139 
140         final AtomicInteger received = new AtomicInteger(0);
141         final AtomicReference<Subscription> subscription = new AtomicReference<>();
142         consumer.subscribe(new Subscriber<ByteBuffer>() {
143             @Override
144             public void onSubscribe(final Subscription s) {
145                 subscription.set(s);
146             }
147 
148             @Override
149             public void onNext(final ByteBuffer byteBuffer) {
150                 received.incrementAndGet();
151             }
152 
153             @Override
154             public void onError(final Throwable throwable) {
155             }
156 
157             @Override
158             public void onComplete() {
159             }
160         });
161 
162         consumer.consume(data.duplicate());
163         consumer.consume(data.duplicate());
164         consumer.consume(data.duplicate());
165         consumer.consume(data.duplicate());
166 
167         subscription.get().request(1);
168         Assert.assertEquals(1024, lastIncrement.get());
169 
170         subscription.get().request(2);
171         Assert.assertEquals(2 * 1024, lastIncrement.get());
172 
173         subscription.get().request(99);
174         Assert.assertEquals(1024, lastIncrement.get());
175     }
176 
177     @Test
178     public void testFullResponseBuffering() throws Exception {
179         // Due to inherent race conditions, is possible for the entire response to be buffered and completed before
180         // the Subscriber shows up. This must be handled correctly.
181         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
182         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
183 
184         consumer.consume(data.duplicate());
185         consumer.consume(data.duplicate());
186         consumer.consume(data.duplicate());
187         consumer.streamEnd(null);
188 
189         Assert.assertEquals(Flowable.fromPublisher(consumer).count().blockingGet().longValue(), 3L);
190     }
191 
192     @Test
193     public void testErrorBuffering() throws Exception {
194         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
195         final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
196 
197         final RuntimeException ex = new RuntimeException();
198         consumer.consume(data.duplicate());
199         consumer.consume(data.duplicate());
200         consumer.consume(data.duplicate());
201         consumer.failed(ex);
202 
203         final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
204             .materialize()
205             .singleOrError()
206             .blockingGet();
207         Assert.assertSame(ex, result.getError());
208     }
209 
210     @Test
211     public void testFailAfterCompletion() {
212         // Calling consumer.failed() after consumer.streamEnd() must be a no-op.
213         // The exception must be discarded, and the subscriber must see that
214         // the stream was successfully completed.
215         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
216 
217         consumer.streamEnd(null);
218 
219         final RuntimeException ex = new RuntimeException();
220         consumer.failed(ex);
221 
222         final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
223                 .materialize()
224                 .singleOrError()
225                 .blockingGet();
226         Assert.assertFalse(result.isOnError());
227         Assert.assertTrue(result.isOnComplete());
228     }
229 }