1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.concurrent.Future;
30
31 import org.apache.hc.client5.http.HttpRoute;
32 import org.apache.hc.client5.http.UserTokenHandler;
33 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
34 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
35 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
36 import org.apache.hc.client5.http.config.RequestConfig;
37 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
38 import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
39 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
40 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
41 import org.apache.hc.client5.http.protocol.HttpClientContext;
42 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
43 import org.apache.hc.client5.testing.SSLTestContexts;
44 import org.apache.hc.core5.function.Supplier;
45 import org.apache.hc.core5.http.ContentType;
46 import org.apache.hc.core5.http.EndpointDetails;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpHost;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.HttpStatus;
51 import org.apache.hc.core5.http.config.Http1Config;
52 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
53 import org.apache.hc.core5.http.protocol.BasicHttpContext;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.http.protocol.HttpCoreContext;
56 import org.junit.Assert;
57 import org.junit.Rule;
58 import org.junit.Test;
59 import org.junit.rules.ExternalResource;
60
61 public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
62
63 protected HttpAsyncClientBuilder clientBuilder;
64 protected PoolingAsyncClientConnectionManager connManager;
65
66 @Rule
67 public ExternalResource connManagerResource = new ExternalResource() {
68
69 @Override
70 protected void before() throws Throwable {
71 connManager = PoolingAsyncClientConnectionManagerBuilder.create()
72 .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
73 .build();
74 }
75
76 @Override
77 protected void after() {
78 if (connManager != null) {
79 connManager.close();
80 connManager = null;
81 }
82 }
83
84 };
85
86 @Rule
87 public ExternalResource clientBuilderResource = new ExternalResource() {
88
89 @Override
90 protected void before() throws Throwable {
91 clientBuilder = HttpAsyncClientBuilder.create()
92 .setDefaultRequestConfig(RequestConfig.custom()
93 .setConnectTimeout(TIMEOUT)
94 .setConnectionRequestTimeout(TIMEOUT)
95 .build())
96 .setConnectionManager(connManager);
97 }
98
99 };
100
101 @Override
102 protected CloseableHttpAsyncClient createClient() throws Exception {
103 return clientBuilder.build();
104 }
105
106 @Override
107 public HttpHost start() throws Exception {
108 return super.start(null, Http1Config.DEFAULT);
109 }
110
111 @Test
112 public void testStatefulConnections() throws Exception {
113 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
114
115 @Override
116 public AsyncServerExchangeHandler get() {
117 return new AbstractSimpleServerExchangeHandler() {
118
119 @Override
120 protected SimpleHttpResponse handle(
121 final SimpleHttpRequest request,
122 final HttpCoreContext context) throws HttpException {
123 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
124 response.setBody("Whatever", ContentType.TEXT_PLAIN);
125 return response;
126 }
127 };
128 }
129
130 });
131
132 final UserTokenHandler userTokenHandler = new UserTokenHandler() {
133
134 @Override
135 public Object getUserToken(final HttpRoute route, final HttpContext context) {
136 return context.getAttribute("user");
137 }
138
139 };
140 clientBuilder.setUserTokenHandler(userTokenHandler);
141 final HttpHost target = start();
142
143 final int workerCount = 2;
144 final int requestCount = 5;
145
146 final HttpContext[] contexts = new HttpContext[workerCount];
147 final HttpWorker[] workers = new HttpWorker[workerCount];
148 for (int i = 0; i < contexts.length; i++) {
149 final HttpClientContext context = HttpClientContext.create();
150 contexts[i] = context;
151 workers[i] = new HttpWorker(
152 "user" + i,
153 context, requestCount, target, httpclient);
154 }
155
156 for (final HttpWorker worker : workers) {
157 worker.start();
158 }
159 for (final HttpWorker worker : workers) {
160 worker.join(LONG_TIMEOUT.toMilliseconds());
161 }
162 for (final HttpWorker worker : workers) {
163 final Exception ex = worker.getException();
164 if (ex != null) {
165 throw ex;
166 }
167 Assert.assertEquals(requestCount, worker.getCount());
168 }
169
170 for (final HttpContext context : contexts) {
171 final String state0 = (String) context.getAttribute("r0");
172 Assert.assertNotNull(state0);
173 for (int r = 1; r < requestCount; r++) {
174 Assert.assertEquals(state0, context.getAttribute("r" + r));
175 }
176 }
177
178 }
179
180 static class HttpWorker extends Thread {
181
182 private final String uid;
183 private final HttpClientContext context;
184 private final int requestCount;
185 private final HttpHost target;
186 private final CloseableHttpAsyncClient httpclient;
187
188 private volatile Exception exception;
189 private volatile int count;
190
191 public HttpWorker(
192 final String uid,
193 final HttpClientContext context,
194 final int requestCount,
195 final HttpHost target,
196 final CloseableHttpAsyncClient httpclient) {
197 super();
198 this.uid = uid;
199 this.context = context;
200 this.requestCount = requestCount;
201 this.target = target;
202 this.httpclient = httpclient;
203 this.count = 0;
204 }
205
206 public int getCount() {
207 return count;
208 }
209
210 public Exception getException() {
211 return exception;
212 }
213
214 @Override
215 public void run() {
216 try {
217 context.setAttribute("user", uid);
218 for (int r = 0; r < requestCount; r++) {
219 final SimpleHttpRequest httpget = SimpleHttpRequests.get(target, "/");
220 final Future<SimpleHttpResponse> future = httpclient.execute(httpget, null);
221 future.get();
222
223 count++;
224 final EndpointDetails endpointDetails = context.getEndpointDetails();
225 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
226 context.setAttribute("r" + r, connuid);
227 }
228
229 } catch (final Exception ex) {
230 exception = ex;
231 }
232 }
233
234 }
235
236 @Test
237 public void testRouteSpecificPoolRecylcing() throws Exception {
238 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
239
240 @Override
241 public AsyncServerExchangeHandler get() {
242 return new AbstractSimpleServerExchangeHandler() {
243
244 @Override
245 protected SimpleHttpResponse handle(
246 final SimpleHttpRequest request,
247 final HttpCoreContext context) throws HttpException {
248 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
249 response.setBody("Whatever", ContentType.TEXT_PLAIN);
250 return response;
251 }
252 };
253 }
254
255 });
256
257
258
259
260 final UserTokenHandler userTokenHandler = new UserTokenHandler() {
261
262 @Override
263 public Object getUserToken(final HttpRoute route, final HttpContext context) {
264 return context.getAttribute("user");
265 }
266
267 };
268 clientBuilder.setUserTokenHandler(userTokenHandler);
269
270 final HttpHost target = start();
271 final int maxConn = 2;
272
273 connManager.setMaxTotal(maxConn);
274 connManager.setDefaultMaxPerRoute(maxConn);
275
276
277 final HttpContext context1 = new BasicHttpContext();
278 context1.setAttribute("user", "stuff");
279
280 final Future<SimpleHttpResponse> future1 = httpclient.execute(SimpleHttpRequests.get(target, "/"), context1, null);
281 final HttpResponse response1 = future1.get();
282 Assert.assertNotNull(response1);
283 Assert.assertEquals(200, response1.getCode());
284
285
286
287
288
289 Thread.sleep(100);
290
291
292
293 final HttpContext context2 = new BasicHttpContext();
294
295 final Future<SimpleHttpResponse> future2 = httpclient.execute(SimpleHttpRequests.get(
296 new HttpHost(target.getSchemeName(), "127.0.0.1", target.getPort()),"/"), context2, null);
297 final HttpResponse response2 = future2.get();
298 Assert.assertNotNull(response2);
299 Assert.assertEquals(200, response2.getCode());
300
301
302
303
304
305 Thread.sleep(100);
306
307
308
309
310
311
312 final HttpContext context3 = new BasicHttpContext();
313 final Future<SimpleHttpResponse> future3 = httpclient.execute(SimpleHttpRequests.get(target, "/"), context3, null);
314 final HttpResponse response3 = future3.get();
315 Assert.assertNotNull(response3);
316 Assert.assertEquals(200, response3.getCode());
317 }
318
319 }