1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.testing.reactive;
28
29 import static java.lang.String.format;
30
31 import java.io.ByteArrayOutputStream;
32 import java.net.InetSocketAddress;
33 import java.net.SocketTimeoutException;
34 import java.net.URI;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.Channels;
37 import java.nio.channels.WritableByteChannel;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Random;
42 import java.util.concurrent.CancellationException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicReference;
47
48 import org.apache.hc.core5.function.Supplier;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.HttpStreamResetException;
51 import org.apache.hc.core5.http.Message;
52 import org.apache.hc.core5.http.Method;
53 import org.apache.hc.core5.http.URIScheme;
54 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
55 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
56 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
57 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
58 import org.apache.hc.core5.http2.HttpVersionPolicy;
59 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
60 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
61 import org.apache.hc.core5.io.CloseMode;
62 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
63 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
64 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
65 import org.apache.hc.core5.reactor.IOReactorConfig;
66 import org.apache.hc.core5.reactor.ListenerEndpoint;
67 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
68 import org.apache.hc.core5.testing.nio.LoggingExceptionCallback;
69 import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
70 import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
71 import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
72 import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
73 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
74 import org.apache.hc.core5.util.TextUtils;
75 import org.apache.hc.core5.util.Timeout;
76 import org.junit.Assert;
77 import org.junit.Rule;
78 import org.junit.Test;
79 import org.junit.rules.ExternalResource;
80 import org.junit.runner.RunWith;
81 import org.junit.runners.Parameterized;
82 import org.reactivestreams.Publisher;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import io.reactivex.Flowable;
87 import io.reactivex.Observable;
88 import io.reactivex.functions.Action;
89 import io.reactivex.functions.Consumer;
90
91 @RunWith(Parameterized.class)
92 public class ReactiveClientTest {
93
94 private final Logger log = LoggerFactory.getLogger(getClass());
95
96 @Parameterized.Parameters(name = "{0}")
97 public static Collection<Object[]> protocols() {
98 return Arrays.asList(new Object[][]{
99 { HttpVersionPolicy.FORCE_HTTP_1 },
100 { HttpVersionPolicy.FORCE_HTTP_2 }
101 });
102 }
103 private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
104 private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
105
106 private static final Random RANDOM = new Random();
107
108 private final HttpVersionPolicy versionPolicy;
109
110 public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
111 this.versionPolicy = httpVersionPolicy;
112 }
113
114 private HttpAsyncServer server;
115
116 @Rule
117 public ExternalResource serverResource = new ExternalResource() {
118
119 @Override
120 protected void before() throws Throwable {
121 log.debug("Starting up test server");
122 server = H2ServerBootstrap.bootstrap()
123 .setVersionPolicy(versionPolicy)
124 .setIOReactorConfig(
125 IOReactorConfig.custom()
126 .setSoTimeout(SOCKET_TIMEOUT)
127 .build())
128 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
129 .setStreamListener(LoggingH2StreamListener.INSTANCE)
130 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
131 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
132 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
133 .register("*", new Supplier<AsyncServerExchangeHandler>() {
134
135 @Override
136 public AsyncServerExchangeHandler get() {
137 return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
138 }
139
140 })
141 .create();
142 }
143
144 @Override
145 protected void after() {
146 log.debug("Shutting down test server");
147 if (server != null) {
148 server.close(CloseMode.GRACEFUL);
149 }
150 }
151
152 };
153
154 private HttpAsyncRequester requester;
155
156 @Rule
157 public ExternalResource clientResource = new ExternalResource() {
158
159 @Override
160 protected void before() throws Throwable {
161 log.debug("Starting up test client");
162 requester = H2RequesterBootstrap.bootstrap()
163 .setVersionPolicy(versionPolicy)
164 .setIOReactorConfig(IOReactorConfig.custom()
165 .setSoTimeout(SOCKET_TIMEOUT)
166 .build())
167 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
168 .setStreamListener(LoggingH2StreamListener.INSTANCE)
169 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
170 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
171 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
172 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
173 .create();
174 }
175
176 @Override
177 protected void after() {
178 log.debug("Shutting down test client");
179 if (requester != null) {
180 requester.close(CloseMode.GRACEFUL);
181 }
182 }
183
184 };
185
186 @Test
187 public void testSimpleRequest() throws Exception {
188 final InetSocketAddress address = startClientAndServer();
189 final byte[] input = new byte[1024];
190 RANDOM.nextBytes(input);
191 final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
192 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
193
194 final BasicRequestProducer request = getRequestProducer(address, producer);
195
196 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
197 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
198
199 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
200 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
201
202 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
203 final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
204 for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
205 writableByteChannel.write(byteBuffer);
206 }
207 writableByteChannel.close();
208 final byte[] output = byteArrayOutputStream.toByteArray();
209 Assert.assertArrayEquals(input, output);
210 }
211
212 private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
213 return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
214 }
215
216 @Test
217 public void testLongRunningRequest() throws Exception {
218 final InetSocketAddress address = startClientAndServer();
219 final long expectedLength = 6_554_200L;
220 final AtomicReference<String> expectedHash = new AtomicReference<>(null);
221 final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
222 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
223 final BasicRequestProducer request = getRequestProducer(address, producer);
224
225 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
226 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
227 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
228 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
229 final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
230
231 Assert.assertEquals(expectedLength, desc.length);
232 Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
233 }
234
235 @Test
236 public void testManySmallBuffers() throws Exception {
237
238
239
240
241 final InetSocketAddress address = startClientAndServer();
242 for (int i = 0; i < 10; i++) {
243 final long expectedLength = 1_024_000;
244 final int maximumBlockSize = 1024;
245 final AtomicReference<String> expectedHash = new AtomicReference<>(null);
246 final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
247 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
248 final BasicRequestProducer request = getRequestProducer(address, producer);
249
250 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
251 requester.execute(request, consumer, SOCKET_TIMEOUT, null);
252 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
253 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
254 final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
255
256 Assert.assertEquals(expectedLength, desc.length);
257 Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
258 }
259 }
260
261 @Test
262 public void testRequestError() throws Exception {
263 final InetSocketAddress address = startClientAndServer();
264 final RuntimeException exceptionThrown = new RuntimeException("Test");
265 final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
266 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
267
268 final BasicRequestProducer request = getRequestProducer(address, producer);
269
270 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
271
272 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
273
274 try {
275 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
276 Assert.fail("Expected exception");
277 } catch (final ExecutionException ex) {
278 Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
279 Assert.assertSame(exceptionThrown, ex.getCause().getCause());
280 }
281 }
282
283 @Test
284 public void testRequestTimeout() throws Exception {
285 final InetSocketAddress address = startClientAndServer();
286 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
287 final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
288 .doOnCancel(new Action() {
289 @Override
290 public void run() {
291 requestPublisherWasCancelled.set(true);
292 }
293 });
294 final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
295 final BasicRequestProducer request = getRequestProducer(address, producer);
296
297 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
298 final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
299
300 try {
301 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
302 } catch (final ExecutionException ex) {
303 Assert.assertTrue(requestPublisherWasCancelled.get());
304 final Throwable cause = ex.getCause();
305 if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
306 Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
307 cause instanceof SocketTimeoutException);
308 } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
309 Assert.assertTrue(format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
310 cause instanceof HttpStreamResetException);
311 } else {
312 Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
313 }
314 }
315 }
316
317 @Test
318 public void testResponseCancellation() throws Exception {
319 final InetSocketAddress address = startClientAndServer();
320 final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
321 final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
322 final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
323 .doOnCancel(new Action() {
324 @Override
325 public void run() throws Exception {
326 requestPublisherWasCancelled.set(true);
327 }
328 })
329 .doOnError(new Consumer<Throwable>() {
330 @Override
331 public void accept(final Throwable throwable) throws Exception {
332 requestStreamError.set(throwable);
333 }
334 });
335 final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
336 final BasicRequestProducer request = getRequestProducer(address, producer);
337
338 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
339 final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
340 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
341 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
342
343 final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
344 final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
345 .doOnCancel(new Action() {
346 @Override
347 public void run() throws Exception {
348 responsePublisherWasCancelled.set(true);
349 }
350 })
351 .take(3)
352 .toList()
353 .blockingGet();
354 Assert.assertEquals(3, outputBuffers.size());
355 Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
356 try {
357 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
358 Assert.fail("Expected exception");
359 } catch (final ExecutionException | CancellationException ex) {
360 Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
361 Assert.assertTrue(requestPublisherWasCancelled.get());
362 Assert.assertNull(requestStreamError.get());
363 }
364 }
365
366 private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
367 server.start();
368 final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get();
369 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
370 requester.start();
371 return address;
372 }
373 }