View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.testing.sync;
29  
30  import java.io.ByteArrayInputStream;
31  import java.io.IOException;
32  import java.net.URI;
33  import java.nio.charset.StandardCharsets;
34  import java.util.ArrayList;
35  import java.util.List;
36  
37  import org.apache.hc.client5.http.classic.methods.HttpGet;
38  import org.apache.hc.client5.http.classic.methods.HttpPost;
39  import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
40  import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
41  import org.apache.hc.core5.http.ClassicHttpResponse;
42  import org.apache.hc.core5.http.ContentType;
43  import org.apache.hc.core5.http.EntityDetails;
44  import org.apache.hc.core5.http.Header;
45  import org.apache.hc.core5.http.HeaderElements;
46  import org.apache.hc.core5.http.HttpException;
47  import org.apache.hc.core5.http.HttpHeaders;
48  import org.apache.hc.core5.http.HttpHost;
49  import org.apache.hc.core5.http.HttpResponse;
50  import org.apache.hc.core5.http.HttpResponseInterceptor;
51  import org.apache.hc.core5.http.impl.HttpProcessors;
52  import org.apache.hc.core5.http.io.entity.EntityUtils;
53  import org.apache.hc.core5.http.io.entity.InputStreamEntity;
54  import org.apache.hc.core5.http.protocol.HttpContext;
55  import org.apache.hc.core5.http.protocol.HttpProcessor;
56  import org.junit.Assert;
57  import org.junit.Test;
58  
59  public class TestConnectionReuse extends LocalServerTestBase {
60  
61      @Test
62      public void testReuseOfPersistentConnections() throws Exception {
63          this.connManager.setMaxTotal(5);
64          this.connManager.setDefaultMaxPerRoute(5);
65  
66          final HttpHost target = start();
67  
68          final WorkerThread[] workers = new WorkerThread[10];
69          for (int i = 0; i < workers.length; i++) {
70              workers[i] = new WorkerThread(
71                      this.httpclient,
72                      target,
73                      new URI("/random/2000"),
74                      10, false);
75          }
76  
77          for (final WorkerThread worker : workers) {
78              worker.start();
79          }
80          for (final WorkerThread worker : workers) {
81              worker.join(10000);
82              final Exception ex = worker.getException();
83              if (ex != null) {
84                  throw ex;
85              }
86          }
87  
88          // Expect some connection in the pool
89          Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
90      }
91  
92      @Test
93      public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
94          this.connManager.setMaxTotal(5);
95          this.connManager.setDefaultMaxPerRoute(5);
96  
97          final HttpHost target = start();
98  
99          final WorkerThread[] workers = new WorkerThread[10];
100         for (int i = 0; i < workers.length; i++) {
101             final List<HttpUriRequestBase> requests = new ArrayList<>();
102             for (int j = 0; j < 10; j++) {
103                 final HttpPost post = new HttpPost(new URI("/random/2000"));
104                 // non-repeatable
105                 post.setEntity(new InputStreamEntity(
106                         new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
107                         ContentType.APPLICATION_OCTET_STREAM));
108                 requests.add(post);
109             }
110             workers[i] = new WorkerThread(this.httpclient, target, false, requests);
111         }
112 
113         for (final WorkerThread worker : workers) {
114             worker.start();
115         }
116         for (final WorkerThread worker : workers) {
117             worker.join(10000);
118             final Exception ex = worker.getException();
119             if (ex != null) {
120                 throw ex;
121             }
122         }
123 
124         // Expect some connection in the pool
125         Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
126     }
127 
128     private static class AlwaysCloseConn implements HttpResponseInterceptor {
129 
130         @Override
131         public void process(
132                 final HttpResponse response,
133                 final EntityDetails entityDetails,
134                 final HttpContext context) throws HttpException, IOException {
135             response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
136         }
137 
138     }
139 
140     @Test
141     public void testReuseOfClosedConnections() throws Exception {
142         this.connManager.setMaxTotal(5);
143         this.connManager.setDefaultMaxPerRoute(5);
144 
145         final HttpProcessor httpproc = HttpProcessors.customServer(null)
146                 .add(new AlwaysCloseConn())
147                 .build();
148         final HttpHost target = start(httpproc, null);
149 
150         final WorkerThread[] workers = new WorkerThread[10];
151         for (int i = 0; i < workers.length; i++) {
152             workers[i] = new WorkerThread(
153                     this.httpclient,
154                     target,
155                     new URI("/random/2000"),
156                     10, false);
157         }
158 
159         for (final WorkerThread worker : workers) {
160             worker.start();
161         }
162         for (final WorkerThread worker : workers) {
163             worker.join(10000);
164             final Exception ex = worker.getException();
165             if (ex != null) {
166                 throw ex;
167             }
168         }
169 
170         // Expect zero connections in the pool
171         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
172     }
173 
174     @Test
175     public void testReuseOfAbortedConnections() throws Exception {
176         this.connManager.setMaxTotal(5);
177         this.connManager.setDefaultMaxPerRoute(5);
178 
179         final HttpHost target = start();
180 
181         final WorkerThread[] workers = new WorkerThread[10];
182         for (int i = 0; i < workers.length; i++) {
183             workers[i] = new WorkerThread(
184                     this.httpclient,
185                     target,
186                     new URI("/random/2000"),
187                     10, true);
188         }
189 
190         for (final WorkerThread worker : workers) {
191             worker.start();
192         }
193         for (final WorkerThread worker : workers) {
194             worker.join(10000);
195             final Exception ex = worker.getException();
196             if (ex != null) {
197                 throw ex;
198             }
199         }
200 
201         // Expect zero connections in the pool
202         Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
203     }
204 
205     @Test
206     public void testKeepAliveHeaderRespected() throws Exception {
207         this.connManager.setMaxTotal(1);
208         this.connManager.setDefaultMaxPerRoute(1);
209 
210         final HttpProcessor httpproc = HttpProcessors.customServer(null)
211                 .add(new ResponseKeepAlive())
212                 .build();
213         final HttpHost target = start(httpproc, null);
214 
215         ClassicHttpResponse response = this.httpclient.execute(target, new HttpGet("/random/2000"));
216         EntityUtils.consume(response.getEntity());
217 
218         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
219 
220         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
221         EntityUtils.consume(response.getEntity());
222 
223         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
224 
225         // Now sleep for 1.1 seconds and let the timeout do its work
226         Thread.sleep(1100);
227         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
228         EntityUtils.consume(response.getEntity());
229 
230         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
231 
232         // Do another request just under the 1 second limit & make
233         // sure we reuse that connection.
234         Thread.sleep(500);
235         response = this.httpclient.execute(target, new HttpGet("/random/2000"));
236         EntityUtils.consume(response.getEntity());
237 
238         Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
239     }
240 
241     private static class WorkerThread extends Thread {
242 
243         private final HttpHost target;
244         private final CloseableHttpClient httpclient;
245         private final boolean forceClose;
246         private final List<HttpUriRequestBase> requests;
247 
248         private volatile Exception exception;
249 
250         public WorkerThread(
251                 final CloseableHttpClient httpclient,
252                 final HttpHost target,
253                 final URI requestURI,
254                 final int repetitions,
255                 final boolean forceClose) {
256             super();
257             this.httpclient = httpclient;
258             this.target = target;
259             this.forceClose = forceClose;
260             this.requests = new ArrayList<>(repetitions);
261             for (int i = 0; i < repetitions; i++) {
262                 requests.add(new HttpGet(requestURI));
263             }
264         }
265 
266         public WorkerThread(
267                 final CloseableHttpClient httpclient,
268                 final HttpHost target,
269                 final boolean forceClose,
270                 final List<HttpUriRequestBase> requests) {
271             super();
272             this.httpclient = httpclient;
273             this.target = target;
274             this.forceClose = forceClose;
275             this.requests = requests;
276         }
277 
278         @Override
279         public void run() {
280             try {
281                 for (final HttpUriRequestBase request : requests) {
282                     final ClassicHttpResponse response = this.httpclient.execute(
283                             this.target,
284                             request);
285                     if (this.forceClose) {
286                         request.cancel();
287                     } else {
288                         EntityUtils.consume(response.getEntity());
289                     }
290                 }
291             } catch (final Exception ex) {
292                 this.exception = ex;
293             }
294         }
295 
296         public Exception getException() {
297             return exception;
298         }
299 
300     }
301 
302     // A very basic keep-alive header interceptor, to add Keep-Alive: timeout=1
303     // if there is no Connection: close header.
304     private static class ResponseKeepAlive implements HttpResponseInterceptor {
305         @Override
306         public void process(
307                 final HttpResponse response,
308                 final EntityDetails entityDetails,
309                 final HttpContext context) throws HttpException, IOException {
310             final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
311             if(connection != null) {
312                 if(!connection.getValue().equalsIgnoreCase("Close")) {
313                     response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
314                 }
315             }
316         }
317     }
318 
319 }