1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import io.reactivex.rxjava3.core.Flowable;
39 import io.reactivex.rxjava3.core.Notification;
40 import io.reactivex.rxjava3.core.Observable;
41 import io.reactivex.rxjava3.core.Single;
42 import org.apache.hc.core5.http.HttpStreamResetException;
43 import org.apache.hc.core5.http.nio.CapacityChannel;
44 import org.junit.jupiter.api.Assertions;
45 import org.junit.jupiter.api.Test;
46 import org.reactivestreams.Subscriber;
47 import org.reactivestreams.Subscription;
48
49 public class TestReactiveDataConsumer {
50
51 @Test
52 public void testStreamThatEndsNormally() throws Exception {
53 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
54
55 final List<ByteBuffer> output = Collections.synchronizedList(new ArrayList<>());
56
57 final CountDownLatch complete = new CountDownLatch(1);
58 Observable.fromPublisher(consumer)
59 .materialize()
60 .forEach(byteBufferNotification -> {
61 if (byteBufferNotification.isOnComplete()) {
62 complete.countDown();
63 } else if (byteBufferNotification.isOnNext()) {
64 output.add(byteBufferNotification.getValue());
65 } else {
66 throw new IllegalArgumentException();
67 }
68 });
69
70 consumer.consume(ByteBuffer.wrap(new byte[]{ '1' }));
71 consumer.consume(ByteBuffer.wrap(new byte[]{ '2' }));
72 consumer.consume(ByteBuffer.wrap(new byte[]{ '3' }));
73 consumer.streamEnd(null);
74
75 Assertions.assertTrue(complete.await(1, TimeUnit.SECONDS), "Stream did not finish before timeout");
76 Assertions.assertEquals(3, output.size());
77 Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '1' }), output.get(0));
78 Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '2' }), output.get(1));
79 Assertions.assertEquals(ByteBuffer.wrap(new byte[]{ '3' }), output.get(2));
80 }
81
82 @Test
83 public void testStreamThatEndsWithError() {
84 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
85 final Single<List<Notification<ByteBuffer>>> single = Observable.fromPublisher(consumer)
86 .materialize()
87 .toList();
88
89 final Exception ex = new RuntimeException();
90 consumer.failed(ex);
91
92 Assertions.assertSame(ex, single.blockingGet().get(0).getError());
93 }
94
95 @Test
96 public void testCancellation() throws Exception {
97 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
98 consumer.subscribe(new Subscriber<ByteBuffer>() {
99 @Override
100 public void onSubscribe(final Subscription s) {
101 s.cancel();
102 }
103
104 @Override
105 public void onNext(final ByteBuffer byteBuffer) {
106 }
107
108 @Override
109 public void onError(final Throwable throwable) {
110 }
111
112 @Override
113 public void onComplete() {
114 }
115 });
116
117 Assertions.assertThrows(HttpStreamResetException.class, () ->
118 consumer.consume(ByteBuffer.wrap(new byte[1024])));
119 }
120
121 @Test
122 public void testCapacityIncrements() throws Exception {
123 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
124 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
125
126 final AtomicInteger lastIncrement = new AtomicInteger(-1);
127 final CapacityChannel channel = lastIncrement::set;
128 consumer.updateCapacity(channel);
129 Assertions.assertEquals(-1, lastIncrement.get(), "CapacityChannel#update should not have been invoked yet");
130
131 final AtomicInteger received = new AtomicInteger(0);
132 final AtomicReference<Subscription> subscription = new AtomicReference<>();
133 consumer.subscribe(new Subscriber<ByteBuffer>() {
134 @Override
135 public void onSubscribe(final Subscription s) {
136 subscription.set(s);
137 }
138
139 @Override
140 public void onNext(final ByteBuffer byteBuffer) {
141 received.incrementAndGet();
142 }
143
144 @Override
145 public void onError(final Throwable throwable) {
146 }
147
148 @Override
149 public void onComplete() {
150 }
151 });
152
153 consumer.consume(data.duplicate());
154 consumer.consume(data.duplicate());
155 consumer.consume(data.duplicate());
156 consumer.consume(data.duplicate());
157
158 subscription.get().request(1);
159 Assertions.assertEquals(1024, lastIncrement.get());
160
161 subscription.get().request(2);
162 Assertions.assertEquals(2 * 1024, lastIncrement.get());
163
164 subscription.get().request(99);
165 Assertions.assertEquals(1024, lastIncrement.get());
166 }
167
168 @Test
169 public void testFullResponseBuffering() throws Exception {
170
171
172 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
173 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
174
175 consumer.consume(data.duplicate());
176 consumer.consume(data.duplicate());
177 consumer.consume(data.duplicate());
178 consumer.streamEnd(null);
179
180 Assertions.assertEquals(Flowable.fromPublisher(consumer).count().blockingGet().longValue(), 3L);
181 }
182
183 @Test
184 public void testErrorBuffering() throws Exception {
185 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
186 final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
187
188 final RuntimeException ex = new RuntimeException();
189 consumer.consume(data.duplicate());
190 consumer.consume(data.duplicate());
191 consumer.consume(data.duplicate());
192 consumer.failed(ex);
193
194 final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
195 .materialize()
196 .singleOrError()
197 .blockingGet();
198 Assertions.assertSame(ex, result.getError());
199 }
200
201 @Test
202 public void testFailAfterCompletion() {
203
204
205
206 final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
207
208 consumer.streamEnd(null);
209
210 final RuntimeException ex = new RuntimeException();
211 consumer.failed(ex);
212
213 final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
214 .materialize()
215 .singleOrError()
216 .blockingGet();
217 Assertions.assertFalse(result.isOnError());
218 Assertions.assertTrue(result.isOnComplete());
219 }
220 }