1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.core5.http.HttpStreamResetException;
39 import org.apache.hc.core5.http.nio.CapacityChannel;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.reactivestreams.Subscriber;
43 import org.reactivestreams.Subscription;
44
45 import io.reactivex.Flowable;
46 import io.reactivex.Notification;
47 import io.reactivex.Observable;
48 import io.reactivex.Single;
49 import io.reactivex.functions.Consumer;
50
51 public class TestReactiveDataConsumer {
52
53 @Test
54 public void testStreamThatEndsNormally() throws Exception {
55 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
56
57 final List<ByteBuffer> output = Collections.synchronizedList(new ArrayList<ByteBuffer>());
58
59 final CountDownLatch complete = new CountDownLatch(1);
60 Observable.fromPublisher(consumer)
61 .materialize()
62 .forEach(new Consumer<Notification<ByteBuffer>>() {
63 @Override
64 public void accept(final Notification<ByteBuffer> byteBufferNotification) throws Exception {
65 if (byteBufferNotification.isOnComplete()) {
66 complete.countDown();
67 } else if (byteBufferNotification.isOnNext()) {
68 output.add(byteBufferNotification.getValue());
69 } else {
70 throw new IllegalArgumentException();
71 }
72 }
73 });
74
75 consumer.consume(ByteBuffer.wrap(new byte[]{ '1' }));
76 consumer.consume(ByteBuffer.wrap(new byte[]{ '2' }));
77 consumer.consume(ByteBuffer.wrap(new byte[]{ '3' }));
78 consumer.streamEnd(null);
79
80 Assert.assertTrue("Stream did not finish before timeout", complete.await(1, TimeUnit.SECONDS));
81 Assert.assertEquals(3, output.size());
82 Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '1' }), output.get(0));
83 Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '2' }), output.get(1));
84 Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '3' }), output.get(2));
85 }
86
87 @Test
88 public void testStreamThatEndsWithError() {
89 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
90 final Single<List<Notification<ByteBuffer>>> single = Observable.fromPublisher(consumer)
91 .materialize()
92 .toList();
93
94 final Exception ex = new RuntimeException();
95 consumer.failed(ex);
96
97 Assert.assertSame(ex, single.blockingGet().get(0).getError());
98 }
99
100 @Test(expected = HttpStreamResetException.class)
101 public void testCancellation() throws Exception {
102 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
103 consumer.subscribe(new Subscriber<ByteBuffer>() {
104 @Override
105 public void onSubscribe(final Subscription s) {
106 s.cancel();
107 }
108
109 @Override
110 public void onNext(final ByteBuffer byteBuffer) {
111 }
112
113 @Override
114 public void onError(final Throwable throwable) {
115 }
116
117 @Override
118 public void onComplete() {
119 }
120 });
121
122 consumer.consume(ByteBuffer.wrap(new byte[1024]));
123 }
124
125 @Test
126 public void testCapacityIncrements() throws Exception {
127 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
128 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
129
130 final AtomicInteger lastIncrement = new AtomicInteger(-1);
131 final CapacityChannel channel = new CapacityChannel() {
132 @Override
133 public void update(final int increment) {
134 lastIncrement.set(increment);
135 }
136 };
137 consumer.updateCapacity(channel);
138 Assert.assertEquals("CapacityChannel#update should not have been invoked yet", -1, lastIncrement.get());
139
140 final AtomicInteger received = new AtomicInteger(0);
141 final AtomicReference<Subscription> subscription = new AtomicReference<>();
142 consumer.subscribe(new Subscriber<ByteBuffer>() {
143 @Override
144 public void onSubscribe(final Subscription s) {
145 subscription.set(s);
146 }
147
148 @Override
149 public void onNext(final ByteBuffer byteBuffer) {
150 received.incrementAndGet();
151 }
152
153 @Override
154 public void onError(final Throwable throwable) {
155 }
156
157 @Override
158 public void onComplete() {
159 }
160 });
161
162 consumer.consume(data.duplicate());
163 consumer.consume(data.duplicate());
164 consumer.consume(data.duplicate());
165 consumer.consume(data.duplicate());
166
167 subscription.get().request(1);
168 Assert.assertEquals(1024, lastIncrement.get());
169
170 subscription.get().request(2);
171 Assert.assertEquals(2 * 1024, lastIncrement.get());
172
173 subscription.get().request(99);
174 Assert.assertEquals(1024, lastIncrement.get());
175 }
176
177 @Test
178 public void testFullResponseBuffering() throws Exception {
179
180
181 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
182 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
183
184 consumer.consume(data.duplicate());
185 consumer.consume(data.duplicate());
186 consumer.consume(data.duplicate());
187 consumer.streamEnd(null);
188
189 Assert.assertEquals(Flowable.fromPublisher(consumer).count().blockingGet().longValue(), 3L);
190 }
191
192 @Test
193 public void testErrorBuffering() throws Exception {
194 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
195 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
196
197 final RuntimeException ex = new RuntimeException();
198 consumer.consume(data.duplicate());
199 consumer.consume(data.duplicate());
200 consumer.consume(data.duplicate());
201 consumer.failed(ex);
202
203 final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
204 .materialize()
205 .singleOrError()
206 .blockingGet();
207 Assert.assertSame(ex, result.getError());
208 }
209
210 @Test
211 public void testFailAfterCompletion() {
212
213
214
215 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
216
217 consumer.streamEnd(null);
218
219 final RuntimeException ex = new RuntimeException();
220 consumer.failed(ex);
221
222 final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
223 .materialize()
224 .singleOrError()
225 .blockingGet();
226 Assert.assertFalse(result.isOnError());
227 Assert.assertTrue(result.isOnComplete());
228 }
229 }