View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.testing.sync;
29  
30  import java.io.ByteArrayInputStream;
31  import java.io.IOException;
32  import java.net.URI;
33  import java.nio.charset.StandardCharsets;
34  import java.util.ArrayList;
35  import java.util.List;
36  
37  import org.apache.hc.client5.http.classic.methods.HttpGet;
38  import org.apache.hc.client5.http.classic.methods.HttpPost;
39  import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
40  import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
41  import org.apache.hc.client5.testing.classic.RandomHandler;
42  import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
43  import org.apache.hc.client5.testing.extension.sync.TestClient;
44  import org.apache.hc.core5.http.ClassicHttpRequest;
45  import org.apache.hc.core5.http.ContentType;
46  import org.apache.hc.core5.http.EntityDetails;
47  import org.apache.hc.core5.http.Header;
48  import org.apache.hc.core5.http.HeaderElements;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHeaders;
51  import org.apache.hc.core5.http.HttpHost;
52  import org.apache.hc.core5.http.HttpResponse;
53  import org.apache.hc.core5.http.HttpResponseInterceptor;
54  import org.apache.hc.core5.http.URIScheme;
55  import org.apache.hc.core5.http.impl.HttpProcessors;
56  import org.apache.hc.core5.http.io.entity.EntityUtils;
57  import org.apache.hc.core5.http.io.entity.InputStreamEntity;
58  import org.apache.hc.core5.http.protocol.HttpContext;
59  import org.junit.jupiter.api.Assertions;
60  import org.junit.jupiter.api.Test;
61  
62  public class TestConnectionReuse extends AbstractIntegrationTestBase {
63  
64      public TestConnectionReuse() {
65          super(URIScheme.HTTP, ClientProtocolLevel.STANDARD);
66      }
67  
68      @Test
69      public void testReuseOfPersistentConnections() throws Exception {
70          configureServer(bootstrap -> bootstrap
71                  .register("/random/*", new RandomHandler()));
72          final HttpHost target = startServer();
73  
74          final TestClient client = client();
75          final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
76          connManager.setMaxTotal(5);
77          connManager.setDefaultMaxPerRoute(5);
78  
79          final WorkerThread[] workers = new WorkerThread[10];
80          for (int i = 0; i < workers.length; i++) {
81              workers[i] = new WorkerThread(
82                      client,
83                      target,
84                      new URI("/random/2000"),
85                      10, false);
86          }
87  
88          for (final WorkerThread worker : workers) {
89              worker.start();
90          }
91          for (final WorkerThread worker : workers) {
92              worker.join(10000);
93              final Exception ex = worker.getException();
94              if (ex != null) {
95                  throw ex;
96              }
97          }
98  
99          // Expect leased connections to be returned
100         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
101         // Expect some connection in the pool
102         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
103     }
104 
105     @Test
106     public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
107         configureServer(bootstrap -> bootstrap
108                 .register("/random/*", new RandomHandler()));
109         final HttpHost target = startServer();
110 
111         final TestClient client = client();
112         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
113         connManager.setMaxTotal(5);
114         connManager.setDefaultMaxPerRoute(5);
115 
116         final WorkerThread[] workers = new WorkerThread[10];
117         for (int i = 0; i < workers.length; i++) {
118             final List<ClassicHttpRequest> requests = new ArrayList<>();
119             for (int j = 0; j < 10; j++) {
120                 final HttpPost post = new HttpPost(new URI("/random/2000"));
121                 // non-repeatable
122                 post.setEntity(new InputStreamEntity(
123                         new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
124                         ContentType.APPLICATION_OCTET_STREAM));
125                 requests.add(post);
126             }
127             workers[i] = new WorkerThread(client, target, false, requests);
128         }
129 
130         for (final WorkerThread worker : workers) {
131             worker.start();
132         }
133         for (final WorkerThread worker : workers) {
134             worker.join(10000);
135             final Exception ex = worker.getException();
136             if (ex != null) {
137                 throw ex;
138             }
139         }
140 
141         // Expect leased connections to be returned
142         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
143         // Expect some connection in the pool
144         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
145     }
146 
147     private static class AlwaysCloseConn implements HttpResponseInterceptor {
148 
149         @Override
150         public void process(
151                 final HttpResponse response,
152                 final EntityDetails entityDetails,
153                 final HttpContext context) throws HttpException, IOException {
154             response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
155         }
156 
157     }
158 
159     @Test
160     public void testReuseOfClosedConnections() throws Exception {
161         configureServer(bootstrap -> bootstrap
162                 .setHttpProcessor(HttpProcessors.customServer(null)
163                         .add(new AlwaysCloseConn())
164                         .build()));
165         final HttpHost target = startServer();
166 
167         final TestClient client = client();
168         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
169         connManager.setMaxTotal(5);
170         connManager.setDefaultMaxPerRoute(5);
171 
172         final WorkerThread[] workers = new WorkerThread[10];
173         for (int i = 0; i < workers.length; i++) {
174             workers[i] = new WorkerThread(
175                     client,
176                     target,
177                     new URI("/random/2000"),
178                     10, false);
179         }
180 
181         for (final WorkerThread worker : workers) {
182             worker.start();
183         }
184         for (final WorkerThread worker : workers) {
185             worker.join(10000);
186             final Exception ex = worker.getException();
187             if (ex != null) {
188                 throw ex;
189             }
190         }
191 
192         // Expect leased connections to be returned
193         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
194         // Expect zero connections in the pool
195         Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
196     }
197 
198     @Test
199     public void testReuseOfAbortedConnections() throws Exception {
200         configureServer(bootstrap -> bootstrap
201                 .register("/random/*", new RandomHandler()));
202         final HttpHost target = startServer();
203 
204         final TestClient client = client();
205         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
206         connManager.setMaxTotal(5);
207         connManager.setDefaultMaxPerRoute(5);
208 
209         final WorkerThread[] workers = new WorkerThread[10];
210         for (int i = 0; i < workers.length; i++) {
211             workers[i] = new WorkerThread(
212                     client,
213                     target,
214                     new URI("/random/2000"),
215                     10, true);
216         }
217 
218         for (final WorkerThread worker : workers) {
219             worker.start();
220         }
221         for (final WorkerThread worker : workers) {
222             worker.join(10000);
223             final Exception ex = worker.getException();
224             if (ex != null) {
225                 throw ex;
226             }
227         }
228 
229         // Expect leased connections to be returned
230         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
231         // Expect some connections in the pool
232         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
233     }
234 
235     @Test
236     public void testKeepAliveHeaderRespected() throws Exception {
237         configureServer(bootstrap -> bootstrap
238                 .setHttpProcessor(HttpProcessors.customServer(null)
239                         .add(new ResponseKeepAlive())
240                         .build())
241                 .register("/random/*", new RandomHandler()));
242         final HttpHost target = startServer();
243 
244         final TestClient client = client();
245         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
246         connManager.setMaxTotal(1);
247         connManager.setDefaultMaxPerRoute(1);
248 
249         client.execute(target, new HttpGet("/random/2000"), response -> {
250             EntityUtils.consume(response.getEntity());
251             return null;
252         });
253 
254         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
255 
256         client.execute(target, new HttpGet("/random/2000"), response -> {
257             EntityUtils.consume(response.getEntity());
258             return null;
259         });
260 
261         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
262 
263         // Now sleep for 1.1 seconds and let the timeout do its work
264         Thread.sleep(1100);
265         client.execute(target, new HttpGet("/random/2000"), response -> {
266             EntityUtils.consume(response.getEntity());
267             return null;
268         });
269 
270         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
271 
272         // Do another request just under the 1 second limit & make
273         // sure we reuse that connection.
274         Thread.sleep(500);
275         client.execute(target, new HttpGet("/random/2000"), response -> {
276             EntityUtils.consume(response.getEntity());
277             return null;
278         });
279 
280         // Expect leased connections to be returned
281         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
282         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
283     }
284 
285     private static class WorkerThread extends Thread {
286 
287         private final HttpHost target;
288         private final CloseableHttpClient httpclient;
289         private final boolean forceClose;
290         private final List<ClassicHttpRequest> requests;
291 
292         private volatile Exception exception;
293 
294         public WorkerThread(
295                 final CloseableHttpClient httpclient,
296                 final HttpHost target,
297                 final URI requestURI,
298                 final int repetitions,
299                 final boolean forceClose) {
300             super();
301             this.httpclient = httpclient;
302             this.target = target;
303             this.forceClose = forceClose;
304             this.requests = new ArrayList<>(repetitions);
305             for (int i = 0; i < repetitions; i++) {
306                 requests.add(new HttpGet(requestURI));
307             }
308         }
309 
310         public WorkerThread(
311                 final CloseableHttpClient httpclient,
312                 final HttpHost target,
313                 final boolean forceClose,
314                 final List<ClassicHttpRequest> requests) {
315             super();
316             this.httpclient = httpclient;
317             this.target = target;
318             this.forceClose = forceClose;
319             this.requests = requests;
320         }
321 
322         @Override
323         public void run() {
324             try {
325                 for (final ClassicHttpRequest request : requests) {
326                     this.httpclient.execute(this.target, request, response -> {
327                         if (this.forceClose) {
328                             response.close();
329                         } else {
330                             EntityUtils.consume(response.getEntity());
331                         }
332                         return null;
333                     });
334                 }
335             } catch (final Exception ex) {
336                 this.exception = ex;
337             }
338         }
339 
340         public Exception getException() {
341             return exception;
342         }
343 
344     }
345 
346     // A very basic keep-alive header interceptor, to add Keep-Alive: timeout=1
347     // if there is no Connection: close header.
348     private static class ResponseKeepAlive implements HttpResponseInterceptor {
349         @Override
350         public void process(
351                 final HttpResponse response,
352                 final EntityDetails entityDetails,
353                 final HttpContext context) throws HttpException, IOException {
354             final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
355             if(connection != null) {
356                 if(!connection.getValue().equalsIgnoreCase("Close")) {
357                     response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
358                 }
359             }
360         }
361     }
362 
363 }