View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.testing.sync;
29  
30  import java.io.ByteArrayInputStream;
31  import java.io.IOException;
32  import java.net.URI;
33  import java.nio.charset.StandardCharsets;
34  import java.util.ArrayList;
35  import java.util.List;
36  
37  import org.apache.hc.client5.http.classic.methods.HttpGet;
38  import org.apache.hc.client5.http.classic.methods.HttpPost;
39  import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
40  import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
41  import org.apache.hc.client5.testing.classic.RandomHandler;
42  import org.apache.hc.client5.testing.sync.extension.TestClientResources;
43  import org.apache.hc.core5.http.ClassicHttpRequest;
44  import org.apache.hc.core5.http.ContentType;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HeaderElements;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpHeaders;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.HttpResponse;
52  import org.apache.hc.core5.http.HttpResponseInterceptor;
53  import org.apache.hc.core5.http.URIScheme;
54  import org.apache.hc.core5.http.impl.HttpProcessors;
55  import org.apache.hc.core5.http.io.entity.EntityUtils;
56  import org.apache.hc.core5.http.io.entity.InputStreamEntity;
57  import org.apache.hc.core5.http.protocol.HttpContext;
58  import org.apache.hc.core5.http.protocol.HttpProcessor;
59  import org.apache.hc.core5.testing.classic.ClassicTestServer;
60  import org.apache.hc.core5.util.Timeout;
61  import org.junit.jupiter.api.Assertions;
62  import org.junit.jupiter.api.Test;
63  import org.junit.jupiter.api.extension.RegisterExtension;
64  
65  public class TestConnectionReuse {
66  
67      public static final Timeout TIMEOUT = Timeout.ofMinutes(1);
68  
69      @RegisterExtension
70      private TestClientResources testResources = new TestClientResources(URIScheme.HTTP, TIMEOUT);
71  
72      public HttpHost targetHost() {
73          return testResources.targetHost();
74      }
75  
76      @Test
77      public void testReuseOfPersistentConnections() throws Exception {
78          final ClassicTestServer server =  testResources.startServer(  null, null, null);
79          server.registerHandler("/random/*", new RandomHandler());
80          final HttpHost target = targetHost();
81  
82          final CloseableHttpClient client = testResources.startClient(
83                  builder -> builder
84                          .setMaxConnTotal(5)
85                          .setMaxConnPerRoute(5),
86                  builder -> {
87                  }
88          );
89  
90          final PoolingHttpClientConnectionManager connManager = testResources.connManager();
91  
92          final WorkerThread[] workers = new WorkerThread[10];
93          for (int i = 0; i < workers.length; i++) {
94              workers[i] = new WorkerThread(
95                      client,
96                      target,
97                      new URI("/random/2000"),
98                      10, false);
99          }
100 
101         for (final WorkerThread worker : workers) {
102             worker.start();
103         }
104         for (final WorkerThread worker : workers) {
105             worker.join(10000);
106             final Exception ex = worker.getException();
107             if (ex != null) {
108                 throw ex;
109             }
110         }
111 
112         // Expect leased connections to be returned
113         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
114         // Expect some connection in the pool
115         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
116     }
117 
118     @Test
119     public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
120         final ClassicTestServer server =  testResources.startServer(  null, null, null);
121         server.registerHandler("/random/*", new RandomHandler());
122         final HttpHost target = targetHost();
123 
124         final CloseableHttpClient client = testResources.startClient(
125                 builder -> builder
126                         .setMaxConnTotal(5)
127                         .setMaxConnPerRoute(5),
128                 builder -> {
129                 }
130         );
131 
132         final PoolingHttpClientConnectionManager connManager = testResources.connManager();
133 
134         final WorkerThread[] workers = new WorkerThread[10];
135         for (int i = 0; i < workers.length; i++) {
136             final List<ClassicHttpRequest> requests = new ArrayList<>();
137             for (int j = 0; j < 10; j++) {
138                 final HttpPost post = new HttpPost(new URI("/random/2000"));
139                 // non-repeatable
140                 post.setEntity(new InputStreamEntity(
141                         new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
142                         ContentType.APPLICATION_OCTET_STREAM));
143                 requests.add(post);
144             }
145             workers[i] = new WorkerThread(client, target, false, requests);
146         }
147 
148         for (final WorkerThread worker : workers) {
149             worker.start();
150         }
151         for (final WorkerThread worker : workers) {
152             worker.join(10000);
153             final Exception ex = worker.getException();
154             if (ex != null) {
155                 throw ex;
156             }
157         }
158 
159         // Expect leased connections to be returned
160         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
161         // Expect some connection in the pool
162         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
163     }
164 
165     private static class AlwaysCloseConn implements HttpResponseInterceptor {
166 
167         @Override
168         public void process(
169                 final HttpResponse response,
170                 final EntityDetails entityDetails,
171                 final HttpContext context) throws HttpException, IOException {
172             response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
173         }
174 
175     }
176 
177     @Test
178     public void testReuseOfClosedConnections() throws Exception {
179         final HttpProcessor httpproc = HttpProcessors.customServer(null)
180                 .add(new AlwaysCloseConn())
181                 .build();
182         final ClassicTestServer server =  testResources.startServer(  null, httpproc, null);
183         final HttpHost target = targetHost();
184 
185         final CloseableHttpClient client = testResources.startClient(
186                 builder -> builder
187                         .setMaxConnTotal(5)
188                         .setMaxConnPerRoute(5),
189                 builder -> {
190                 }
191         );
192 
193         final PoolingHttpClientConnectionManager connManager = testResources.connManager();
194 
195         final WorkerThread[] workers = new WorkerThread[10];
196         for (int i = 0; i < workers.length; i++) {
197             workers[i] = new WorkerThread(
198                     client,
199                     target,
200                     new URI("/random/2000"),
201                     10, false);
202         }
203 
204         for (final WorkerThread worker : workers) {
205             worker.start();
206         }
207         for (final WorkerThread worker : workers) {
208             worker.join(10000);
209             final Exception ex = worker.getException();
210             if (ex != null) {
211                 throw ex;
212             }
213         }
214 
215         // Expect leased connections to be returned
216         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
217         // Expect zero connections in the pool
218         Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
219     }
220 
221     @Test
222     public void testReuseOfAbortedConnections() throws Exception {
223         final ClassicTestServer server =  testResources.startServer(  null, null, null);
224         server.registerHandler("/random/*", new RandomHandler());
225         final HttpHost target = targetHost();
226 
227         final CloseableHttpClient client = testResources.startClient(
228                 builder -> builder
229                         .setMaxConnTotal(5)
230                         .setMaxConnPerRoute(5),
231                 builder -> {
232                 }
233         );
234 
235         final PoolingHttpClientConnectionManager connManager = testResources.connManager();
236 
237         final WorkerThread[] workers = new WorkerThread[10];
238         for (int i = 0; i < workers.length; i++) {
239             workers[i] = new WorkerThread(
240                     client,
241                     target,
242                     new URI("/random/2000"),
243                     10, true);
244         }
245 
246         for (final WorkerThread worker : workers) {
247             worker.start();
248         }
249         for (final WorkerThread worker : workers) {
250             worker.join(10000);
251             final Exception ex = worker.getException();
252             if (ex != null) {
253                 throw ex;
254             }
255         }
256 
257         // Expect leased connections to be returned
258         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
259         // Expect some connections in the pool
260         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
261     }
262 
263     @Test
264     public void testKeepAliveHeaderRespected() throws Exception {
265         final HttpProcessor httpproc = HttpProcessors.customServer(null)
266                 .add(new ResponseKeepAlive())
267                 .build();
268         final ClassicTestServer server =  testResources.startServer(  null, httpproc, null);
269         server.registerHandler("/random/*", new RandomHandler());
270         final HttpHost target = targetHost();
271 
272         final CloseableHttpClient client = testResources.startClient(
273                 builder -> builder
274                         .setMaxConnTotal(1)
275                         .setMaxConnPerRoute(1),
276                 builder -> {
277                 }
278         );
279 
280         final PoolingHttpClientConnectionManager connManager = testResources.connManager();
281 
282 
283         client.execute(target, new HttpGet("/random/2000"), response -> {
284             EntityUtils.consume(response.getEntity());
285             return null;
286         });
287 
288         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
289 
290         client.execute(target, new HttpGet("/random/2000"), response -> {
291             EntityUtils.consume(response.getEntity());
292             return null;
293         });
294 
295         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
296 
297         // Now sleep for 1.1 seconds and let the timeout do its work
298         Thread.sleep(1100);
299         client.execute(target, new HttpGet("/random/2000"), response -> {
300             EntityUtils.consume(response.getEntity());
301             return null;
302         });
303 
304         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
305 
306         // Do another request just under the 1 second limit & make
307         // sure we reuse that connection.
308         Thread.sleep(500);
309         client.execute(target, new HttpGet("/random/2000"), response -> {
310             EntityUtils.consume(response.getEntity());
311             return null;
312         });
313 
314         // Expect leased connections to be returned
315         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
316         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
317     }
318 
319     private static class WorkerThread extends Thread {
320 
321         private final HttpHost target;
322         private final CloseableHttpClient httpclient;
323         private final boolean forceClose;
324         private final List<ClassicHttpRequest> requests;
325 
326         private volatile Exception exception;
327 
328         public WorkerThread(
329                 final CloseableHttpClient httpclient,
330                 final HttpHost target,
331                 final URI requestURI,
332                 final int repetitions,
333                 final boolean forceClose) {
334             super();
335             this.httpclient = httpclient;
336             this.target = target;
337             this.forceClose = forceClose;
338             this.requests = new ArrayList<>(repetitions);
339             for (int i = 0; i < repetitions; i++) {
340                 requests.add(new HttpGet(requestURI));
341             }
342         }
343 
344         public WorkerThread(
345                 final CloseableHttpClient httpclient,
346                 final HttpHost target,
347                 final boolean forceClose,
348                 final List<ClassicHttpRequest> requests) {
349             super();
350             this.httpclient = httpclient;
351             this.target = target;
352             this.forceClose = forceClose;
353             this.requests = requests;
354         }
355 
356         @Override
357         public void run() {
358             try {
359                 for (final ClassicHttpRequest request : requests) {
360                     this.httpclient.execute(this.target, request, response -> {
361                         if (this.forceClose) {
362                             response.close();
363                         } else {
364                             EntityUtils.consume(response.getEntity());
365                         }
366                         return null;
367                     });
368                 }
369             } catch (final Exception ex) {
370                 this.exception = ex;
371             }
372         }
373 
374         public Exception getException() {
375             return exception;
376         }
377 
378     }
379 
380     // A very basic keep-alive header interceptor, to add Keep-Alive: timeout=1
381     // if there is no Connection: close header.
382     private static class ResponseKeepAlive implements HttpResponseInterceptor {
383         @Override
384         public void process(
385                 final HttpResponse response,
386                 final EntityDetails entityDetails,
387                 final HttpContext context) throws HttpException, IOException {
388             final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
389             if(connection != null) {
390                 if(!connection.getValue().equalsIgnoreCase("Close")) {
391                     response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
392                 }
393             }
394         }
395     }
396 
397 }