1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.util.Set;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.client5.http.DnsResolver;
39 import org.apache.hc.client5.http.EndpointInfo;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.config.ConnectionConfig;
43 import org.apache.hc.client5.http.config.TlsConfig;
44 import org.apache.hc.client5.http.impl.ConnPoolSupport;
45 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46 import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
47 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
49 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
50 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
51 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
52 import org.apache.hc.core5.annotation.Contract;
53 import org.apache.hc.core5.annotation.Internal;
54 import org.apache.hc.core5.annotation.ThreadingBehavior;
55 import org.apache.hc.core5.concurrent.BasicFuture;
56 import org.apache.hc.core5.concurrent.CallbackContribution;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.function.Resolver;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.HttpVersion;
62 import org.apache.hc.core5.http.ProtocolVersion;
63 import org.apache.hc.core5.http.URIScheme;
64 import org.apache.hc.core5.http.config.Lookup;
65 import org.apache.hc.core5.http.config.RegistryBuilder;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.HandlerFactory;
69 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
71 import org.apache.hc.core5.http.protocol.HttpContext;
72 import org.apache.hc.core5.http2.HttpVersionPolicy;
73 import org.apache.hc.core5.http2.nio.command.PingCommand;
74 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
75 import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.pool.ConnPoolControl;
78 import org.apache.hc.core5.pool.LaxConnPool;
79 import org.apache.hc.core5.pool.ManagedConnPool;
80 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
81 import org.apache.hc.core5.pool.PoolEntry;
82 import org.apache.hc.core5.pool.PoolReusePolicy;
83 import org.apache.hc.core5.pool.PoolStats;
84 import org.apache.hc.core5.pool.StrictConnPool;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ConnectionInitiator;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.Deadline;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.TimeValue;
93 import org.apache.hc.core5.util.Timeout;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
116 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
117
118 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
119
120 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
121 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
122
123 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
124 private final AsyncClientConnectionOperator connectionOperator;
125 private final AtomicBoolean closed;
126
127 private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
128 private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
129
/**
 * Creates a manager whose TLS strategy registry contains only the default
 * client TLS strategy, registered for the {@code https} scheme.
 */
public PoolingAsyncClientConnectionManager() {
    this(
            RegistryBuilder.<TlsStrategy>create()
                    .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
                    .build());
}
135
/**
 * Creates a manager with the given TLS strategies, the {@code STRICT}
 * concurrency policy and a time-to-live of {@code TimeValue.NEG_ONE_MILLISECOND}.
 *
 * @param tlsStrategyLookup TLS strategies keyed by scheme
 */
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
    this(
            tlsStrategyLookup,
            PoolConcurrencyPolicy.STRICT,
            TimeValue.NEG_ONE_MILLISECOND);
}
139
/**
 * Creates a manager with the given TLS strategies, concurrency policy and
 * time-to-live, using the {@code LIFO} reuse policy.
 *
 * @param tlsStrategyLookup     TLS strategies keyed by scheme
 * @param poolConcurrencyPolicy pool flavour to create
 * @param timeToLive            time-to-live passed to the pool
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            PoolReusePolicy.LIFO,
            timeToLive);
}
146
/**
 * Creates a manager with the given TLS strategies and pool settings, passing
 * {@code null} for both the scheme port resolver and the DNS resolver.
 *
 * @param tlsStrategyLookup     TLS strategies keyed by scheme
 * @param poolConcurrencyPolicy pool flavour to create
 * @param poolReusePolicy       reuse policy passed to the pool
 * @param timeToLive            time-to-live passed to the pool
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null,
            null);
}
154
/**
 * Creates a manager backed by a {@code DefaultAsyncClientConnectionOperator}
 * built from the given TLS strategies and resolvers.
 *
 * @param tlsStrategyLookup     TLS strategies keyed by scheme
 * @param poolConcurrencyPolicy pool flavour to create
 * @param poolReusePolicy       reuse policy passed to the pool
 * @param timeToLive            time-to-live passed to the pool
 * @param schemePortResolver    handed to the connection operator as is
 * @param dnsResolver           handed to the connection operator as is
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive);
}
165
/**
 * Creates a manager around the given connection operator and builds the
 * connection pool matching the requested concurrency policy.
 * <p>
 * Both pool flavours override {@code closeExpired()} so that every available
 * entry is passed through {@code closeIfExpired} of this manager.
 *
 * @param connectionOperator    operator used to connect and upgrade; must not be {@code null}
 * @param poolConcurrencyPolicy pool flavour; {@code null} is treated as {@code STRICT}
 * @param poolReusePolicy       reuse policy passed to the pool
 * @param timeToLive            time-to-live passed to the pool
 * @throws IllegalArgumentException if the policy is neither {@code STRICT} nor {@code LAX}
 */
