1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.sync;
28
29 import java.io.IOException;
30
31 import org.apache.hc.client5.http.UserTokenHandler;
32 import org.apache.hc.client5.http.classic.methods.HttpGet;
33 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
34 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
35 import org.apache.hc.client5.http.protocol.HttpClientContext;
36 import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
37 import org.apache.hc.client5.testing.extension.sync.TestClient;
38 import org.apache.hc.core5.http.ClassicHttpRequest;
39 import org.apache.hc.core5.http.ClassicHttpResponse;
40 import org.apache.hc.core5.http.EndpointDetails;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.HttpStatus;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.io.HttpRequestHandler;
46 import org.apache.hc.core5.http.io.entity.EntityUtils;
47 import org.apache.hc.core5.http.io.entity.StringEntity;
48 import org.apache.hc.core5.http.protocol.HttpContext;
49 import org.apache.hc.core5.util.Timeout;
50 import org.junit.jupiter.api.Assertions;
51 import org.junit.jupiter.api.Test;
52
53
54
55
56 public class TestStatefulConnManagement extends AbstractIntegrationTestBase {
57
58 public static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(3);
59
60 public TestStatefulConnManagement() {
61 super(URIScheme.HTTP, ClientProtocolLevel.STANDARD);
62 }
63
64 private static class SimpleService implements HttpRequestHandler {
65
66 public SimpleService() {
67 super();
68 }
69
70 @Override
71 public void handle(
72 final ClassicHttpRequest request,
73 final ClassicHttpResponse response,
74 final HttpContext context) throws HttpException, IOException {
75 response.setCode(HttpStatus.SC_OK);
76 final StringEntity entity = new StringEntity("Whatever");
77 response.setEntity(entity);
78 }
79 }
80
81 @Test
82 public void testStatefulConnections() throws Exception {
83 configureServer(bootstrap -> bootstrap
84 .register("*", new SimpleService()));
85 final HttpHost target = startServer();
86
87 final int workerCount = 5;
88 final int requestCount = 5;
89
90 final UserTokenHandler userTokenHandler = (route, context) -> {
91 final String id = (String) context.getAttribute("user");
92 return id;
93 };
94
95 configureClient(builder -> builder
96 .setUserTokenHandler(userTokenHandler));
97 final TestClient client = client();
98
99 final PoolingHttpClientConnectionManager connectionManager = client.getConnectionManager();
100 connectionManager.setMaxTotal(workerCount);
101 connectionManager.setDefaultMaxPerRoute(workerCount);
102
103 final HttpClientContext[] contexts = new HttpClientContext[workerCount];
104 final HttpWorker[] workers = new HttpWorker[workerCount];
105 for (int i = 0; i < contexts.length; i++) {
106 final HttpClientContext context = HttpClientContext.create();
107 contexts[i] = context;
108 workers[i] = new HttpWorker(
109 "user" + i,
110 context, requestCount, target, client);
111 }
112
113 for (final HttpWorker worker : workers) {
114 worker.start();
115 }
116 for (final HttpWorker worker : workers) {
117 worker.join(LONG_TIMEOUT.toMilliseconds());
118 }
119 for (final HttpWorker worker : workers) {
120 final Exception ex = worker.getException();
121 if (ex != null) {
122 throw ex;
123 }
124 Assertions.assertEquals(requestCount, worker.getCount());
125 }
126
127 for (final HttpContext context : contexts) {
128 final String state0 = (String) context.getAttribute("r0");
129 Assertions.assertNotNull(state0);
130 for (int r = 1; r < requestCount; r++) {
131 Assertions.assertEquals(state0, context.getAttribute("r" + r));
132 }
133 }
134
135 }
136
137 static class HttpWorker extends Thread {
138
139 private final String uid;
140 private final HttpClientContext context;
141 private final int requestCount;
142 private final HttpHost target;
143 private final CloseableHttpClient httpclient;
144
145 private volatile Exception exception;
146 private volatile int count;
147
148 public HttpWorker(
149 final String uid,
150 final HttpClientContext context,
151 final int requestCount,
152 final HttpHost target,
153 final CloseableHttpClient httpclient) {
154 super();
155 this.uid = uid;
156 this.context = context;
157 this.requestCount = requestCount;
158 this.target = target;
159 this.httpclient = httpclient;
160 this.count = 0;
161 }
162
163 public int getCount() {
164 return this.count;
165 }
166
167 public Exception getException() {
168 return this.exception;
169 }
170
171 @Override
172 public void run() {
173 try {
174 this.context.setAttribute("user", this.uid);
175 for (int r = 0; r < this.requestCount; r++) {
176 final HttpGet httpget = new HttpGet("/");
177 this.httpclient.execute(this.target, httpget, this.context, response -> {
178 EntityUtils.consume(response.getEntity());
179 return null;
180 });
181 this.count++;
182
183 final EndpointDetails endpointDetails = this.context.getEndpointDetails();
184 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
185 this.context.setAttribute("r" + r, connuid);
186 }
187
188 } catch (final Exception ex) {
189 this.exception = ex;
190 }
191 }
192
193 }
194
195 @Test
196 public void testRouteSpecificPoolRecylcing() throws Exception {
197 configureServer(bootstrap -> bootstrap.register("*", new SimpleService()));
198 final HttpHost target = startServer();
199
200
201
202
203
204 final int maxConn = 2;
205
206 configureClient(builder -> builder
207 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
208 final TestClient client = client();
209
210 final PoolingHttpClientConnectionManager connectionManager = client.getConnectionManager();
211 connectionManager.setMaxTotal(maxConn);
212 connectionManager.setDefaultMaxPerRoute(maxConn);
213
214
215 final HttpContext context1 = HttpClientContext.create();
216 context1.setAttribute("user", "stuff");
217 client.execute(target, new HttpGet("/"), context1, response -> {
218 EntityUtils.consume(response.getEntity());
219 return null;
220 });
221
222
223
224
225
226 Thread.sleep(100);
227
228
229
230 final HttpContext context2 = HttpClientContext.create();
231 client.execute(new HttpHost("127.0.0.1", target.getPort()), new HttpGet("/"), context2, response -> {
232 EntityUtils.consume(response.getEntity());
233 return null;
234 });
235
236
237
238
239 Thread.sleep(100);
240
241
242
243
244
245
246 final HttpContext context3 = HttpClientContext.create();
247 client.execute(target, new HttpGet("/"), context3, response -> {
248 EntityUtils.consume(response.getEntity());
249 return null;
250 });
251
252
253
254
255
256 }
257
258 }