1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.io;
28
29 import java.io.IOException;
30 import java.util.Set;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.DnsResolver;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
44 import org.apache.hc.client5.http.io.ConnectionEndpoint;
45 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
46 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
47 import org.apache.hc.client5.http.io.LeaseRequest;
48 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
49 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
50 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
51 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
52 import org.apache.hc.core5.annotation.Contract;
53 import org.apache.hc.core5.annotation.Internal;
54 import org.apache.hc.core5.annotation.ThreadingBehavior;
55 import org.apache.hc.core5.http.ClassicHttpRequest;
56 import org.apache.hc.core5.http.ClassicHttpResponse;
57 import org.apache.hc.core5.http.HttpException;
58 import org.apache.hc.core5.http.HttpHost;
59 import org.apache.hc.core5.http.URIScheme;
60 import org.apache.hc.core5.http.config.Registry;
61 import org.apache.hc.core5.http.config.RegistryBuilder;
62 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
63 import org.apache.hc.core5.http.io.HttpConnectionFactory;
64 import org.apache.hc.core5.http.io.SocketConfig;
65 import org.apache.hc.core5.http.protocol.HttpContext;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.pool.ConnPoolControl;
68 import org.apache.hc.core5.pool.LaxConnPool;
69 import org.apache.hc.core5.pool.ManagedConnPool;
70 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
71 import org.apache.hc.core5.pool.PoolEntry;
72 import org.apache.hc.core5.pool.PoolReusePolicy;
73 import org.apache.hc.core5.pool.PoolStats;
74 import org.apache.hc.core5.pool.StrictConnPool;
75 import org.apache.hc.core5.util.Args;
76 import org.apache.hc.core5.util.Asserts;
77 import org.apache.hc.core5.util.Identifiable;
78 import org.apache.hc.core5.util.TimeValue;
79 import org.apache.hc.core5.util.Timeout;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
105 public class PoolingHttpClientConnectionManager
106 implements HttpClientConnectionManager, ConnPoolControl<HttpRoute> {
107
108 private static final Logger LOG = LoggerFactory.getLogger(PoolingHttpClientConnectionManager.class);
109
110 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
111 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
112
113 private final HttpClientConnectionOperator connectionOperator;
114 private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
115 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
116 private final AtomicBoolean closed;
117
118 private volatile SocketConfig defaultSocketConfig;
119 private volatile TimeValue validateAfterInactivity;
120
121 public PoolingHttpClientConnectionManager() {
122 this(RegistryBuilder.<ConnectionSocketFactory>create()
123 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
124 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
125 .build());
126 }
127
128 public PoolingHttpClientConnectionManager(
129 final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
130 this(socketFactoryRegistry, null);
131 }
132
133 public PoolingHttpClientConnectionManager(
134 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
135 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
136 this(socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND, connFactory);
137 }
138
139 public PoolingHttpClientConnectionManager(
140 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
141 final PoolConcurrencyPolicy poolConcurrencyPolicy,
142 final TimeValue timeToLive,
143 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
144 this(socketFactoryRegistry, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive, connFactory);
145 }
146
147 public PoolingHttpClientConnectionManager(
148 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
149 final PoolConcurrencyPolicy poolConcurrencyPolicy,
150 final PoolReusePolicy poolReusePolicy,
151 final TimeValue timeToLive) {
152 this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null);
153 }
154
155 public PoolingHttpClientConnectionManager(
156 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
157 final PoolConcurrencyPolicy poolConcurrencyPolicy,
158 final PoolReusePolicy poolReusePolicy,
159 final TimeValue timeToLive,
160 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
161 this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null, connFactory);
162 }
163
164 public PoolingHttpClientConnectionManager(
165 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
166 final PoolConcurrencyPolicy poolConcurrencyPolicy,
167 final PoolReusePolicy poolReusePolicy,
168 final TimeValue timeToLive,
169 final SchemePortResolver schemePortResolver,
170 final DnsResolver dnsResolver,
171 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
172 this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
173 poolConcurrencyPolicy,
174 poolReusePolicy,
175 timeToLive,
176 connFactory);
177 }
178
179 @Internal
180 protected PoolingHttpClientConnectionManager(
181 final HttpClientConnectionOperator httpClientConnectionOperator,
182 final PoolConcurrencyPolicy poolConcurrencyPolicy,
183 final PoolReusePolicy poolReusePolicy,
184 final TimeValue timeToLive,
185 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
186 super();
187 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
188 switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
189 case STRICT:
190 this.pool = new StrictConnPool<>(
191 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
192 DEFAULT_MAX_TOTAL_CONNECTIONS,
193 timeToLive,
194 poolReusePolicy,
195 null);
196 break;
197 case LAX:
198 this.pool = new LaxConnPool<>(
199 DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
200 timeToLive,
201 poolReusePolicy,
202 null);
203 break;
204 default:
205 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
206 }
207 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
208 this.closed = new AtomicBoolean(false);
209 }
210
211 @Internal
212 protected PoolingHttpClientConnectionManager(
213 final HttpClientConnectionOperator httpClientConnectionOperator,
214 final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool,
215 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
216 super();
217 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
218 this.pool = Args.notNull(pool, "Connection pool");
219 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
220 this.closed = new AtomicBoolean(false);
221 }
222
223 @Override
224 public void close() {
225 close(CloseMode.GRACEFUL);
226 }
227
228 @Override
229 public void close(final CloseMode closeMode) {
230 if (this.closed.compareAndSet(false, true)) {
231 if (LOG.isDebugEnabled()) {
232 LOG.debug("Shutdown connection pool {}", closeMode);
233 }
234 this.pool.close(closeMode);
235 LOG.debug("Connection pool shut down");
236 }
237 }
238
239 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
240 if (endpoint instanceof InternalConnectionEndpoint) {
241 return (InternalConnectionEndpoint) endpoint;
242 }
243 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
244 }
245
246 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
247 return lease(id, route, Timeout.DISABLED, state);
248 }
249
250 @Override
251 public LeaseRequest lease(
252 final String id,
253 final HttpRoute route,
254 final Timeout requestTimeout,
255 final Object state) {
256 Args.notNull(route, "HTTP route");
257 if (LOG.isDebugEnabled()) {
258 LOG.debug("{}: endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
259 }
260 final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
261 return new LeaseRequest() {
262
263 private volatile ConnectionEndpoint endpoint;
264
265 @Override
266 public synchronized ConnectionEndpoint get(
267 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
268 Args.notNull(timeout, "Operation timeout");
269 if (this.endpoint != null) {
270 return this.endpoint;
271 }
272 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
273 try {
274 poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
275 if (poolEntry == null || leaseFuture.isCancelled()) {
276 throw new ExecutionException(new CancellationException("Operation cancelled"));
277 }
278 } catch (final TimeoutException ex) {
279 leaseFuture.cancel(true);
280 throw ex;
281 }
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("{}: endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
284 }
285 try {
286 final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
287 if (TimeValue.isNonNegative(validateAfterInactivitySnapshot)) {
288 final ManagedHttpClientConnection conn = poolEntry.getConnection();
289 if (conn != null
290 && poolEntry.getUpdated() + validateAfterInactivitySnapshot.toMilliseconds() <= System.currentTimeMillis()) {
291 boolean stale;
292 try {
293 stale = conn.isStale();
294 } catch (final IOException ignore) {
295 stale = true;
296 }
297 if (stale) {
298 if (LOG.isDebugEnabled()) {
299 LOG.debug("{}: connection {} is stale", id, ConnPoolSupport.getId(conn));
300 }
301 poolEntry.discardConnection(CloseMode.IMMEDIATE);
302 }
303 }
304 }
305 final ManagedHttpClientConnection conn = poolEntry.getConnection();
306 if (conn != null) {
307 conn.activate();
308 } else {
309 poolEntry.assignConnection(connFactory.createConnection(null));
310 }
311 if (leaseFuture.isCancelled()) {
312 if (LOG.isDebugEnabled()) {
313 LOG.debug("{}: endpoint lease cancelled", id);
314 }
315 pool.release(poolEntry, false);
316 } else {
317 this.endpoint = new InternalConnectionEndpoint(poolEntry);
318 if (LOG.isDebugEnabled()) {
319 LOG.debug("{}: acquired {}", id, ConnPoolSupport.getId(endpoint));
320 }
321 }
322 return this.endpoint;
323 } catch (final Exception ex) {
324 if (LOG.isDebugEnabled()) {
325 LOG.debug("{}: endpoint lease failed", id);
326 }
327 pool.release(poolEntry, false);
328 throw new ExecutionException(ex.getMessage(), ex);
329 }
330 }
331
332 @Override
333 public boolean cancel() {
334 return leaseFuture.cancel(true);
335 }
336
337 };
338
339 }
340
341 @Override
342 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
343 Args.notNull(endpoint, "Managed endpoint");
344 final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = cast(endpoint).detach();
345 if (entry == null) {
346 return;
347 }
348 if (LOG.isDebugEnabled()) {
349 LOG.debug("{}: releasing endpoint", ConnPoolSupport.getId(endpoint));
350 }
351 final ManagedHttpClientConnection conn = entry.getConnection();
352 if (conn != null && keepAlive == null) {
353 conn.close(CloseMode.GRACEFUL);
354 }
355 boolean reusable = conn != null && conn.isOpen() && conn.isConsistent();
356 try {
357 if (reusable) {
358 entry.updateState(state);
359 entry.updateExpiry(keepAlive);
360 conn.passivate();
361 if (LOG.isDebugEnabled()) {
362 final String s;
363 if (TimeValue.isPositive(keepAlive)) {
364 s = "for " + keepAlive;
365 } else {
366 s = "indefinitely";
367 }
368 LOG.debug("{}: connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
369 }
370 } else {
371 if (LOG.isDebugEnabled()) {
372 LOG.debug("{}: connection is not kept alive", ConnPoolSupport.getId(endpoint));
373 }
374 }
375 } catch (final RuntimeException ex) {
376 reusable = false;
377 throw ex;
378 } finally {
379 this.pool.release(entry, reusable);
380 if (LOG.isDebugEnabled()) {
381 LOG.debug("{}: connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
382 }
383 }
384 }
385
386 @Override
387 public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
388 Args.notNull(endpoint, "Managed endpoint");
389 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
390 if (internalEndpoint.isConnected()) {
391 return;
392 }
393 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getPoolEntry();
394 if (!poolEntry.hasConnection()) {
395 poolEntry.assignConnection(connFactory.createConnection(null));
396 }
397 final HttpRoute route = poolEntry.getRoute();
398 final HttpHost host;
399 if (route.getProxyHost() != null) {
400 host = route.getProxyHost();
401 } else {
402 host = route.getTargetHost();
403 }
404 if (LOG.isDebugEnabled()) {
405 LOG.debug("{}: connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
406 }
407 final ManagedHttpClientConnection conn = poolEntry.getConnection();
408 final SocketConfig defaultSocketConfigSnapshot = defaultSocketConfig;
409 this.connectionOperator.connect(
410 conn,
411 host,
412 route.getLocalSocketAddress(),
413 connectTimeout,
414 defaultSocketConfigSnapshot != null ? defaultSocketConfigSnapshot : SocketConfig.DEFAULT,
415 context);
416 if (LOG.isDebugEnabled()) {
417 LOG.debug("{}: connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
418 }
419 }
420
421 @Override
422 public void upgrade(final ConnectionEndpoint endpoint, final HttpContext context) throws IOException {
423 Args.notNull(endpoint, "Managed endpoint");
424 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
425 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
426 final HttpRoute route = poolEntry.getRoute();
427 this.connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), context);
428 }
429
430 @Override
431 public void closeIdle(final TimeValue idleTime) {
432 Args.notNull(idleTime, "Idle time");
433 if (LOG.isDebugEnabled()) {
434 LOG.debug("Closing connections idle longer than {}", idleTime);
435 }
436 this.pool.closeIdle(idleTime);
437 }
438
439 @Override
440 public void closeExpired() {
441 LOG.debug("Closing expired connections");
442 this.pool.closeExpired();
443 }
444
445 @Override
446 public Set<HttpRoute> getRoutes() {
447 return this.pool.getRoutes();
448 }
449
450 @Override
451 public int getMaxTotal() {
452 return this.pool.getMaxTotal();
453 }
454
455 @Override
456 public void setMaxTotal(final int max) {
457 this.pool.setMaxTotal(max);
458 }
459
460 @Override
461 public int getDefaultMaxPerRoute() {
462 return this.pool.getDefaultMaxPerRoute();
463 }
464
465 @Override
466 public void setDefaultMaxPerRoute(final int max) {
467 this.pool.setDefaultMaxPerRoute(max);
468 }
469
470 @Override
471 public int getMaxPerRoute(final HttpRoute route) {
472 return this.pool.getMaxPerRoute(route);
473 }
474
475 @Override
476 public void setMaxPerRoute(final HttpRoute route, final int max) {
477 this.pool.setMaxPerRoute(route, max);
478 }
479
480 @Override
481 public PoolStats getTotalStats() {
482 return this.pool.getTotalStats();
483 }
484
485 @Override
486 public PoolStats getStats(final HttpRoute route) {
487 return this.pool.getStats(route);
488 }
489
490 public SocketConfig getDefaultSocketConfig() {
491 return this.defaultSocketConfig;
492 }
493
494 public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
495 this.defaultSocketConfig = defaultSocketConfig;
496 }
497
498
499
500
501
502
503 public TimeValue getValidateAfterInactivity() {
504 return validateAfterInactivity;
505 }
506
507
508
509
510
511
512
513
514
515 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
516 this.validateAfterInactivity = validateAfterInactivity;
517 }
518
519 private static final AtomicLong COUNT = new AtomicLong(0);
520
521 class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
522
523 private final AtomicReference<PoolEntry<HttpRoute, ManagedHttpClientConnection>> poolEntryRef;
524 private final String id;
525
526 InternalConnectionEndpoint(
527 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
528 this.poolEntryRef = new AtomicReference<>(poolEntry);
529 this.id = String.format("ep-%08X", COUNT.getAndIncrement());
530 }
531
532 @Override
533 public String getId() {
534 return id;
535 }
536
537 PoolEntry<HttpRoute, ManagedHttpClientConnection> getPoolEntry() {
538 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
539 if (poolEntry == null) {
540 throw new ConnectionShutdownException();
541 }
542 return poolEntry;
543 }
544
545 PoolEntry<HttpRoute, ManagedHttpClientConnection> getValidatedPoolEntry() {
546 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
547 final ManagedHttpClientConnection connection = poolEntry.getConnection();
548 Asserts.check(connection != null && connection.isOpen(), "Endpoint is not connected");
549 return poolEntry;
550 }
551
552 PoolEntry<HttpRoute, ManagedHttpClientConnection> detach() {
553 return poolEntryRef.getAndSet(null);
554 }
555
556 @Override
557 public void close(final CloseMode closeMode) {
558 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
559 if (poolEntry != null) {
560 poolEntry.discardConnection(closeMode);
561 }
562 }
563
564 @Override
565 public void close() throws IOException {
566 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
567 if (poolEntry != null) {
568 poolEntry.discardConnection(CloseMode.GRACEFUL);
569 }
570 }
571
572 @Override
573 public boolean isConnected() {
574 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
575 final ManagedHttpClientConnection connection = poolEntry.getConnection();
576 return connection != null && connection.isOpen();
577 }
578
579 @Override
580 public void setSocketTimeout(final Timeout timeout) {
581 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
582 }
583
584 @Override
585 public ClassicHttpResponse execute(
586 final String exchangeId,
587 final ClassicHttpRequest request,
588 final HttpRequestExecutor requestExecutor,
589 final HttpContext context) throws IOException, HttpException {
590 Args.notNull(request, "HTTP request");
591 Args.notNull(requestExecutor, "Request executor");
592 final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
593 if (LOG.isDebugEnabled()) {
594 LOG.debug("{}: executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
595 }
596 return requestExecutor.execute(request, connection, context);
597 }
598
599 }
600
601 }