@Internal
public PoolingAsyncClientConnectionManager(
        final AsyncClientConnectionOperator connectionOperator,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
    final PoolConcurrencyPolicy effectivePolicy =
            poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT;
    if (effectivePolicy == PoolConcurrencyPolicy.STRICT) {
        this.pool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                DEFAULT_MAX_TOTAL_CONNECTIONS,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(PoolingAsyncClientConnectionManager.this::closeIfExpired);
            }

        };
    } else if (effectivePolicy == PoolConcurrencyPolicy.LAX) {
        this.pool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(PoolingAsyncClientConnectionManager.this::closeIfExpired);
            }

        };
    } else {
        throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
    }
    this.closed = new AtomicBoolean(false);
}
208
/**
 * Creates a manager around an externally supplied pool and connection operator.
 *
 * @param pool               connection pool; must not be {@code null}
 * @param connectionOperator operator used to connect and upgrade; must not be {@code null}
 */
@Internal
protected PoolingAsyncClientConnectionManager(
        final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
        final AsyncClientConnectionOperator connectionOperator) {
    // The operator is validated first, so it is the one reported when both arguments are null.
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
    this.pool = Args.notNull(pool, "Connection pool");
    this.closed = new AtomicBoolean(false);
}
217
218 @Override
219 public void close() {
220 close(CloseMode.GRACEFUL);
221 }
222
223 @Override
224 public void close(final CloseMode closeMode) {
225 if (this.closed.compareAndSet(false, true)) {
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Shutdown connection pool {}", closeMode);
228 }
229 this.pool.close(closeMode);
230 LOG.debug("Connection pool shut down");
231 }
232 }
233
234 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
235 if (endpoint instanceof InternalConnectionEndpoint) {
236 return (InternalConnectionEndpoint) endpoint;
237 }
238 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
239 }
240
241 private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
242 final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
243 final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
244 return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
245 }
246
247 private TlsConfig resolveTlsConfig(final HttpHost host) {
248 final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
249 TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
250 if (tlsConfig == null) {
251 tlsConfig = TlsConfig.DEFAULT;
252 }
253 if (URIScheme.HTTP.same(host.getSchemeName())
254 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
255
256
257 tlsConfig = TlsConfig.copy(tlsConfig)
258 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
259 .build();
260 }
261 return tlsConfig;
262 }
263
264 @Override
265 public Future<AsyncConnectionEndpoint> lease(
266 final String id,
267 final HttpRoute route,
268 final Object state,
269 final Timeout requestTimeout,
270 final FutureCallback<AsyncConnectionEndpoint> callback) {
271 if (LOG.isDebugEnabled()) {
272 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
273 }
274 return new Future<AsyncConnectionEndpoint>() {
275
276 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
277 final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
278
279 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
280 route,
281 state,
282 requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
283
284 @Override
285 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
286 if (poolEntry.hasConnection()) {
287 final TimeValue timeToLive = connectionConfig.getTimeToLive();
288 if (TimeValue.isNonNegative(timeToLive)) {
289 if (timeToLive.getDuration() == 0
290 || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
291 poolEntry.discardConnection(CloseMode.GRACEFUL);
292 }
293 }
294 }
295 if (poolEntry.hasConnection()) {
296 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
297 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
298 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
299 if (timeValue.getDuration() == 0
300 || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
301 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
302 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
303 connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
304 if (result == null || !result) {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
307 }
308 poolEntry.discardConnection(CloseMode.GRACEFUL);
309 }
310 leaseCompleted(poolEntry);
311 })), Command.Priority.IMMEDIATE);
312 return;
313 }
314 if (LOG.isDebugEnabled()) {
315 LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
316 }
317 poolEntry.discardConnection(CloseMode.IMMEDIATE);
318 }
319 }
320 }
321 leaseCompleted(poolEntry);
322 }
323
324 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
325 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
326 if (connection != null) {
327 connection.activate();
328 }
329 if (LOG.isDebugEnabled()) {
330 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
331 }
332 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
335 }
336 resultFuture.completed(endpoint);
337 }
338
339 @Override
340 public void failed(final Exception ex) {
341 if (LOG.isDebugEnabled()) {
342 LOG.debug("{} endpoint lease failed", id);
343 }
344 resultFuture.failed(ex);
345 }
346
347 @Override
348 public void cancelled() {
349 if (LOG.isDebugEnabled()) {
350 LOG.debug("{} endpoint lease cancelled", id);
351 }
352 resultFuture.cancel();
353 }
354
355 });
356
357 @Override
358 public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
359 return resultFuture.get();
360 }
361
362 @Override
363 public AsyncConnectionEndpoint get(
364 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
365 return resultFuture.get(timeout, unit);
366 }
367
368 @Override
369 public boolean cancel(final boolean mayInterruptIfRunning) {
370 return leaseFuture.cancel(mayInterruptIfRunning);
371 }
372
373 @Override
374 public boolean isDone() {
375 return resultFuture.isDone();
376 }
377
378 @Override
379 public boolean isCancelled() {
380 return resultFuture.isCancelled();
381 }
382
383 };
384 }
385
386 @Override
387 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
388 Args.notNull(endpoint, "Managed endpoint");
389 Args.notNull(keepAlive, "Keep-alive time");
390 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
391 if (entry == null) {
392 return;
393 }
394 if (LOG.isDebugEnabled()) {
395 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
396 }
397 if (this.isClosed()) {
398 return;
399 }
400 final ManagedAsyncClientConnection connection = entry.getConnection();
401 boolean reusable = connection != null && connection.isOpen();
402 try {
403 if (reusable) {
404 entry.updateState(state);
405 entry.updateExpiry(keepAlive);
406 connection.passivate();
407 if (LOG.isDebugEnabled()) {
408 final String s;
409 if (TimeValue.isPositive(keepAlive)) {
410 s = "for " + keepAlive;
411 } else {
412 s = "indefinitely";
413 }
414 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
415 }
416 }
417 } catch (final RuntimeException ex) {
418 reusable = false;
419 throw ex;
420 } finally {
421 pool.release(entry, reusable);
422 if (LOG.isDebugEnabled()) {
423 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
424 }
425 }
426 }
427
428 @Override
429 public Future<AsyncConnectionEndpoint> connect(
430 final AsyncConnectionEndpoint endpoint,
431 final ConnectionInitiator connectionInitiator,
432 final Timeout timeout,
433 final Object attachment,
434 final HttpContext context,
435 final FutureCallback<AsyncConnectionEndpoint> callback) {
436 Args.notNull(endpoint, "Endpoint");
437 Args.notNull(connectionInitiator, "Connection initiator");
438 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
439 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
440 if (internalEndpoint.isConnected()) {
441 resultFuture.completed(endpoint);
442 return resultFuture;
443 }
444 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
445 final HttpRoute route = poolEntry.getRoute();
446 final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost(): route.getTargetHost();
447 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
448 final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
449
450 if (LOG.isDebugEnabled()) {
451 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
452 }
453 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
454 connectionInitiator,
455 firstHop,
456 route.getTargetName(),
457 route.getLocalSocketAddress(),
458 connectTimeout,
459 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
460 context,
461 new FutureCallback<ManagedAsyncClientConnection>() {
462
463 @Override
464 public void completed(final ManagedAsyncClientConnection connection) {
465 try {
466 if (LOG.isDebugEnabled()) {
467 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
468 }
469 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
470 if (socketTimeout != null) {
471 connection.setSocketTimeout(socketTimeout);
472 }
473 poolEntry.assignConnection(connection);
474 resultFuture.completed(internalEndpoint);
475 } catch (final RuntimeException ex) {
476 resultFuture.failed(ex);
477 }
478 }
479
480 @Override
481 public void failed(final Exception ex) {
482 resultFuture.failed(ex);
483 }
484
485 @Override
486 public void cancelled() {
487 resultFuture.cancel();
488 }
489 });
490 resultFuture.setDependency(connectFuture);
491 return resultFuture;
492 }
493
494 @Override
495 public void upgrade(
496 final AsyncConnectionEndpoint endpoint,
497 final Object attachment,
498 final HttpContext context,
499 final FutureCallback<AsyncConnectionEndpoint> callback) {
500 Args.notNull(endpoint, "Managed endpoint");
501 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
502 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
503 final HttpRoute route = poolEntry.getRoute();
504 final HttpHost target = route.getTargetHost();
505 connectionOperator.upgrade(
506 poolEntry.getConnection(),
507 target,
508 route.getTargetName(),
509 attachment != null ? attachment : resolveTlsConfig(target),
510 context,
511 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
512
513 @Override
514 public void completed(final ManagedAsyncClientConnection connection) {
515 if (LOG.isDebugEnabled()) {
516 LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
517 }
518 final TlsDetails tlsDetails = connection.getTlsDetails();
519 if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
520 connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
521
522 @Override
523 public void completed(final ProtocolIOSession protocolIOSession) {
524 if (callback != null) {
525 callback.completed(endpoint);
526 }
527 }
528
529 });
530 } else {
531 if (callback != null) {
532 callback.completed(endpoint);
533 }
534 }
535 }
536 });
537 }
538
539 @Override
540 public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
541 upgrade(endpoint, attachment, context, null);
542 }
543
544 @Override
545 public Set<HttpRoute> getRoutes() {
546 return pool.getRoutes();
547 }
548
549 @Override
550 public void setMaxTotal(final int max) {
551 pool.setMaxTotal(max);
552 }
553
554 @Override
555 public int getMaxTotal() {
556 return pool.getMaxTotal();
557 }
558
559 @Override
560 public void setDefaultMaxPerRoute(final int max) {
561 pool.setDefaultMaxPerRoute(max);
562 }
563
564 @Override
565 public int getDefaultMaxPerRoute() {
566 return pool.getDefaultMaxPerRoute();
567 }
568
569 @Override
570 public void setMaxPerRoute(final HttpRoute route, final int max) {
571 pool.setMaxPerRoute(route, max);
572 }
573
574 @Override
575 public int getMaxPerRoute(final HttpRoute route) {
576 return pool.getMaxPerRoute(route);
577 }
578
579 @Override
580 public void closeIdle(final TimeValue idletime) {
581 if (isClosed()) {
582 return;
583 }
584 pool.closeIdle(idletime);
585 }
586
587 @Override
588 public void closeExpired() {
589 if (isClosed()) {
590 return;
591 }
592 pool.closeExpired();
593 }
594
595 @Override
596 public PoolStats getTotalStats() {
597 return pool.getTotalStats();
598 }
599
600 @Override
601 public PoolStats getStats(final HttpRoute route) {
602 return pool.getStats(route);
603 }
604
605
606
607
608
609
610 public void setDefaultConnectionConfig(final ConnectionConfig config) {
611 this.connectionConfigResolver = (route) -> config;
612 }
613
614
615
616
617
618
619 public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
620 this.connectionConfigResolver = connectionConfigResolver;
621 }
622
623
624
625
626
627
628 public void setDefaultTlsConfig(final TlsConfig config) {
629 this.tlsConfigResolver = (host) -> config;
630 }
631
632
633
634
635
636
637 public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
638 this.tlsConfigResolver = tlsConfigResolver;
639 }
640
641 void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection > entry) {
642 final long now = System.currentTimeMillis();
643 if (entry.getExpiryDeadline().isBefore(now)) {
644 entry.discardConnection(CloseMode.GRACEFUL);
645 } else {
646 final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
647 final TimeValue timeToLive = connectionConfig.getTimeToLive();
648 if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
649 entry.discardConnection(CloseMode.GRACEFUL);
650 }
651 }
652 }
653
654
655
656
657 @Deprecated
658 public TimeValue getValidateAfterInactivity() {
659 return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
660 }
661
662
663
664
665
666
667
668
669
670
671 @Deprecated
672 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
673 setDefaultConnectionConfig(ConnectionConfig.custom()
674 .setValidateAfterInactivity(validateAfterInactivity)
675 .build());
676 }
677
678 private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
679
680 static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
681
682 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
683 private final String id;
684
685 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
686 this.poolEntryRef = new AtomicReference<>(poolEntry);
687 this.id = INCREMENTING_ID.getNextId();
688 }
689
690 @Override
691 public String getId() {
692 return id;
693 }
694
695 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
696 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
697 if (poolEntry == null) {
698 throw new ConnectionShutdownException();
699 }
700 return poolEntry;
701 }
702
703 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
704 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
705 if (poolEntry.getConnection() == null) {
706 throw new ConnectionShutdownException();
707 }
708 return poolEntry;
709 }
710
711 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
712 return poolEntryRef.getAndSet(null);
713 }
714
715 @Override
716 public void close(final CloseMode closeMode) {
717 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
718 if (poolEntry != null) {
719 if (LOG.isDebugEnabled()) {
720 LOG.debug("{} close {}", id, closeMode);
721 }
722 poolEntry.discardConnection(closeMode);
723 }
724 }
725
726 @Override
727 public boolean isConnected() {
728 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
729 if (poolEntry == null) {
730 return false;
731 }
732 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
733 if (connection == null) {
734 return false;
735 }
736 if (!connection.isOpen()) {
737 poolEntry.discardConnection(CloseMode.IMMEDIATE);
738 return false;
739 }
740 return true;
741 }
742
743 @Override
744 public void setSocketTimeout(final Timeout timeout) {
745 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
746 }
747
748 @Override
749 public void execute(
750 final String exchangeId,
751 final AsyncClientExchangeHandler exchangeHandler,
752 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
753 final HttpContext context) {
754 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
755 if (LOG.isDebugEnabled()) {
756 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
757 }
758 context.setProtocolVersion(connection.getProtocolVersion());
759 connection.submitCommand(
760 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
761 Command.Priority.NORMAL);
762 }
763
764 @Override
765 public EndpointInfo getInfo() {
766 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
767 if (poolEntry != null) {
768 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
769 if (connection != null && connection.isOpen()) {
770 final TlsDetails tlsDetails = connection.getTlsDetails();
771 return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
772 }
773 }
774 return null;
775 }
776
777 }
778
779
780
781
782
783
784
785
786
787 public boolean isClosed() {
788 return this.closed.get();
789 }
790
791 }