1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.sync;
29
30 import java.io.ByteArrayInputStream;
31 import java.io.IOException;
32 import java.net.URI;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.List;
36
37 import org.apache.hc.client5.http.classic.methods.HttpGet;
38 import org.apache.hc.client5.http.classic.methods.HttpPost;
39 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
40 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
41 import org.apache.hc.client5.testing.classic.RandomHandler;
42 import org.apache.hc.client5.testing.sync.extension.TestClientResources;
43 import org.apache.hc.core5.http.ClassicHttpRequest;
44 import org.apache.hc.core5.http.ContentType;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HeaderElements;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.HttpHeaders;
50 import org.apache.hc.core5.http.HttpHost;
51 import org.apache.hc.core5.http.HttpResponse;
52 import org.apache.hc.core5.http.HttpResponseInterceptor;
53 import org.apache.hc.core5.http.URIScheme;
54 import org.apache.hc.core5.http.impl.HttpProcessors;
55 import org.apache.hc.core5.http.io.entity.EntityUtils;
56 import org.apache.hc.core5.http.io.entity.InputStreamEntity;
57 import org.apache.hc.core5.http.protocol.HttpContext;
58 import org.apache.hc.core5.http.protocol.HttpProcessor;
59 import org.apache.hc.core5.testing.classic.ClassicTestServer;
60 import org.apache.hc.core5.util.Timeout;
61 import org.junit.jupiter.api.Assertions;
62 import org.junit.jupiter.api.Test;
63 import org.junit.jupiter.api.extension.RegisterExtension;
64
65 public class TestConnectionReuse {
66
/** One-minute timeout that is passed to the {@link TestClientResources} extension below. */
public static final Timeout TIMEOUT = Timeout.ofMinutes(1);

/**
 * JUnit extension, created for plain HTTP, from which the tests obtain the
 * server, the client, the connection manager and the target host.
 */
@RegisterExtension
private TestClientResources testResources = new TestClientResources(URIScheme.HTTP, TIMEOUT);
71
72 public HttpHost targetHost() {
73 return testResources.targetHost();
74 }
75
76 @Test
77 public void testReuseOfPersistentConnections() throws Exception {
78 final ClassicTestServer server = testResources.startServer( null, null, null);
79 server.registerHandler("/random/*", new RandomHandler());
80 final HttpHost target = targetHost();
81
82 final CloseableHttpClient client = testResources.startClient(
83 builder -> builder
84 .setMaxConnTotal(5)
85 .setMaxConnPerRoute(5),
86 builder -> {
87 }
88 );
89
90 final PoolingHttpClientConnectionManager connManager = testResources.connManager();
91
92 final WorkerThread[] workers = new WorkerThread[10];
93 for (int i = 0; i < workers.length; i++) {
94 workers[i] = new WorkerThread(
95 client,
96 target,
97 new URI("/random/2000"),
98 10, false);
99 }
100
101 for (final WorkerThread worker : workers) {
102 worker.start();
103 }
104 for (final WorkerThread worker : workers) {
105 worker.join(10000);
106 final Exception ex = worker.getException();
107 if (ex != null) {
108 throw ex;
109 }
110 }
111
112
113 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
114
115 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
116 }
117
118 @Test
119 public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
120 final ClassicTestServer server = testResources.startServer( null, null, null);
121 server.registerHandler("/random/*", new RandomHandler());
122 final HttpHost target = targetHost();
123
124 final CloseableHttpClient client = testResources.startClient(
125 builder -> builder
126 .setMaxConnTotal(5)
127 .setMaxConnPerRoute(5),
128 builder -> {
129 }
130 );
131
132 final PoolingHttpClientConnectionManager connManager = testResources.connManager();
133
134 final WorkerThread[] workers = new WorkerThread[10];
135 for (int i = 0; i < workers.length; i++) {
136 final List<ClassicHttpRequest> requests = new ArrayList<>();
137 for (int j = 0; j < 10; j++) {
138 final HttpPost post = new HttpPost(new URI("/random/2000"));
139
140 post.setEntity(new InputStreamEntity(
141 new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
142 ContentType.APPLICATION_OCTET_STREAM));
143 requests.add(post);
144 }
145 workers[i] = new WorkerThread(client, target, false, requests);
146 }
147
148 for (final WorkerThread worker : workers) {
149 worker.start();
150 }
151 for (final WorkerThread worker : workers) {
152 worker.join(10000);
153 final Exception ex = worker.getException();
154 if (ex != null) {
155 throw ex;
156 }
157 }
158
159
160 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
161
162 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
163 }
164
165 private static class AlwaysCloseConn implements HttpResponseInterceptor {
166
167 @Override
168 public void process(
169 final HttpResponse response,
170 final EntityDetails entityDetails,
171 final HttpContext context) throws HttpException, IOException {
172 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
173 }
174
175 }
176
177 @Test
178 public void testReuseOfClosedConnections() throws Exception {
179 final HttpProcessor httpproc = HttpProcessors.customServer(null)
180 .add(new AlwaysCloseConn())
181 .build();
182 final ClassicTestServer server = testResources.startServer( null, httpproc, null);
183 final HttpHost target = targetHost();
184
185 final CloseableHttpClient client = testResources.startClient(
186 builder -> builder
187 .setMaxConnTotal(5)
188 .setMaxConnPerRoute(5),
189 builder -> {
190 }
191 );
192
193 final PoolingHttpClientConnectionManager connManager = testResources.connManager();
194
195 final WorkerThread[] workers = new WorkerThread[10];
196 for (int i = 0; i < workers.length; i++) {
197 workers[i] = new WorkerThread(
198 client,
199 target,
200 new URI("/random/2000"),
201 10, false);
202 }
203
204 for (final WorkerThread worker : workers) {
205 worker.start();
206 }
207 for (final WorkerThread worker : workers) {
208 worker.join(10000);
209 final Exception ex = worker.getException();
210 if (ex != null) {
211 throw ex;
212 }
213 }
214
215
216 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
217
218 Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
219 }
220
221 @Test
222 public void testReuseOfAbortedConnections() throws Exception {
223 final ClassicTestServer server = testResources.startServer( null, null, null);
224 server.registerHandler("/random/*", new RandomHandler());
225 final HttpHost target = targetHost();
226
227 final CloseableHttpClient client = testResources.startClient(
228 builder -> builder
229 .setMaxConnTotal(5)
230 .setMaxConnPerRoute(5),
231 builder -> {
232 }
233 );
234
235 final PoolingHttpClientConnectionManager connManager = testResources.connManager();
236
237 final WorkerThread[] workers = new WorkerThread[10];
238 for (int i = 0; i < workers.length; i++) {
239 workers[i] = new WorkerThread(
240 client,
241 target,
242 new URI("/random/2000"),
243 10, true);
244 }
245
246 for (final WorkerThread worker : workers) {
247 worker.start();
248 }
249 for (final WorkerThread worker : workers) {
250 worker.join(10000);
251 final Exception ex = worker.getException();
252 if (ex != null) {
253 throw ex;
254 }
255 }
256
257
258 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
259
260 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
261 }
262
263 @Test
264 public void testKeepAliveHeaderRespected() throws Exception {
265 final HttpProcessor httpproc = HttpProcessors.customServer(null)
266 .add(new ResponseKeepAlive())
267 .build();
268 final ClassicTestServer server = testResources.startServer( null, httpproc, null);
269 server.registerHandler("/random/*", new RandomHandler());
270 final HttpHost target = targetHost();
271
272 final CloseableHttpClient client = testResources.startClient(
273 builder -> builder
274 .setMaxConnTotal(1)
275 .setMaxConnPerRoute(1),
276 builder -> {
277 }
278 );
279
280 final PoolingHttpClientConnectionManager connManager = testResources.connManager();
281
282
283 client.execute(target, new HttpGet("/random/2000"), response -> {
284 EntityUtils.consume(response.getEntity());
285 return null;
286 });
287
288 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
289
290 client.execute(target, new HttpGet("/random/2000"), response -> {
291 EntityUtils.consume(response.getEntity());
292 return null;
293 });
294
295 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
296
297
298 Thread.sleep(1100);
299 client.execute(target, new HttpGet("/random/2000"), response -> {
300 EntityUtils.consume(response.getEntity());
301 return null;
302 });
303
304 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
305
306
307
308 Thread.sleep(500);
309 client.execute(target, new HttpGet("/random/2000"), response -> {
310 EntityUtils.consume(response.getEntity());
311 return null;
312 });
313
314
315 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
316 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
317 }
318
319 private static class WorkerThread extends Thread {
320
321 private final HttpHost target;
322 private final CloseableHttpClient httpclient;
323 private final boolean forceClose;
324 private final List<ClassicHttpRequest> requests;
325
326 private volatile Exception exception;
327
328 public WorkerThread(
329 final CloseableHttpClient httpclient,
330 final HttpHost target,
331 final URI requestURI,
332 final int repetitions,
333 final boolean forceClose) {
334 super();
335 this.httpclient = httpclient;
336 this.target = target;
337 this.forceClose = forceClose;
338 this.requests = new ArrayList<>(repetitions);
339 for (int i = 0; i < repetitions; i++) {
340 requests.add(new HttpGet(requestURI));
341 }
342 }
343
344 public WorkerThread(
345 final CloseableHttpClient httpclient,
346 final HttpHost target,
347 final boolean forceClose,
348 final List<ClassicHttpRequest> requests) {
349 super();
350 this.httpclient = httpclient;
351 this.target = target;
352 this.forceClose = forceClose;
353 this.requests = requests;
354 }
355
356 @Override
357 public void run() {
358 try {
359 for (final ClassicHttpRequest request : requests) {
360 this.httpclient.execute(this.target, request, response -> {
361 if (this.forceClose) {
362 response.close();
363 } else {
364 EntityUtils.consume(response.getEntity());
365 }
366 return null;
367 });
368 }
369 } catch (final Exception ex) {
370 this.exception = ex;
371 }
372 }
373
374 public Exception getException() {
375 return exception;
376 }
377
378 }
379
380
381
382 private static class ResponseKeepAlive implements HttpResponseInterceptor {
383 @Override
384 public void process(
385 final HttpResponse response,
386 final EntityDetails entityDetails,
387 final HttpContext context) throws HttpException, IOException {
388 final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
389 if(connection != null) {
390 if(!connection.getValue().equalsIgnoreCase("Close")) {
391 response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
392 }
393 }
394 }
395 }
396
397 }