View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.testing.reactive;
28  
29  import static java.lang.String.format;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.net.InetSocketAddress;
33  import java.net.SocketTimeoutException;
34  import java.net.URI;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.Channels;
37  import java.nio.channels.WritableByteChannel;
38  import java.util.Arrays;
39  import java.util.Collection;
40  import java.util.List;
41  import java.util.Random;
42  import java.util.concurrent.CancellationException;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicReference;
47  
48  import org.apache.hc.core5.function.Supplier;
49  import org.apache.hc.core5.http.HttpResponse;
50  import org.apache.hc.core5.http.HttpStreamResetException;
51  import org.apache.hc.core5.http.Message;
52  import org.apache.hc.core5.http.Method;
53  import org.apache.hc.core5.http.URIScheme;
54  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
55  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
56  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
57  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
58  import org.apache.hc.core5.http2.HttpVersionPolicy;
59  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
60  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
61  import org.apache.hc.core5.io.CloseMode;
62  import org.apache.hc.core5.reactive.ReactiveEntityProducer;
63  import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
64  import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
65  import org.apache.hc.core5.reactor.IOReactorConfig;
66  import org.apache.hc.core5.reactor.ListenerEndpoint;
67  import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
68  import org.apache.hc.core5.testing.nio.LoggingExceptionCallback;
69  import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
70  import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
71  import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
72  import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
73  import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
74  import org.apache.hc.core5.util.TextUtils;
75  import org.apache.hc.core5.util.Timeout;
76  import org.junit.Assert;
77  import org.junit.Rule;
78  import org.junit.Test;
79  import org.junit.rules.ExternalResource;
80  import org.junit.runner.RunWith;
81  import org.junit.runners.Parameterized;
82  import org.reactivestreams.Publisher;
83  import org.slf4j.Logger;
84  import org.slf4j.LoggerFactory;
85  
86  import io.reactivex.Flowable;
87  import io.reactivex.Observable;
88  import io.reactivex.functions.Action;
89  import io.reactivex.functions.Consumer;
90  
91  @RunWith(Parameterized.class)
92  public class ReactiveClientTest {
93  
94      private final Logger log = LoggerFactory.getLogger(getClass());
95  
96      @Parameterized.Parameters(name = "{0}")
97      public static Collection<Object[]> protocols() {
98          return Arrays.asList(new Object[][]{
99              { HttpVersionPolicy.FORCE_HTTP_1 },
100             { HttpVersionPolicy.FORCE_HTTP_2 }
101         });
102     }
103     private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
104     private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
105 
106     private static final Random RANDOM = new Random();
107 
108     private final HttpVersionPolicy versionPolicy;
109 
110     public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
111         this.versionPolicy = httpVersionPolicy;
112     }
113 
114     private HttpAsyncServer server;
115 
116     @Rule
117     public ExternalResource serverResource = new ExternalResource() {
118 
119         @Override
120         protected void before() throws Throwable {
121             log.debug("Starting up test server");
122             server = H2ServerBootstrap.bootstrap()
123                 .setVersionPolicy(versionPolicy)
124                 .setIOReactorConfig(
125                     IOReactorConfig.custom()
126                         .setSoTimeout(SOCKET_TIMEOUT)
127                         .build())
128                 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
129                 .setStreamListener(LoggingH2StreamListener.INSTANCE)
130                 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
131                 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
132                 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
133                 .register("*", new Supplier<AsyncServerExchangeHandler>() {
134 
135                     @Override
136                     public AsyncServerExchangeHandler get() {
137                         return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
138                     }
139 
140                 })
141                 .create();
142         }
143 
144         @Override
145         protected void after() {
146             log.debug("Shutting down test server");
147             if (server != null) {
148                 server.close(CloseMode.GRACEFUL);
149             }
150         }
151 
152     };
153 
154     private HttpAsyncRequester requester;
155 
156     @Rule
157     public ExternalResource clientResource = new ExternalResource() {
158 
159         @Override
160         protected void before() throws Throwable {
161             log.debug("Starting up test client");
162             requester = H2RequesterBootstrap.bootstrap()
163                 .setVersionPolicy(versionPolicy)
164                 .setIOReactorConfig(IOReactorConfig.custom()
165                     .setSoTimeout(SOCKET_TIMEOUT)
166                     .build())
167                 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
168                 .setStreamListener(LoggingH2StreamListener.INSTANCE)
169                 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
170                 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
171                 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
172                 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
173                 .create();
174         }
175 
176         @Override
177         protected void after() {
178             log.debug("Shutting down test client");
179             if (requester != null) {
180                 requester.close(CloseMode.GRACEFUL);
181             }
182         }
183 
184     };
185 
186     @Test
187     public void testSimpleRequest() throws Exception {
188         final InetSocketAddress address = startClientAndServer();
189         final byte[] input = new byte[1024];
190         RANDOM.nextBytes(input);
191         final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
192         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
193 
194         final BasicRequestProducer request = getRequestProducer(address, producer);
195 
196         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
197         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
198 
199         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
200                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
201 
202         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
203         final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
204         for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
205             writableByteChannel.write(byteBuffer);
206         }
207         writableByteChannel.close();
208         final byte[] output = byteArrayOutputStream.toByteArray();
209         Assert.assertArrayEquals(input, output);
210     }
211 
212     private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
213         return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
214     }
215 
216     @Test
217     public void testLongRunningRequest() throws Exception {
218         final InetSocketAddress address = startClientAndServer();
219         final long expectedLength = 6_554_200L;
220         final AtomicReference<String> expectedHash = new AtomicReference<>(null);
221         final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
222         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
223         final BasicRequestProducer request = getRequestProducer(address, producer);
224 
225         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
226         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
227         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
228                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
229         final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
230 
231         Assert.assertEquals(expectedLength, desc.length);
232         Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
233     }
234 
235     @Test
236     public void testManySmallBuffers() throws Exception {
237         // This test is not flaky. If it starts randomly failing, then there is a problem with how
238         // ReactiveDataConsumer signals capacity with its capacity channel. The situations in which
239         // this kind of bug manifests depend on the ordering of several events on different threads
240         // so it's unlikely to consistently occur.
241         final InetSocketAddress address = startClientAndServer();
242         for (int i = 0; i < 10; i++) {
243             final long expectedLength = 1_024_000;
244             final int maximumBlockSize = 1024;
245             final AtomicReference<String> expectedHash = new AtomicReference<>(null);
246             final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
247             final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
248             final BasicRequestProducer request = getRequestProducer(address, producer);
249 
250             final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
251             requester.execute(request, consumer, SOCKET_TIMEOUT, null);
252             final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
253                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
254             final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
255 
256             Assert.assertEquals(expectedLength, desc.length);
257             Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
258         }
259     }
260 
261     @Test
262     public void testRequestError() throws Exception {
263         final InetSocketAddress address = startClientAndServer();
264         final RuntimeException exceptionThrown = new RuntimeException("Test");
265         final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
266         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
267 
268         final BasicRequestProducer request = getRequestProducer(address, producer);
269 
270         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
271 
272         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
273 
274         try {
275             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
276             Assert.fail("Expected exception");
277         } catch (final ExecutionException ex) {
278             Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
279             Assert.assertSame(exceptionThrown, ex.getCause().getCause());
280         }
281     }
282 
283     @Test
284     public void testRequestTimeout() throws Exception {
285         final InetSocketAddress address = startClientAndServer();
286         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
287         final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
288             .doOnCancel(new Action() {
289                 @Override
290                 public void run() {
291                     requestPublisherWasCancelled.set(true);
292                 }
293             });
294         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
295         final BasicRequestProducer request = getRequestProducer(address, producer);
296 
297         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
298         final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
299 
300         try {
301             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
302         } catch (final ExecutionException ex) {
303             Assert.assertTrue(requestPublisherWasCancelled.get());
304             final Throwable cause = ex.getCause();
305             if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
306                 Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
307                     cause instanceof SocketTimeoutException);
308             } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
309                 Assert.assertTrue(format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
310                     cause instanceof HttpStreamResetException);
311             } else {
312                 Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
313             }
314         }
315     }
316 
317     @Test
318     public void testResponseCancellation() throws Exception {
319         final InetSocketAddress address = startClientAndServer();
320         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
321         final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
322         final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
323             .doOnCancel(new Action() {
324                 @Override
325                 public void run() throws Exception {
326                     requestPublisherWasCancelled.set(true);
327                 }
328             })
329             .doOnError(new Consumer<Throwable>() {
330                 @Override
331                 public void accept(final Throwable throwable) throws Exception {
332                     requestStreamError.set(throwable);
333                 }
334             });
335         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
336         final BasicRequestProducer request = getRequestProducer(address, producer);
337 
338         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
339         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
340         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
341                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
342 
343         final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
344         final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
345             .doOnCancel(new Action() {
346                 @Override
347                 public void run() throws Exception {
348                     responsePublisherWasCancelled.set(true);
349                 }
350             })
351             .take(3)
352             .toList()
353             .blockingGet();
354         Assert.assertEquals(3, outputBuffers.size());
355         Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
356         try {
357             future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
358             Assert.fail("Expected exception");
359         } catch (final ExecutionException | CancellationException ex) {
360             Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
361             Assert.assertTrue(requestPublisherWasCancelled.get());
362             Assert.assertNull(requestStreamError.get());
363         }
364     }
365 
366     private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
367         server.start();
368         final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get();
369         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
370         requester.start();
371         return address;
372     }
373 }