1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.sync;
29
30 import java.io.ByteArrayInputStream;
31 import java.io.IOException;
32 import java.net.URI;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.List;
36
37 import org.apache.hc.client5.http.classic.methods.HttpGet;
38 import org.apache.hc.client5.http.classic.methods.HttpPost;
39 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
40 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
41 import org.apache.hc.client5.testing.classic.RandomHandler;
42 import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
43 import org.apache.hc.client5.testing.extension.sync.TestClient;
44 import org.apache.hc.core5.http.ClassicHttpRequest;
45 import org.apache.hc.core5.http.ContentType;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.Header;
48 import org.apache.hc.core5.http.HeaderElements;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.HttpHeaders;
51 import org.apache.hc.core5.http.HttpHost;
52 import org.apache.hc.core5.http.HttpResponse;
53 import org.apache.hc.core5.http.HttpResponseInterceptor;
54 import org.apache.hc.core5.http.URIScheme;
55 import org.apache.hc.core5.http.impl.HttpProcessors;
56 import org.apache.hc.core5.http.io.entity.EntityUtils;
57 import org.apache.hc.core5.http.io.entity.InputStreamEntity;
58 import org.apache.hc.core5.http.protocol.HttpContext;
59 import org.junit.jupiter.api.Assertions;
60 import org.junit.jupiter.api.Test;
61
62 public class TestConnectionReuse extends AbstractIntegrationTestBase {
63
64 public TestConnectionReuse() {
65 super(URIScheme.HTTP, ClientProtocolLevel.STANDARD);
66 }
67
68 @Test
69 public void testReuseOfPersistentConnections() throws Exception {
70 configureServer(bootstrap -> bootstrap
71 .register("/random/*", new RandomHandler()));
72 final HttpHost target = startServer();
73
74 final TestClient client = client();
75 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
76 connManager.setMaxTotal(5);
77 connManager.setDefaultMaxPerRoute(5);
78
79 final WorkerThread[] workers = new WorkerThread[10];
80 for (int i = 0; i < workers.length; i++) {
81 workers[i] = new WorkerThread(
82 client,
83 target,
84 new URI("/random/2000"),
85 10, false);
86 }
87
88 for (final WorkerThread worker : workers) {
89 worker.start();
90 }
91 for (final WorkerThread worker : workers) {
92 worker.join(10000);
93 final Exception ex = worker.getException();
94 if (ex != null) {
95 throw ex;
96 }
97 }
98
99
100 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
101
102 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
103 }
104
105 @Test
106 public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
107 configureServer(bootstrap -> bootstrap
108 .register("/random/*", new RandomHandler()));
109 final HttpHost target = startServer();
110
111 final TestClient client = client();
112 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
113 connManager.setMaxTotal(5);
114 connManager.setDefaultMaxPerRoute(5);
115
116 final WorkerThread[] workers = new WorkerThread[10];
117 for (int i = 0; i < workers.length; i++) {
118 final List<ClassicHttpRequest> requests = new ArrayList<>();
119 for (int j = 0; j < 10; j++) {
120 final HttpPost post = new HttpPost(new URI("/random/2000"));
121
122 post.setEntity(new InputStreamEntity(
123 new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
124 ContentType.APPLICATION_OCTET_STREAM));
125 requests.add(post);
126 }
127 workers[i] = new WorkerThread(client, target, false, requests);
128 }
129
130 for (final WorkerThread worker : workers) {
131 worker.start();
132 }
133 for (final WorkerThread worker : workers) {
134 worker.join(10000);
135 final Exception ex = worker.getException();
136 if (ex != null) {
137 throw ex;
138 }
139 }
140
141
142 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
143
144 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
145 }
146
147 private static class AlwaysCloseConn implements HttpResponseInterceptor {
148
149 @Override
150 public void process(
151 final HttpResponse response,
152 final EntityDetails entityDetails,
153 final HttpContext context) throws HttpException, IOException {
154 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
155 }
156
157 }
158
159 @Test
160 public void testReuseOfClosedConnections() throws Exception {
161 configureServer(bootstrap -> bootstrap
162 .setHttpProcessor(HttpProcessors.customServer(null)
163 .add(new AlwaysCloseConn())
164 .build()));
165 final HttpHost target = startServer();
166
167 final TestClient client = client();
168 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
169 connManager.setMaxTotal(5);
170 connManager.setDefaultMaxPerRoute(5);
171
172 final WorkerThread[] workers = new WorkerThread[10];
173 for (int i = 0; i < workers.length; i++) {
174 workers[i] = new WorkerThread(
175 client,
176 target,
177 new URI("/random/2000"),
178 10, false);
179 }
180
181 for (final WorkerThread worker : workers) {
182 worker.start();
183 }
184 for (final WorkerThread worker : workers) {
185 worker.join(10000);
186 final Exception ex = worker.getException();
187 if (ex != null) {
188 throw ex;
189 }
190 }
191
192
193 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
194
195 Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
196 }
197
198 @Test
199 public void testReuseOfAbortedConnections() throws Exception {
200 configureServer(bootstrap -> bootstrap
201 .register("/random/*", new RandomHandler()));
202 final HttpHost target = startServer();
203
204 final TestClient client = client();
205 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
206 connManager.setMaxTotal(5);
207 connManager.setDefaultMaxPerRoute(5);
208
209 final WorkerThread[] workers = new WorkerThread[10];
210 for (int i = 0; i < workers.length; i++) {
211 workers[i] = new WorkerThread(
212 client,
213 target,
214 new URI("/random/2000"),
215 10, true);
216 }
217
218 for (final WorkerThread worker : workers) {
219 worker.start();
220 }
221 for (final WorkerThread worker : workers) {
222 worker.join(10000);
223 final Exception ex = worker.getException();
224 if (ex != null) {
225 throw ex;
226 }
227 }
228
229
230 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
231
232 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
233 }
234
235 @Test
236 public void testKeepAliveHeaderRespected() throws Exception {
237 configureServer(bootstrap -> bootstrap
238 .setHttpProcessor(HttpProcessors.customServer(null)
239 .add(new ResponseKeepAlive())
240 .build())
241 .register("/random/*", new RandomHandler()));
242 final HttpHost target = startServer();
243
244 final TestClient client = client();
245 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
246 connManager.setMaxTotal(1);
247 connManager.setDefaultMaxPerRoute(1);
248
249 client.execute(target, new HttpGet("/random/2000"), response -> {
250 EntityUtils.consume(response.getEntity());
251 return null;
252 });
253
254 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
255
256 client.execute(target, new HttpGet("/random/2000"), response -> {
257 EntityUtils.consume(response.getEntity());
258 return null;
259 });
260
261 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
262
263
264 Thread.sleep(1100);
265 client.execute(target, new HttpGet("/random/2000"), response -> {
266 EntityUtils.consume(response.getEntity());
267 return null;
268 });
269
270 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
271
272
273
274 Thread.sleep(500);
275 client.execute(target, new HttpGet("/random/2000"), response -> {
276 EntityUtils.consume(response.getEntity());
277 return null;
278 });
279
280
281 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
282 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
283 }
284
285 private static class WorkerThread extends Thread {
286
287 private final HttpHost target;
288 private final CloseableHttpClient httpclient;
289 private final boolean forceClose;
290 private final List<ClassicHttpRequest> requests;
291
292 private volatile Exception exception;
293
294 public WorkerThread(
295 final CloseableHttpClient httpclient,
296 final HttpHost target,
297 final URI requestURI,
298 final int repetitions,
299 final boolean forceClose) {
300 super();
301 this.httpclient = httpclient;
302 this.target = target;
303 this.forceClose = forceClose;
304 this.requests = new ArrayList<>(repetitions);
305 for (int i = 0; i < repetitions; i++) {
306 requests.add(new HttpGet(requestURI));
307 }
308 }
309
310 public WorkerThread(
311 final CloseableHttpClient httpclient,
312 final HttpHost target,
313 final boolean forceClose,
314 final List<ClassicHttpRequest> requests) {
315 super();
316 this.httpclient = httpclient;
317 this.target = target;
318 this.forceClose = forceClose;
319 this.requests = requests;
320 }
321
322 @Override
323 public void run() {
324 try {
325 for (final ClassicHttpRequest request : requests) {
326 this.httpclient.execute(this.target, request, response -> {
327 if (this.forceClose) {
328 response.close();
329 } else {
330 EntityUtils.consume(response.getEntity());
331 }
332 return null;
333 });
334 }
335 } catch (final Exception ex) {
336 this.exception = ex;
337 }
338 }
339
340 public Exception getException() {
341 return exception;
342 }
343
344 }
345
346
347
348 private static class ResponseKeepAlive implements HttpResponseInterceptor {
349 @Override
350 public void process(
351 final HttpResponse response,
352 final EntityDetails entityDetails,
353 final HttpContext context) throws HttpException, IOException {
354 final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
355 if(connection != null) {
356 if(!connection.getValue().equalsIgnoreCase("Close")) {
357 response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
358 }
359 }
360 }
361 }
362
363 }