1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.sync;
29
30 import java.io.ByteArrayInputStream;
31 import java.io.IOException;
32 import java.net.URI;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.List;
36
37 import org.apache.hc.client5.http.classic.methods.HttpGet;
38 import org.apache.hc.client5.http.classic.methods.HttpPost;
39 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
40 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
41 import org.apache.hc.core5.http.ClassicHttpResponse;
42 import org.apache.hc.core5.http.ContentType;
43 import org.apache.hc.core5.http.EntityDetails;
44 import org.apache.hc.core5.http.Header;
45 import org.apache.hc.core5.http.HeaderElements;
46 import org.apache.hc.core5.http.HttpException;
47 import org.apache.hc.core5.http.HttpHeaders;
48 import org.apache.hc.core5.http.HttpHost;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.HttpResponseInterceptor;
51 import org.apache.hc.core5.http.impl.HttpProcessors;
52 import org.apache.hc.core5.http.io.entity.EntityUtils;
53 import org.apache.hc.core5.http.io.entity.InputStreamEntity;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.http.protocol.HttpProcessor;
56 import org.junit.Assert;
57 import org.junit.Test;
58
59 public class TestConnectionReuse extends LocalServerTestBase {
60
61 @Test
62 public void testReuseOfPersistentConnections() throws Exception {
63 this.connManager.setMaxTotal(5);
64 this.connManager.setDefaultMaxPerRoute(5);
65
66 final HttpHost target = start();
67
68 final WorkerThread[] workers = new WorkerThread[10];
69 for (int i = 0; i < workers.length; i++) {
70 workers[i] = new WorkerThread(
71 this.httpclient,
72 target,
73 new URI("/random/2000"),
74 10, false);
75 }
76
77 for (final WorkerThread worker : workers) {
78 worker.start();
79 }
80 for (final WorkerThread worker : workers) {
81 worker.join(10000);
82 final Exception ex = worker.getException();
83 if (ex != null) {
84 throw ex;
85 }
86 }
87
88
89 Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
90 }
91
92 @Test
93 public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
94 this.connManager.setMaxTotal(5);
95 this.connManager.setDefaultMaxPerRoute(5);
96
97 final HttpHost target = start();
98
99 final WorkerThread[] workers = new WorkerThread[10];
100 for (int i = 0; i < workers.length; i++) {
101 final List<HttpUriRequestBase> requests = new ArrayList<>();
102 for (int j = 0; j < 10; j++) {
103 final HttpPost post = new HttpPost(new URI("/random/2000"));
104
105 post.setEntity(new InputStreamEntity(
106 new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
107 ContentType.APPLICATION_OCTET_STREAM));
108 requests.add(post);
109 }
110 workers[i] = new WorkerThread(this.httpclient, target, false, requests);
111 }
112
113 for (final WorkerThread worker : workers) {
114 worker.start();
115 }
116 for (final WorkerThread worker : workers) {
117 worker.join(10000);
118 final Exception ex = worker.getException();
119 if (ex != null) {
120 throw ex;
121 }
122 }
123
124
125 Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
126 }
127
128 private static class AlwaysCloseConn implements HttpResponseInterceptor {
129
130 @Override
131 public void process(
132 final HttpResponse response,
133 final EntityDetails entityDetails,
134 final HttpContext context) throws HttpException, IOException {
135 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
136 }
137
138 }
139
140 @Test
141 public void testReuseOfClosedConnections() throws Exception {
142 this.connManager.setMaxTotal(5);
143 this.connManager.setDefaultMaxPerRoute(5);
144
145 final HttpProcessor httpproc = HttpProcessors.customServer(null)
146 .add(new AlwaysCloseConn())
147 .build();
148 final HttpHost target = start(httpproc, null);
149
150 final WorkerThread[] workers = new WorkerThread[10];
151 for (int i = 0; i < workers.length; i++) {
152 workers[i] = new WorkerThread(
153 this.httpclient,
154 target,
155 new URI("/random/2000"),
156 10, false);
157 }
158
159 for (final WorkerThread worker : workers) {
160 worker.start();
161 }
162 for (final WorkerThread worker : workers) {
163 worker.join(10000);
164 final Exception ex = worker.getException();
165 if (ex != null) {
166 throw ex;
167 }
168 }
169
170
171 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
172 }
173
174 @Test
175 public void testReuseOfAbortedConnections() throws Exception {
176 this.connManager.setMaxTotal(5);
177 this.connManager.setDefaultMaxPerRoute(5);
178
179 final HttpHost target = start();
180
181 final WorkerThread[] workers = new WorkerThread[10];
182 for (int i = 0; i < workers.length; i++) {
183 workers[i] = new WorkerThread(
184 this.httpclient,
185 target,
186 new URI("/random/2000"),
187 10, true);
188 }
189
190 for (final WorkerThread worker : workers) {
191 worker.start();
192 }
193 for (final WorkerThread worker : workers) {
194 worker.join(10000);
195 final Exception ex = worker.getException();
196 if (ex != null) {
197 throw ex;
198 }
199 }
200
201
202 Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
203 }
204
205 @Test
206 public void testKeepAliveHeaderRespected() throws Exception {
207 this.connManager.setMaxTotal(1);
208 this.connManager.setDefaultMaxPerRoute(1);
209
210 final HttpProcessor httpproc = HttpProcessors.customServer(null)
211 .add(new ResponseKeepAlive())
212 .build();
213 final HttpHost target = start(httpproc, null);
214
215 ClassicHttpResponse response = this.httpclient.execute(target, new HttpGet("/random/2000"));
216 EntityUtils.consume(response.getEntity());
217
218 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
219
220 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
221 EntityUtils.consume(response.getEntity());
222
223 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
224
225
226 Thread.sleep(1100);
227 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
228 EntityUtils.consume(response.getEntity());
229
230 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
231
232
233
234 Thread.sleep(500);
235 response = this.httpclient.execute(target, new HttpGet("/random/2000"));
236 EntityUtils.consume(response.getEntity());
237
238 Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
239 }
240
241 private static class WorkerThread extends Thread {
242
243 private final HttpHost target;
244 private final CloseableHttpClient httpclient;
245 private final boolean forceClose;
246 private final List<HttpUriRequestBase> requests;
247
248 private volatile Exception exception;
249
250 public WorkerThread(
251 final CloseableHttpClient httpclient,
252 final HttpHost target,
253 final URI requestURI,
254 final int repetitions,
255 final boolean forceClose) {
256 super();
257 this.httpclient = httpclient;
258 this.target = target;
259 this.forceClose = forceClose;
260 this.requests = new ArrayList<>(repetitions);
261 for (int i = 0; i < repetitions; i++) {
262 requests.add(new HttpGet(requestURI));
263 }
264 }
265
266 public WorkerThread(
267 final CloseableHttpClient httpclient,
268 final HttpHost target,
269 final boolean forceClose,
270 final List<HttpUriRequestBase> requests) {
271 super();
272 this.httpclient = httpclient;
273 this.target = target;
274 this.forceClose = forceClose;
275 this.requests = requests;
276 }
277
278 @Override
279 public void run() {
280 try {
281 for (final HttpUriRequestBase request : requests) {
282 final ClassicHttpResponse response = this.httpclient.execute(
283 this.target,
284 request);
285 if (this.forceClose) {
286 request.cancel();
287 } else {
288 EntityUtils.consume(response.getEntity());
289 }
290 }
291 } catch (final Exception ex) {
292 this.exception = ex;
293 }
294 }
295
296 public Exception getException() {
297 return exception;
298 }
299
300 }
301
302
303
304 private static class ResponseKeepAlive implements HttpResponseInterceptor {
305 @Override
306 public void process(
307 final HttpResponse response,
308 final EntityDetails entityDetails,
309 final HttpContext context) throws HttpException, IOException {
310 final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
311 if(connection != null) {
312 if(!connection.getValue().equalsIgnoreCase("Close")) {
313 response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
314 }
315 }
316 }
317 }
318
319 }