1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.concurrent.Future;
30
31 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
32 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
33 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
34 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
35 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
36 import org.apache.hc.client5.http.protocol.HttpClientContext;
37 import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
38 import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
39 import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
40 import org.apache.hc.core5.http.ContentType;
41 import org.apache.hc.core5.http.EndpointDetails;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpHost;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.HttpStatus;
46 import org.apache.hc.core5.http.URIScheme;
47 import org.apache.hc.core5.http.protocol.HttpContext;
48 import org.apache.hc.core5.http.protocol.HttpCoreContext;
49 import org.apache.hc.core5.net.URIAuthority;
50 import org.junit.jupiter.api.Assertions;
51 import org.junit.jupiter.api.Test;
52
53 public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase {
54
55 public TestHttp1AsyncStatefulConnManagement() {
56 super(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD);
57 }
58
59 @Test
60 public void testStatefulConnections() throws Exception {
61 configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
62
63 @Override
64 protected SimpleHttpResponse handle(
65 final SimpleHttpRequest request,
66 final HttpCoreContext context) throws HttpException {
67 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
68 response.setBody("Whatever", ContentType.TEXT_PLAIN);
69 return response;
70 }
71 }));
72 final HttpHost target = startServer();
73
74 configureClient(builder -> builder
75 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
76
77 final TestAsyncClient client = startClient();
78
79 final int workerCount = 2;
80 final int requestCount = 5;
81
82 final HttpContext[] contexts = new HttpContext[workerCount];
83 final HttpWorker[] workers = new HttpWorker[workerCount];
84 for (int i = 0; i < contexts.length; i++) {
85 final HttpClientContext context = HttpClientContext.create();
86 contexts[i] = context;
87 workers[i] = new HttpWorker(
88 "user" + i,
89 context, requestCount, target, client);
90 }
91
92 for (final HttpWorker worker : workers) {
93 worker.start();
94 }
95 for (final HttpWorker worker : workers) {
96 worker.join(TIMEOUT.toMilliseconds());
97 }
98 for (final HttpWorker worker : workers) {
99 final Exception ex = worker.getException();
100 if (ex != null) {
101 throw ex;
102 }
103 Assertions.assertEquals(requestCount, worker.getCount());
104 }
105
106 for (final HttpContext context : contexts) {
107 final String state0 = (String) context.getAttribute("r0");
108 Assertions.assertNotNull(state0);
109 for (int r = 1; r < requestCount; r++) {
110 Assertions.assertEquals(state0, context.getAttribute("r" + r));
111 }
112 }
113
114 }
115
116 static class HttpWorker extends Thread {
117
118 private final String uid;
119 private final HttpClientContext context;
120 private final int requestCount;
121 private final HttpHost target;
122 private final CloseableHttpAsyncClient httpclient;
123
124 private volatile Exception exception;
125 private volatile int count;
126
127 public HttpWorker(
128 final String uid,
129 final HttpClientContext context,
130 final int requestCount,
131 final HttpHost target,
132 final CloseableHttpAsyncClient httpclient) {
133 super();
134 this.uid = uid;
135 this.context = context;
136 this.requestCount = requestCount;
137 this.target = target;
138 this.httpclient = httpclient;
139 this.count = 0;
140 }
141
142 public int getCount() {
143 return count;
144 }
145
146 public Exception getException() {
147 return exception;
148 }
149
150 @Override
151 public void run() {
152 try {
153 context.setAttribute("user", uid);
154 for (int r = 0; r < requestCount; r++) {
155 final SimpleHttpRequest request = SimpleRequestBuilder.get()
156 .setHttpHost(target)
157 .setPath("/")
158 .build();
159 final Future<SimpleHttpResponse> future = httpclient.execute(request, null);
160 future.get();
161
162 count++;
163 final EndpointDetails endpointDetails = context.getEndpointDetails();
164 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
165 context.setAttribute("r" + r, connuid);
166 }
167
168 } catch (final Exception ex) {
169 exception = ex;
170 }
171 }
172
173 }
174
175 @Test
176 public void testRouteSpecificPoolRecylcing() throws Exception {
177 configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
178
179 @Override
180 protected SimpleHttpResponse handle(
181 final SimpleHttpRequest request,
182 final HttpCoreContext context) throws HttpException {
183 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
184 response.setBody("Whatever", ContentType.TEXT_PLAIN);
185 return response;
186 }
187 }));
188 final HttpHost target = startServer();
189
190
191
192
193
194 configureClient(builder -> builder
195 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
196
197 final TestAsyncClient client = startClient();
198
199 final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
200
201 final int maxConn = 2;
202
203 connManager.setMaxTotal(maxConn);
204 connManager.setDefaultMaxPerRoute(maxConn);
205
206
207 final HttpContext context1 = HttpClientContext.create();
208 context1.setAttribute("user", "stuff");
209
210 final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
211 .setHttpHost(target)
212 .setPath("/")
213 .build();
214 final Future<SimpleHttpResponse> future1 = client.execute(request1, context1, null);
215 final HttpResponse response1 = future1.get();
216 Assertions.assertNotNull(response1);
217 Assertions.assertEquals(200, response1.getCode());
218
219
220
221
222
223 Thread.sleep(100);
224
225
226
227 final HttpContext context2 = HttpClientContext.create();
228
229 final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
230 .setScheme(target.getSchemeName())
231 .setAuthority(new URIAuthority("127.0.0.1", target.getPort()))
232 .setPath("/")
233 .build();
234 final Future<SimpleHttpResponse> future2 = client.execute(request2, context2, null);
235 final HttpResponse response2 = future2.get();
236 Assertions.assertNotNull(response2);
237 Assertions.assertEquals(200, response2.getCode());
238
239
240
241
242
243 Thread.sleep(100);
244
245
246
247
248
249
250 final HttpContext context3 = HttpClientContext.create();
251
252 final SimpleHttpRequest request3 = SimpleRequestBuilder.get()
253 .setHttpHost(target)
254 .setPath("/")
255 .build();
256 final Future<SimpleHttpResponse> future3 = client.execute(request3, context3, null);
257 final HttpResponse response3 = future3.get();
258 Assertions.assertNotNull(response3);
259 Assertions.assertEquals(200, response3.getCode());
260 }
261
262 }