1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.concurrent.Future;
30
31 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
32 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
33 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
34 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
35 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
36 import org.apache.hc.client5.http.protocol.HttpClientContext;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.EndpointDetails;
39 import org.apache.hc.core5.http.HttpException;
40 import org.apache.hc.core5.http.HttpHost;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.URIScheme;
44 import org.apache.hc.core5.http.config.Http1Config;
45 import org.apache.hc.core5.http.protocol.HttpContext;
46 import org.apache.hc.core5.http.protocol.HttpCoreContext;
47 import org.apache.hc.core5.net.URIAuthority;
48 import org.apache.hc.core5.testing.nio.H2TestServer;
49 import org.junit.jupiter.api.Assertions;
50 import org.junit.jupiter.api.Test;
51
52 public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase {
53
/**
 * Creates the test case, handing {@link URIScheme#HTTP} to the base class constructor.
 */
public TestHttp1AsyncStatefulConnManagement() {
    super(URIScheme.HTTP);
}
57
58 protected H2TestServer startServer() throws Exception {
59 return startServer(Http1Config.DEFAULT, null, null);
60 }
61
62 @Test
63 public void testStatefulConnections() throws Exception {
64 final H2TestServer server = startServer();
65 server.register("*", () -> new AbstractSimpleServerExchangeHandler() {
66
67 @Override
68 protected SimpleHttpResponse handle(
69 final SimpleHttpRequest request,
70 final HttpCoreContext context) throws HttpException {
71 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
72 response.setBody("Whatever", ContentType.TEXT_PLAIN);
73 return response;
74 }
75 });
76
77 final HttpHost target = targetHost();
78
79 final CloseableHttpAsyncClient client = startClient(builer -> builer
80 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
81
82 final int workerCount = 2;
83 final int requestCount = 5;
84
85 final HttpContext[] contexts = new HttpContext[workerCount];
86 final HttpWorker[] workers = new HttpWorker[workerCount];
87 for (int i = 0; i < contexts.length; i++) {
88 final HttpClientContext context = HttpClientContext.create();
89 contexts[i] = context;
90 workers[i] = new HttpWorker(
91 "user" + i,
92 context, requestCount, target, client);
93 }
94
95 for (final HttpWorker worker : workers) {
96 worker.start();
97 }
98 for (final HttpWorker worker : workers) {
99 worker.join(TIMEOUT.toMilliseconds());
100 }
101 for (final HttpWorker worker : workers) {
102 final Exception ex = worker.getException();
103 if (ex != null) {
104 throw ex;
105 }
106 Assertions.assertEquals(requestCount, worker.getCount());
107 }
108
109 for (final HttpContext context : contexts) {
110 final String state0 = (String) context.getAttribute("r0");
111 Assertions.assertNotNull(state0);
112 for (int r = 1; r < requestCount; r++) {
113 Assertions.assertEquals(state0, context.getAttribute("r" + r));
114 }
115 }
116
117 }
118
119 static class HttpWorker extends Thread {
120
121 private final String uid;
122 private final HttpClientContext context;
123 private final int requestCount;
124 private final HttpHost target;
125 private final CloseableHttpAsyncClient httpclient;
126
127 private volatile Exception exception;
128 private volatile int count;
129
130 public HttpWorker(
131 final String uid,
132 final HttpClientContext context,
133 final int requestCount,
134 final HttpHost target,
135 final CloseableHttpAsyncClient httpclient) {
136 super();
137 this.uid = uid;
138 this.context = context;
139 this.requestCount = requestCount;
140 this.target = target;
141 this.httpclient = httpclient;
142 this.count = 0;
143 }
144
145 public int getCount() {
146 return count;
147 }
148
149 public Exception getException() {
150 return exception;
151 }
152
153 @Override
154 public void run() {
155 try {
156 context.setAttribute("user", uid);
157 for (int r = 0; r < requestCount; r++) {
158 final SimpleHttpRequest request = SimpleRequestBuilder.get()
159 .setHttpHost(target)
160 .setPath("/")
161 .build();
162 final Future<SimpleHttpResponse> future = httpclient.execute(request, null);
163 future.get();
164
165 count++;
166 final EndpointDetails endpointDetails = context.getEndpointDetails();
167 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
168 context.setAttribute("r" + r, connuid);
169 }
170
171 } catch (final Exception ex) {
172 exception = ex;
173 }
174 }
175
176 }
177
178 @Test
179 public void testRouteSpecificPoolRecylcing() throws Exception {
180 final H2TestServer server = startServer();
181 server.register("*", () -> new AbstractSimpleServerExchangeHandler() {
182
183 @Override
184 protected SimpleHttpResponse handle(
185 final SimpleHttpRequest request,
186 final HttpCoreContext context) throws HttpException {
187 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
188 response.setBody("Whatever", ContentType.TEXT_PLAIN);
189 return response;
190 }
191 });
192
193
194
195
196
197 final HttpHost target = targetHost();
198
199 final CloseableHttpAsyncClient client = startClient(builer -> builer
200 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
201 final PoolingAsyncClientConnectionManager connManager = connManager();
202
203 final int maxConn = 2;
204
205 connManager.setMaxTotal(maxConn);
206 connManager.setDefaultMaxPerRoute(maxConn);
207
208
209 final HttpContext context1 = HttpClientContext.create();
210 context1.setAttribute("user", "stuff");
211
212 final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
213 .setHttpHost(target)
214 .setPath("/")
215 .build();
216 final Future<SimpleHttpResponse> future1 = client.execute(request1, context1, null);
217 final HttpResponse response1 = future1.get();
218 Assertions.assertNotNull(response1);
219 Assertions.assertEquals(200, response1.getCode());
220
221
222
223
224
225 Thread.sleep(100);
226
227
228
229 final HttpContext context2 = HttpClientContext.create();
230
231 final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
232 .setScheme(target.getSchemeName())
233 .setAuthority(new URIAuthority("127.0.0.1", target.getPort()))
234 .setPath("/")
235 .build();
236 final Future<SimpleHttpResponse> future2 = client.execute(request2, context2, null);
237 final HttpResponse response2 = future2.get();
238 Assertions.assertNotNull(response2);
239 Assertions.assertEquals(200, response2.getCode());
240
241
242
243
244
245 Thread.sleep(100);
246
247
248
249
250
251
252 final HttpContext context3 = HttpClientContext.create();
253
254 final SimpleHttpRequest request3 = SimpleRequestBuilder.get()
255 .setHttpHost(target)
256 .setPath("/")
257 .build();
258 final Future<SimpleHttpResponse> future3 = client.execute(request3, context3, null);
259 final HttpResponse response3 = future3.get();
260 Assertions.assertNotNull(response3);
261 Assertions.assertEquals(200, response3.getCode());
262 }
263
264 }