1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.nio.ByteBuffer;
33
34 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
35 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
36 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
37 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
38 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
39 import org.apache.hc.core5.http.HeaderElements;
40 import org.apache.hc.core5.http.HttpHeaders;
41 import org.apache.hc.core5.http.HttpHost;
42 import org.apache.hc.core5.http.HttpResponse;
43 import org.apache.hc.core5.http.Message;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.config.Http1Config;
46 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
47 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
48 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
49 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
50 import org.apache.hc.core5.testing.nio.H2TestServer;
51 import org.apache.hc.core5.testing.reactive.ReactiveRandomProcessor;
52 import org.hamcrest.CoreMatchers;
53 import org.junit.jupiter.api.Test;
54 import org.junit.jupiter.api.Timeout;
55 import org.junit.jupiter.params.ParameterizedTest;
56 import org.junit.jupiter.params.provider.ValueSource;
57 import org.reactivestreams.Publisher;
58
59 public abstract class TestHttp1Reactive extends AbstractHttpReactiveFundamentalsTest<CloseableHttpAsyncClient> {
60
61 public TestHttp1Reactive(final URIScheme scheme) {
62 super(scheme);
63 }
64
65 @Override
66 protected H2TestServer startServer() throws Exception {
67 return startServer(Http1Config.DEFAULT, null, null);
68 }
69
70 @Override
71 protected CloseableHttpAsyncClient startClient() throws Exception {
72 return startClient(b -> {});
73 }
74
75 @ParameterizedTest(name = "{displayName}; concurrent connections: {0}")
76 @ValueSource(ints = {5, 1, 20})
77 @Timeout(value = 60_000, unit = MILLISECONDS)
78 public void testSequentialGetRequestsCloseConnection(final int concurrentConns) throws Exception {
79 final H2TestServer server = startServer();
80 server.register("/random/*", () ->
81 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
82 final HttpHost target = targetHost();
83
84 final CloseableHttpAsyncClient client = startClient();
85 final PoolingAsyncClientConnectionManager connManager = connManager();
86 connManager.setDefaultMaxPerRoute(concurrentConns);
87 connManager.setMaxTotal(100);
88
89 for (int i = 0; i < 3; i++) {
90 final SimpleHttpRequest get = SimpleRequestBuilder.get()
91 .setHttpHost(target)
92 .setPath("/random/2048")
93 .build();
94 get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
95 final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/2048").build();
96 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
97
98 client.execute(request, consumer, null);
99
100 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
101 assertThat(response, CoreMatchers.notNullValue());
102 assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
103 final String body = publisherToString(response.getBody());
104 assertThat(body, CoreMatchers.notNullValue());
105 assertThat(body.length(), CoreMatchers.equalTo(2048));
106 }
107 }
108
109 @Test
110 @Timeout(value = 60_000, unit = MILLISECONDS)
111 public void testSharedPool() throws Exception {
112 final H2TestServer server = startServer();
113 server.register("/random/*", () ->
114 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor()));
115 final HttpHost target = targetHost();
116
117 final CloseableHttpAsyncClient client = startClient();
118 final PoolingAsyncClientConnectionManager connManager = connManager();
119
120 final AsyncRequestProducer request1 = AsyncRequestBuilder.get(target + "/random/2048").build();
121 final ReactiveResponseConsumer consumer1 = new ReactiveResponseConsumer();
122
123 client.execute(request1, consumer1, null);
124
125 final Message<HttpResponse, Publisher<ByteBuffer>> response1 = consumer1.getResponseFuture().get();
126 assertThat(response1, CoreMatchers.notNullValue());
127 assertThat(response1.getHead(), CoreMatchers.notNullValue());
128 assertThat(response1.getHead().getCode(), CoreMatchers.equalTo(200));
129 final String body1 = publisherToString(response1.getBody());
130 assertThat(body1, CoreMatchers.notNullValue());
131 assertThat(body1.length(), CoreMatchers.equalTo(2048));
132
133
134 try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
135 .setConnectionManager(connManager)
136 .setConnectionManagerShared(true)
137 .build()) {
138 httpclient2.start();
139 final AsyncRequestProducer request2 = AsyncRequestBuilder.get(target + "/random/2048").build();
140 final ReactiveResponseConsumer consumer2 = new ReactiveResponseConsumer();
141
142 httpclient2.execute(request2, consumer2, null);
143
144 final Message<HttpResponse, Publisher<ByteBuffer>> response2 = consumer2.getResponseFuture().get();
145 assertThat(response2, CoreMatchers.notNullValue());
146 assertThat(response2.getHead().getCode(), CoreMatchers.equalTo(200));
147 final String body2 = publisherToString(response2.getBody());
148 assertThat(body2, CoreMatchers.notNullValue());
149 assertThat(body2.length(), CoreMatchers.equalTo(2048));
150 }
151
152 final AsyncRequestProducer request3 = AsyncRequestBuilder.get(target + "/random/2048").build();
153 final ReactiveResponseConsumer consumer3 = new ReactiveResponseConsumer();
154
155 client.execute(request3, consumer3, null);
156
157 final Message<HttpResponse, Publisher<ByteBuffer>> response3 = consumer3.getResponseFuture().get();
158 assertThat(response3, CoreMatchers.notNullValue());
159 assertThat(response3.getHead().getCode(), CoreMatchers.equalTo(200));
160 final String body3 = publisherToString(response3.getBody());
161 assertThat(body3, CoreMatchers.notNullValue());
162 assertThat(body3.length(), CoreMatchers.equalTo(2048));
163 }
164
165 }