View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.testing.reactive;
28  
29  import static java.lang.String.format;
30  import static org.hamcrest.MatcherAssert.assertThat;
31  
32  import java.io.ByteArrayOutputStream;
33  import java.io.IOException;
34  import java.net.InetSocketAddress;
35  import java.net.SocketTimeoutException;
36  import java.net.URI;
37  import java.nio.ByteBuffer;
38  import java.nio.channels.Channels;
39  import java.nio.channels.WritableByteChannel;
40  import java.util.List;
41  import java.util.Random;
42  import java.util.concurrent.CancellationException;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicReference;
47  
48  import io.reactivex.rxjava3.core.Flowable;
49  import io.reactivex.rxjava3.core.Observable;
50  import org.apache.hc.core5.http.HttpResponse;
51  import org.apache.hc.core5.http.HttpStreamResetException;
52  import org.apache.hc.core5.http.Message;
53  import org.apache.hc.core5.http.Method;
54  import org.apache.hc.core5.http.URIScheme;
55  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
56  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
57  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
58  import org.apache.hc.core5.http2.HttpVersionPolicy;
59  import org.apache.hc.core5.reactive.ReactiveEntityProducer;
60  import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
61  import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
62  import org.apache.hc.core5.reactor.IOReactorConfig;
63  import org.apache.hc.core5.reactor.ListenerEndpoint;
64  import org.apache.hc.core5.testing.nio.extension.H2AsyncRequesterResource;
65  import org.apache.hc.core5.testing.nio.extension.H2AsyncServerResource;
66  import org.apache.hc.core5.testing.reactive.Reactive3TestUtils.StreamDescription;
67  import org.apache.hc.core5.util.TextUtils;
68  import org.apache.hc.core5.util.Timeout;
69  import org.hamcrest.CoreMatchers;
70  import org.junit.jupiter.api.Assertions;
71  import org.junit.jupiter.api.Test;
72  import org.junit.jupiter.api.extension.RegisterExtension;
73  import org.reactivestreams.Publisher;
74  
75  public abstract class ReactiveClientTest {
76  
77      private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(30);
78      private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(60);
79  
80      private static final Random RANDOM = new Random();
81  
82      private final HttpVersionPolicy versionPolicy;
83      @RegisterExtension
84      private final H2AsyncServerResource serverResource;
85      @RegisterExtension
86      private final H2AsyncRequesterResource clientResource;
87  
88      public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
89          this.versionPolicy = httpVersionPolicy;
90          this.serverResource = new H2AsyncServerResource(bootstrap -> bootstrap
91                  .setVersionPolicy(versionPolicy)
92                  .setIOReactorConfig(
93                          IOReactorConfig.custom()
94                                  .setSoTimeout(SOCKET_TIMEOUT)
95                                  .build())
96                  .register("*", () -> new ReactiveServerExchangeHandler(new ReactiveEchoProcessor()))
97          );
98          this.clientResource = new H2AsyncRequesterResource(bootstrap -> bootstrap
99                  .setVersionPolicy(versionPolicy)
100                 .setIOReactorConfig(IOReactorConfig.custom()
101                         .setSoTimeout(SOCKET_TIMEOUT)
102                         .build())
103         );
104     }
105 
106     @Test
107     public void testSimpleRequest() throws Exception {
108         final InetSocketAddress address = startServer();
109         final HttpAsyncRequester requester = clientResource.start();
110         final byte[] input = new byte[1024];
111         RANDOM.nextBytes(input);
112         final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
113         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
114 
115         final BasicRequestProducer request = getRequestProducer(address, producer);
116 
117         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
118         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
119 
120         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
121                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
122 
123         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
124         final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
125         for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
126             writableByteChannel.write(byteBuffer);
127         }
128         writableByteChannel.close();
129         final byte[] output = byteArrayOutputStream.toByteArray();
130         Assertions.assertArrayEquals(input, output);
131     }
132 
133     private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
134         return new BasicRequestProducer(Method.POST, URI.create("http://localhost:" + address.getPort()), producer);
135     }
136 
137     @Test
138     public void testLongRunningRequest() throws Exception {
139         final InetSocketAddress address = startServer();
140         final HttpAsyncRequester requester = clientResource.start();
141         final long expectedLength = 6_554_200L;
142         final AtomicReference<String> expectedHash = new AtomicReference<>();
143         final Flowable<ByteBuffer> stream = Reactive3TestUtils.produceStream(expectedLength, expectedHash);
144         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
145         final BasicRequestProducer request = getRequestProducer(address, producer);
146 
147         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
148         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
149         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
150                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
151         final StreamDescription desc = Reactive3TestUtils.consumeStream(response.getBody()).blockingGet();
152 
153         Assertions.assertEquals(expectedLength, desc.length);
154         Assertions.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
155     }
156 
157     @Test
158     public void testManySmallBuffers() throws Exception {
159         // This test is not flaky. If it starts randomly failing, then there is a problem with how
160         // ReactiveDataConsumer signals capacity with its capacity channel. The situations in which
161         // this kind of bug manifests depend on the ordering of several events on different threads
162         // so it's unlikely to consistently occur.
163         final InetSocketAddress address = startServer();
164         final HttpAsyncRequester requester = clientResource.start();
165         for (int i = 0; i < 10; i++) {
166             final long expectedLength = 1_024_000;
167             final int maximumBlockSize = 1024;
168             final AtomicReference<String> expectedHash = new AtomicReference<>();
169             final Publisher<ByteBuffer> stream = Reactive3TestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
170             final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
171             final BasicRequestProducer request = getRequestProducer(address, producer);
172 
173             final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
174             requester.execute(request, consumer, SOCKET_TIMEOUT, null);
175             final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
176                     .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
177             final StreamDescription desc = Reactive3TestUtils.consumeStream(response.getBody()).blockingGet();
178 
179             Assertions.assertEquals(expectedLength, desc.length);
180             Assertions.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
181         }
182     }
183 
184     @Test
185     public void testRequestError() throws Exception {
186         final InetSocketAddress address = startServer();
187         final HttpAsyncRequester requester = clientResource.start();
188         final RuntimeException exceptionThrown = new RuntimeException("Test");
189         final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
190         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
191 
192         final BasicRequestProducer request = getRequestProducer(address, producer);
193 
194         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
195 
196         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
197 
198         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
199                 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
200         Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
201         Assertions.assertSame(exceptionThrown, exception.getCause().getCause());
202     }
203 
204     @Test
205     public void testRequestTimeout() throws Exception {
206         final InetSocketAddress address = startServer();
207         final HttpAsyncRequester requester = clientResource.start();
208         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
209         final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
210                 .doOnCancel(() -> requestPublisherWasCancelled.set(true));
211         final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
212         final BasicRequestProducer request = getRequestProducer(address, producer);
213 
214         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
215         final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
216 
217         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
218                 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
219         Assertions.assertTrue(requestPublisherWasCancelled.get());
220         final Throwable cause = exception.getCause();
221         if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
222             Assertions.assertTrue(cause instanceof SocketTimeoutException, "Expected SocketTimeoutException, but got " + cause.getClass().getName());
223         } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
224             Assertions.assertTrue(cause instanceof HttpStreamResetException, format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()));
225         } else {
226             Assertions.fail("Unknown HttpVersionPolicy: " + versionPolicy);
227         }
228     }
229 
230     @Test
231     public void testResponseCancellation() throws Exception {
232         final InetSocketAddress address = startServer();
233         final HttpAsyncRequester requester = clientResource.start();
234         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
235         final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
236         final Publisher<ByteBuffer> stream = Reactive3TestUtils.produceStream(Long.MAX_VALUE, 1024, null)
237                 .doOnCancel(() -> requestPublisherWasCancelled.set(true))
238                 .doOnError(requestStreamError::set);
239         final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
240         final BasicRequestProducer request = getRequestProducer(address, producer);
241 
242         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
243         final Future<Void> future = requester.execute(request, consumer, SOCKET_TIMEOUT, null);
244         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
245                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
246 
247         final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
248         final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
249                 .doOnCancel(() -> responsePublisherWasCancelled.set(true))
250                 .take(3)
251                 .toList()
252                 .blockingGet();
253         Assertions.assertEquals(3, outputBuffers.size());
254         Assertions.assertTrue(responsePublisherWasCancelled.get(), "The response subscription should have been cancelled");
255         final Exception exception = Assertions.assertThrows(Exception.class, () ->
256                 future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
257         assertThat(exception, CoreMatchers.anyOf(
258                 CoreMatchers.instanceOf(CancellationException.class),
259                 CoreMatchers.instanceOf(ExecutionException.class)));
260         Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
261         Assertions.assertTrue(requestPublisherWasCancelled.get());
262         Assertions.assertNull(requestStreamError.get());
263     }
264 
265     private InetSocketAddress startServer() throws IOException, InterruptedException, ExecutionException {
266         final HttpAsyncServer server = serverResource.start();
267         final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get();
268         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
269         return address;
270     }
271 }