1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.concurrent.Future;
30
31 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
32 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
33 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
34 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
35 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
36 import org.apache.hc.client5.http.protocol.HttpClientContext;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.EndpointDetails;
39 import org.apache.hc.core5.http.HttpException;
40 import org.apache.hc.core5.http.HttpHost;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.URIScheme;
44 import org.apache.hc.core5.http.config.Http1Config;
45 import org.apache.hc.core5.http.protocol.BasicHttpContext;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.http.protocol.HttpCoreContext;
48 import org.apache.hc.core5.net.URIAuthority;
49 import org.apache.hc.core5.testing.nio.H2TestServer;
50 import org.junit.jupiter.api.Assertions;
51 import org.junit.jupiter.api.Test;
52
53 public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase {
54
55 public TestHttp1AsyncStatefulConnManagement() {
56 super(URIScheme.HTTP);
57 }
58
59 protected H2TestServer startServer() throws Exception {
60 return startServer(Http1Config.DEFAULT, null, null);
61 }
62
63 @Test
64 public void testStatefulConnections() throws Exception {
65 final H2TestServer server = startServer();
66 server.register("*", () -> new AbstractSimpleServerExchangeHandler() {
67
68 @Override
69 protected SimpleHttpResponse handle(
70 final SimpleHttpRequest request,
71 final HttpCoreContext context) throws HttpException {
72 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
73 response.setBody("Whatever", ContentType.TEXT_PLAIN);
74 return response;
75 }
76 });
77
78 final HttpHost target = targetHost();
79
80 final CloseableHttpAsyncClient client = startClient(builer -> builer
81 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
82
83 final int workerCount = 2;
84 final int requestCount = 5;
85
86 final HttpContext[] contexts = new HttpContext[workerCount];
87 final HttpWorker[] workers = new HttpWorker[workerCount];
88 for (int i = 0; i < contexts.length; i++) {
89 final HttpClientContext context = HttpClientContext.create();
90 contexts[i] = context;
91 workers[i] = new HttpWorker(
92 "user" + i,
93 context, requestCount, target, client);
94 }
95
96 for (final HttpWorker worker : workers) {
97 worker.start();
98 }
99 for (final HttpWorker worker : workers) {
100 worker.join(TIMEOUT.toMilliseconds());
101 }
102 for (final HttpWorker worker : workers) {
103 final Exception ex = worker.getException();
104 if (ex != null) {
105 throw ex;
106 }
107 Assertions.assertEquals(requestCount, worker.getCount());
108 }
109
110 for (final HttpContext context : contexts) {
111 final String state0 = (String) context.getAttribute("r0");
112 Assertions.assertNotNull(state0);
113 for (int r = 1; r < requestCount; r++) {
114 Assertions.assertEquals(state0, context.getAttribute("r" + r));
115 }
116 }
117
118 }
119
120 static class HttpWorker extends Thread {
121
122 private final String uid;
123 private final HttpClientContext context;
124 private final int requestCount;
125 private final HttpHost target;
126 private final CloseableHttpAsyncClient httpclient;
127
128 private volatile Exception exception;
129 private volatile int count;
130
131 public HttpWorker(
132 final String uid,
133 final HttpClientContext context,
134 final int requestCount,
135 final HttpHost target,
136 final CloseableHttpAsyncClient httpclient) {
137 super();
138 this.uid = uid;
139 this.context = context;
140 this.requestCount = requestCount;
141 this.target = target;
142 this.httpclient = httpclient;
143 this.count = 0;
144 }
145
146 public int getCount() {
147 return count;
148 }
149
150 public Exception getException() {
151 return exception;
152 }
153
154 @Override
155 public void run() {
156 try {
157 context.setAttribute("user", uid);
158 for (int r = 0; r < requestCount; r++) {
159 final SimpleHttpRequest request = SimpleRequestBuilder.get()
160 .setHttpHost(target)
161 .setPath("/")
162 .build();
163 final Future<SimpleHttpResponse> future = httpclient.execute(request, null);
164 future.get();
165
166 count++;
167 final EndpointDetails endpointDetails = context.getEndpointDetails();
168 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
169 context.setAttribute("r" + r, connuid);
170 }
171
172 } catch (final Exception ex) {
173 exception = ex;
174 }
175 }
176
177 }
178
179 @Test
180 public void testRouteSpecificPoolRecylcing() throws Exception {
181 final H2TestServer server = startServer();
182 server.register("*", () -> new AbstractSimpleServerExchangeHandler() {
183
184 @Override
185 protected SimpleHttpResponse handle(
186 final SimpleHttpRequest request,
187 final HttpCoreContext context) throws HttpException {
188 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
189 response.setBody("Whatever", ContentType.TEXT_PLAIN);
190 return response;
191 }
192 });
193
194
195
196
197
198 final HttpHost target = targetHost();
199
200 final CloseableHttpAsyncClient client = startClient(builer -> builer
201 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
202 final PoolingAsyncClientConnectionManager connManager = connManager();
203
204 final int maxConn = 2;
205
206 connManager.setMaxTotal(maxConn);
207 connManager.setDefaultMaxPerRoute(maxConn);
208
209
210 final HttpContext context1 = new BasicHttpContext();
211 context1.setAttribute("user", "stuff");
212
213 final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
214 .setHttpHost(target)
215 .setPath("/")
216 .build();
217 final Future<SimpleHttpResponse> future1 = client.execute(request1, context1, null);
218 final HttpResponse response1 = future1.get();
219 Assertions.assertNotNull(response1);
220 Assertions.assertEquals(200, response1.getCode());
221
222
223
224
225
226 Thread.sleep(100);
227
228
229
230 final HttpContext context2 = new BasicHttpContext();
231
232 final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
233 .setScheme(target.getSchemeName())
234 .setAuthority(new URIAuthority("127.0.0.1", target.getPort()))
235 .setPath("/")
236 .build();
237 final Future<SimpleHttpResponse> future2 = client.execute(request2, context2, null);
238 final HttpResponse response2 = future2.get();
239 Assertions.assertNotNull(response2);
240 Assertions.assertEquals(200, response2.getCode());
241
242
243
244
245
246 Thread.sleep(100);
247
248
249
250
251
252
253 final HttpContext context3 = new BasicHttpContext();
254
255 final SimpleHttpRequest request3 = SimpleRequestBuilder.get()
256 .setHttpHost(target)
257 .setPath("/")
258 .build();
259 final Future<SimpleHttpResponse> future3 = client.execute(request3, context3, null);
260 final HttpResponse response3 = future3.get();
261 Assertions.assertNotNull(response3);
262 Assertions.assertEquals(200, response3.getCode());
263 }
264
265 }