1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.ByteArrayOutputStream;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.Channels;
35 import java.nio.channels.WritableByteChannel;
36 import java.nio.charset.StandardCharsets;
37 import java.util.LinkedHashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Queue;
41 import java.util.Random;
42 import java.util.concurrent.ArrayBlockingQueue;
43 import java.util.concurrent.BlockingQueue;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51
52 import io.reactivex.rxjava3.core.Flowable;
53 import io.reactivex.rxjava3.schedulers.Schedulers;
54 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
55 import org.apache.hc.client5.http.protocol.HttpClientContext;
56 import org.apache.hc.core5.concurrent.FutureCallback;
57 import org.apache.hc.core5.http.ContentType;
58 import org.apache.hc.core5.http.HttpHost;
59 import org.apache.hc.core5.http.HttpResponse;
60 import org.apache.hc.core5.http.Message;
61 import org.apache.hc.core5.http.URIScheme;
62 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
63 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
64 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
65 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
66 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
67 import org.apache.hc.core5.testing.nio.H2TestServer;
68 import org.apache.hc.core5.testing.reactive.Reactive3TestUtils;
69 import org.apache.hc.core5.testing.reactive.Reactive3TestUtils.StreamDescription;
70 import org.apache.hc.core5.testing.reactive.ReactiveEchoProcessor;
71 import org.apache.hc.core5.testing.reactive.ReactiveRandomProcessor;
72 import org.apache.hc.core5.util.TextUtils;
73 import org.hamcrest.CoreMatchers;
74 import org.junit.jupiter.api.Assertions;
75 import org.junit.jupiter.api.Test;
76 import org.junit.jupiter.api.Timeout;
77 import org.reactivestreams.Publisher;
78
79 public abstract class AbstractHttpReactiveFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase {
80
/**
 * Creates the test for the given URI scheme.
 *
 * @param scheme the URI scheme, passed straight through to the superclass constructor.
 */
public AbstractHttpReactiveFundamentalsTest(final URIScheme scheme) {
    super(scheme);
}
84
85 abstract protected H2TestServer startServer() throws Exception;
86
87 abstract protected T startClient() throws Exception;
88
89 @Test
90 @Timeout(value = 60_000, unit = MILLISECONDS)
91 public void testSequentialGetRequests() throws Exception {
92 final H2TestServer server = startServer();
93 server.register("/random/*", () ->
94 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
95 final HttpHost target = targetHost();
96 final T client = startClient();
97 for (int i = 0; i < 3; i++) {
98 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
99
100 client.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null);
101
102 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
103 assertThat(response, CoreMatchers.notNullValue());
104 assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
105
106 final String body = publisherToString(response.getBody());
107 assertThat(body, CoreMatchers.notNullValue());
108 assertThat(body.length(), CoreMatchers.equalTo(2048));
109 }
110 }
111
112 @Test
113 @Timeout(value = 2000, unit = MILLISECONDS)
114 public void testSequentialHeadRequests() throws Exception {
115 final H2TestServer server = startServer();
116 server.register("/random/*", () ->
117 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
118 final HttpHost target = targetHost();
119 final T client = startClient();
120 for (int i = 0; i < 3; i++) {
121 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
122
123 client.execute(AsyncRequestBuilder.head(target + "/random/2048").build(), consumer, null);
124
125 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
126 assertThat(response, CoreMatchers.notNullValue());
127 assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
128
129 final String body = publisherToString(response.getBody());
130 assertThat(body, CoreMatchers.nullValue());
131 }
132 }
133
134 @Test
135 @Timeout(value = 60_000, unit = MILLISECONDS)
136 public void testSequentialPostRequests() throws Exception {
137 final H2TestServer server = startServer();
138 server.register("/echo/*", () ->
139 new ReactiveServerExchangeHandler(new ReactiveEchoProcessor()));
140 final HttpHost target = targetHost();
141 final T client = startClient();
142 for (int i = 0; i < 3; i++) {
143 final byte[] b1 = new byte[1024];
144 final Random rnd = new Random(System.currentTimeMillis());
145 rnd.nextBytes(b1);
146 final Flowable<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(b1));
147 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
148 final AsyncRequestProducer request = AsyncRequestBuilder.post(target + "/echo/")
149 .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.APPLICATION_OCTET_STREAM, null))
150 .build();
151
152 client.execute(request, consumer, HttpClientContext.create(), null);
153
154 final Future<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture = consumer.getResponseFuture();
155 final Message<HttpResponse, Publisher<ByteBuffer>> responseMessage = responseFuture.get();
156 assertThat(responseMessage, CoreMatchers.notNullValue());
157 final HttpResponse response = responseMessage.getHead();
158 assertThat(response.getCode(), CoreMatchers.equalTo(200));
159 final byte[] b2 = publisherToByteArray(responseMessage.getBody());
160 assertThat(b1, CoreMatchers.equalTo(b2));
161 }
162 }
163
164 @Test
165 @Timeout(value = 60_000, unit = MILLISECONDS)
166 public void testConcurrentPostRequests() throws Exception {
167 final H2TestServer server = startServer();
168 server.register("/echo/*", () ->
169 new ReactiveServerExchangeHandler(new ReactiveEchoProcessor()));
170 final HttpHost target = targetHost();
171 final T client = startClient();
172
173 final int reqCount = 500;
174 final int maxSize = 128 * 1024;
175 final Map<Long, StreamingTestCase> testCases = StreamingTestCase.generate(reqCount, maxSize);
176 final BlockingQueue<StreamDescription> responses = new ArrayBlockingQueue<>(reqCount);
177
178 for (final StreamingTestCase testCase : testCases.values()) {
179 final ReactiveEntityProducer producer = new ReactiveEntityProducer(testCase.stream, testCase.length,
180 ContentType.APPLICATION_OCTET_STREAM, null);
181 final AsyncRequestProducer request = AsyncRequestBuilder.post(target + "/echo/")
182 .setEntity(producer)
183 .build();
184
185 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(new FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>>() {
186 @Override
187 public void completed(final Message<HttpResponse, Publisher<ByteBuffer>> result) {
188 final Flowable<ByteBuffer> flowable = Flowable.fromPublisher(result.getBody())
189 .observeOn(Schedulers.io());
190 Reactive3TestUtils.consumeStream(flowable).subscribe(responses::add);
191 }
192 @Override
193 public void failed(final Exception ex) { }
194 @Override
195 public void cancelled() { }
196 });
197 client.execute(request, consumer, HttpClientContext.create(), null);
198 }
199
200 for (int i = 0; i < reqCount; i++) {
201 final StreamDescription streamDescription = responses.take();
202 final StreamingTestCase streamingTestCase = testCases.get(streamDescription.length);
203 final long expectedLength = streamingTestCase.length;
204 final long actualLength = streamDescription.length;
205 Assertions.assertEquals(expectedLength, actualLength);
206
207 final String expectedHash = streamingTestCase.expectedHash.get();
208 final String actualHash = TextUtils.toHexString(streamDescription.md.digest());
209 Assertions.assertEquals(expectedHash, actualHash);
210 }
211 }
212
213 @Test
214 @Timeout(value = 60_000, unit = MILLISECONDS)
215 public void testRequestExecutionFromCallback() throws Exception {
216 final H2TestServer server = startServer();
217 server.register("/random/*", () ->
218 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
219 final HttpHost target = targetHost();
220 final T client = startClient();
221 final int requestNum = 50;
222 final AtomicInteger count = new AtomicInteger(requestNum);
223 final Queue<Message<HttpResponse, Publisher<ByteBuffer>>> resultQueue = new ConcurrentLinkedQueue<>();
224 final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
225
226 final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> callback = new FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>>() {
227 @Override
228 public void completed(final Message<HttpResponse, Publisher<ByteBuffer>> result) {
229 try {
230 resultQueue.add(result);
231 if (count.decrementAndGet() > 0) {
232 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(this);
233 client.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null);
234 }
235 } finally {
236 countDownLatch.countDown();
237 }
238 }
239
240 @Override
241 public void failed(final Exception ex) {
242 countDownLatch.countDown();
243 }
244
245 @Override
246 public void cancelled() {
247 countDownLatch.countDown();
248 }
249 };
250
251 final int threadNum = 5;
252 final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
253 for (int i = 0; i < threadNum; i++) {
254 executorService.execute(() -> {
255 if (!Thread.currentThread().isInterrupted()) {
256 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(callback);
257 client.execute(AsyncRequestBuilder.get(target + "/random/2048").build(), consumer, null);
258 }
259 });
260 }
261
262 assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
263
264 executorService.shutdownNow();
265 executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
266
267 for (;;) {
268 final Message<HttpResponse, Publisher<ByteBuffer>> response = resultQueue.poll();
269 if (response == null) {
270 break;
271 }
272 assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
273 }
274 }
275
276 @Test
277 public void testBadRequest() throws Exception {
278 final H2TestServer server = startServer();
279 server.register("/random/*", () ->
280 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
281 final HttpHost target = targetHost();
282 final T client = startClient();
283 final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/boom").build();
284 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
285
286 client.execute(request, consumer, null);
287
288 final Future<Message<HttpResponse, Publisher<ByteBuffer>>> future = consumer.getResponseFuture();
289 final HttpResponse response = future.get().getHead();
290 assertThat(response, CoreMatchers.notNullValue());
291 assertThat(response.getCode(), CoreMatchers.equalTo(400));
292 }
293
294 static String publisherToString(final Publisher<ByteBuffer> publisher) throws Exception {
295 final byte[] bytes = publisherToByteArray(publisher);
296 if (bytes == null) {
297 return null;
298 }
299 return new String(bytes, StandardCharsets.UTF_8);
300 }
301
302 static byte[] publisherToByteArray(final Publisher<ByteBuffer> publisher) throws Exception {
303 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
304 try (WritableByteChannel channel = Channels.newChannel(baos)) {
305 final List<ByteBuffer> bufs = Flowable.fromPublisher(publisher)
306 .toList()
307 .blockingGet();
308 if (bufs.isEmpty()) {
309 return null;
310 }
311 for (final ByteBuffer buf : bufs) {
312 channel.write(buf);
313 }
314 }
315 return baos.toByteArray();
316 }
317
318 private static final class StreamingTestCase {
319 final long length;
320 final AtomicReference<String> expectedHash;
321 final Flowable<ByteBuffer> stream;
322
323 StreamingTestCase(final long length, final AtomicReference<String> expectedHash, final Flowable<ByteBuffer> stream) {
324 this.length = length;
325 this.expectedHash = expectedHash;
326 this.stream = stream;
327 }
328
329 static Map<Long, StreamingTestCase> generate(final int numTestCases, final int maxSize) {
330 final Map<Long, StreamingTestCase> testCases = new LinkedHashMap<>();
331 int testCaseNum = 0;
332 while (testCases.size() < numTestCases) {
333 final long seed = 198723L * testCaseNum++;
334 final int length = 1 + new Random(seed).nextInt(maxSize);
335 final AtomicReference<String> expectedHash = new AtomicReference<>();
336 final Flowable<ByteBuffer> stream = Reactive3TestUtils.produceStream(length, expectedHash);
337 final StreamingTestCase streamingTestCase = new StreamingTestCase(length, expectedHash, stream);
338 testCases.put((long) length, streamingTestCase);
339 }
340 return testCases;
341 }
342 }
343 }