View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.testing.async;
28  
29  import java.util.concurrent.Future;
30  
31  import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
32  import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
33  import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
34  import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
35  import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
36  import org.apache.hc.client5.http.protocol.HttpClientContext;
37  import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
38  import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
39  import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
40  import org.apache.hc.core5.http.ContentType;
41  import org.apache.hc.core5.http.EndpointDetails;
42  import org.apache.hc.core5.http.HttpException;
43  import org.apache.hc.core5.http.HttpHost;
44  import org.apache.hc.core5.http.HttpResponse;
45  import org.apache.hc.core5.http.HttpStatus;
46  import org.apache.hc.core5.http.URIScheme;
47  import org.apache.hc.core5.http.protocol.HttpContext;
48  import org.apache.hc.core5.http.protocol.HttpCoreContext;
49  import org.apache.hc.core5.net.URIAuthority;
50  import org.junit.jupiter.api.Assertions;
51  import org.junit.jupiter.api.Test;
52  
53  public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase {
54  
55      public TestHttp1AsyncStatefulConnManagement() {
56          super(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD);
57      }
58  
59      @Test
60      public void testStatefulConnections() throws Exception {
61          configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
62  
63              @Override
64              protected SimpleHttpResponse handle(
65                      final SimpleHttpRequest request,
66                      final HttpCoreContext context) throws HttpException {
67                  final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
68                  response.setBody("Whatever", ContentType.TEXT_PLAIN);
69                  return response;
70              }
71          }));
72          final HttpHost target = startServer();
73  
74          configureClient(builder -> builder
75                  .setUserTokenHandler((route, context) -> context.getAttribute("user")));
76  
77          final TestAsyncClient client = startClient();
78  
79          final int workerCount = 2;
80          final int requestCount = 5;
81  
82          final HttpContext[] contexts = new HttpContext[workerCount];
83          final HttpWorker[] workers = new HttpWorker[workerCount];
84          for (int i = 0; i < contexts.length; i++) {
85              final HttpClientContext context = HttpClientContext.create();
86              contexts[i] = context;
87              workers[i] = new HttpWorker(
88                      "user" + i,
89                      context, requestCount, target, client);
90          }
91  
92          for (final HttpWorker worker : workers) {
93              worker.start();
94          }
95          for (final HttpWorker worker : workers) {
96              worker.join(TIMEOUT.toMilliseconds());
97          }
98          for (final HttpWorker worker : workers) {
99              final Exception ex = worker.getException();
100             if (ex != null) {
101                 throw ex;
102             }
103             Assertions.assertEquals(requestCount, worker.getCount());
104         }
105 
106         for (final HttpContext context : contexts) {
107             final String state0 = (String) context.getAttribute("r0");
108             Assertions.assertNotNull(state0);
109             for (int r = 1; r < requestCount; r++) {
110                 Assertions.assertEquals(state0, context.getAttribute("r" + r));
111             }
112         }
113 
114     }
115 
116     static class HttpWorker extends Thread {
117 
118         private final String uid;
119         private final HttpClientContext context;
120         private final int requestCount;
121         private final HttpHost target;
122         private final CloseableHttpAsyncClient httpclient;
123 
124         private volatile Exception exception;
125         private volatile int count;
126 
127         public HttpWorker(
128                 final String uid,
129                 final HttpClientContext context,
130                 final int requestCount,
131                 final HttpHost target,
132                 final CloseableHttpAsyncClient httpclient) {
133             super();
134             this.uid = uid;
135             this.context = context;
136             this.requestCount = requestCount;
137             this.target = target;
138             this.httpclient = httpclient;
139             this.count = 0;
140         }
141 
142         public int getCount() {
143             return count;
144         }
145 
146         public Exception getException() {
147             return exception;
148         }
149 
150         @Override
151         public void run() {
152             try {
153                 context.setAttribute("user", uid);
154                 for (int r = 0; r < requestCount; r++) {
155                     final SimpleHttpRequest request = SimpleRequestBuilder.get()
156                             .setHttpHost(target)
157                             .setPath("/")
158                             .build();
159                     final Future<SimpleHttpResponse> future = httpclient.execute(request, null);
160                     future.get();
161 
162                     count++;
163                     final EndpointDetails endpointDetails = context.getEndpointDetails();
164                     final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
165                     context.setAttribute("r" + r, connuid);
166                 }
167 
168             } catch (final Exception ex) {
169                 exception = ex;
170             }
171         }
172 
173     }
174 
175     @Test
176     public void testRouteSpecificPoolRecylcing() throws Exception {
177         configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
178 
179             @Override
180             protected SimpleHttpResponse handle(
181                     final SimpleHttpRequest request,
182                     final HttpCoreContext context) throws HttpException {
183                 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
184                 response.setBody("Whatever", ContentType.TEXT_PLAIN);
185                 return response;
186             }
187         }));
188         final HttpHost target = startServer();
189 
190         // This tests what happens when a maxed connection pool needs
191         // to kill the last idle connection to a route to build a new
192         // one to the same route.
193 
194         configureClient(builder -> builder
195                 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
196 
197         final TestAsyncClient client = startClient();
198 
199         final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
200 
201         final int maxConn = 2;
202         // We build a client with 2 max active // connections, and 2 max per route.
203         connManager.setMaxTotal(maxConn);
204         connManager.setDefaultMaxPerRoute(maxConn);
205 
206         // Bottom of the pool : a *keep alive* connection to Route 1.
207         final HttpContext context1 = HttpClientContext.create();
208         context1.setAttribute("user", "stuff");
209 
210         final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
211                 .setHttpHost(target)
212                 .setPath("/")
213                 .build();
214         final Future<SimpleHttpResponse> future1 = client.execute(request1, context1, null);
215         final HttpResponse response1 = future1.get();
216         Assertions.assertNotNull(response1);
217         Assertions.assertEquals(200, response1.getCode());
218 
219         // The ConnPoolByRoute now has 1 free connection, out of 2 max
220         // The ConnPoolByRoute has one RouteSpcfcPool, that has one free connection
221         // for [localhost][stuff]
222 
223         Thread.sleep(100);
224 
225         // Send a very simple HTTP get (it MUST be simple, no auth, no proxy, no 302, no 401, ...)
226         // Send it to another route. Must be a keepalive.
227         final HttpContext context2 = HttpClientContext.create();
228 
229         final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
230                 .setScheme(target.getSchemeName())
231                 .setAuthority(new URIAuthority("127.0.0.1", target.getPort()))
232                 .setPath("/")
233                 .build();
234         final Future<SimpleHttpResponse> future2 = client.execute(request2, context2, null);
235         final HttpResponse response2 = future2.get();
236         Assertions.assertNotNull(response2);
237         Assertions.assertEquals(200, response2.getCode());
238 
239         // ConnPoolByRoute now has 2 free connexions, out of its 2 max.
240         // The [localhost][stuff] RouteSpcfcPool is the same as earlier
241         // And there is a [127.0.0.1][null] pool with 1 free connection
242 
243         Thread.sleep(100);
244 
245         // This will put the ConnPoolByRoute to the targeted state :
246         // [localhost][stuff] will not get reused because this call is [localhost][null]
247         // So the ConnPoolByRoute will need to kill one connection (it is maxed out globally).
248         // The killed conn is the oldest, which means the first HTTPGet ([localhost][stuff]).
249         // When this happens, the RouteSpecificPool becomes empty.
250         final HttpContext context3 = HttpClientContext.create();
251 
252         final SimpleHttpRequest request3 = SimpleRequestBuilder.get()
253                 .setHttpHost(target)
254                 .setPath("/")
255                 .build();
256         final Future<SimpleHttpResponse> future3 = client.execute(request3, context3, null);
257         final HttpResponse response3 = future3.get();
258         Assertions.assertNotNull(response3);
259         Assertions.assertEquals(200, response3.getCode());
260     }
261 
262 }