1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Set;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.DnsResolver;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.config.ConnectionConfig;
43 import org.apache.hc.client5.http.config.TlsConfig;
44 import org.apache.hc.client5.http.impl.ConnPoolSupport;
45 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46 import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
47 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
49 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
50 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
51 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
52 import org.apache.hc.core5.annotation.Contract;
53 import org.apache.hc.core5.annotation.Internal;
54 import org.apache.hc.core5.annotation.ThreadingBehavior;
55 import org.apache.hc.core5.concurrent.BasicFuture;
56 import org.apache.hc.core5.concurrent.CallbackContribution;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.function.Resolver;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.HttpVersion;
62 import org.apache.hc.core5.http.ProtocolVersion;
63 import org.apache.hc.core5.http.URIScheme;
64 import org.apache.hc.core5.http.config.Lookup;
65 import org.apache.hc.core5.http.config.RegistryBuilder;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.HandlerFactory;
69 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
71 import org.apache.hc.core5.http.protocol.HttpContext;
72 import org.apache.hc.core5.http2.HttpVersionPolicy;
73 import org.apache.hc.core5.http2.nio.command.PingCommand;
74 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
75 import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.pool.ConnPoolControl;
78 import org.apache.hc.core5.pool.LaxConnPool;
79 import org.apache.hc.core5.pool.ManagedConnPool;
80 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
81 import org.apache.hc.core5.pool.PoolEntry;
82 import org.apache.hc.core5.pool.PoolReusePolicy;
83 import org.apache.hc.core5.pool.PoolStats;
84 import org.apache.hc.core5.pool.StrictConnPool;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ConnectionInitiator;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.Deadline;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.TimeValue;
93 import org.apache.hc.core5.util.Timeout;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
116 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
117
118 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
119
120 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
121 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
122
123 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
124 private final AsyncClientConnectionOperator connectionOperator;
125 private final AtomicBoolean closed;
126
127 private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
128 private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
129
/**
 * Creates a connection manager whose TLS strategy registry contains only
 * {@link DefaultClientTlsStrategy#getDefault()} registered under the
 * {@code https} scheme id.
 */
public PoolingAsyncClientConnectionManager() {
    this(RegistryBuilder.<TlsStrategy>create()
            .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.getDefault())
            .build());
}
135
/**
 * Creates a connection manager using the given TLS strategy registry, the
 * {@link PoolConcurrencyPolicy#STRICT} pool policy and a time-to-live of
 * {@link TimeValue#NEG_ONE_MILLISECOND}.
 *
 * @param tlsStrategyLookup registry of TLS strategies handed to the connection operator.
 */
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
    this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
}
139
/**
 * Creates a connection manager with the given pool concurrency policy and
 * time-to-live, using {@link PoolReusePolicy#LIFO} as the reuse policy.
 *
 * @param tlsStrategyLookup     registry of TLS strategies handed to the connection operator.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param timeToLive            time-to-live passed to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive) {
    this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
}
146
/**
 * Creates a connection manager with explicit pool policies, passing
 * {@code null} for both the scheme port resolver and the DNS resolver.
 *
 * @param tlsStrategyLookup     registry of TLS strategies handed to the connection operator.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy       pool reuse policy.
 * @param timeToLive            time-to-live passed to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
}
154
/**
 * Creates a connection manager backed by a
 * {@link DefaultAsyncClientConnectionOperator} built from the given TLS
 * strategy registry, scheme port resolver and DNS resolver.
 *
 * @param tlsStrategyLookup     registry of TLS strategies handed to the connection operator.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy       pool reuse policy.
 * @param timeToLive            time-to-live passed to the connection pool.
 * @param schemePortResolver    scheme port resolver handed to the connection operator.
 * @param dnsResolver           DNS resolver handed to the connection operator.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive);
}
165
/**
 * Creates a connection manager around the given connection operator and
 * builds the connection pool matching the requested concurrency policy.
 * <p>
 * A {@code null} policy is treated as {@link PoolConcurrencyPolicy#STRICT}.
 * Both pool flavours override {@code closeExpired()} so that every available
 * entry is passed through {@link #closeIfExpired(PoolEntry)}.
 *
 * @param connectionOperator    connection operator; must not be {@code null}.
 * @param poolConcurrencyPolicy pool concurrency policy, may be {@code null}.
 * @param poolReusePolicy       pool reuse policy.
 * @param timeToLive            time-to-live passed to the connection pool.
 * @throws IllegalArgumentException if the policy is neither STRICT nor LAX.
 */
@Internal
protected PoolingAsyncClientConnectionManager(
        final AsyncClientConnectionOperator connectionOperator,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
    final PoolConcurrencyPolicy effectivePolicy =
            poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT;
    if (effectivePolicy == PoolConcurrencyPolicy.STRICT) {
        this.pool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                DEFAULT_MAX_TOTAL_CONNECTIONS,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(entry -> closeIfExpired(entry));
            }

        };
    } else if (effectivePolicy == PoolConcurrencyPolicy.LAX) {
        this.pool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(entry -> closeIfExpired(entry));
            }

        };
    } else {
        throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
    }
    this.closed = new AtomicBoolean(false);
}
208
/**
 * Creates a connection manager around an externally supplied pool and
 * connection operator.
 *
 * @param pool               connection pool; must not be {@code null}.
 * @param connectionOperator connection operator; must not be {@code null}.
 */
@Internal
protected PoolingAsyncClientConnectionManager(
        final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
        final AsyncClientConnectionOperator connectionOperator) {
    // The operator is validated first, then the pool.
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
    this.pool = Args.notNull(pool, "Connection pool");
    this.closed = new AtomicBoolean(false);
}
217
218 @Override
219 public void close() {
220 close(CloseMode.GRACEFUL);
221 }
222
223 @Override
224 public void close(final CloseMode closeMode) {
225 if (this.closed.compareAndSet(false, true)) {
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Shutdown connection pool {}", closeMode);
228 }
229 this.pool.close(closeMode);
230 LOG.debug("Connection pool shut down");
231 }
232 }
233
234 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
235 if (endpoint instanceof InternalConnectionEndpoint) {
236 return (InternalConnectionEndpoint) endpoint;
237 }
238 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
239 }
240
241 private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
242 final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
243 final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
244 return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
245 }
246
247 private TlsConfig resolveTlsConfig(final HttpHost host, final Object attachment) {
248 if (attachment instanceof TlsConfig) {
249 return (TlsConfig) attachment;
250 }
251 final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
252 final TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
253 return tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
254 }
255
256 @Override
257 public Future<AsyncConnectionEndpoint> lease(
258 final String id,
259 final HttpRoute route,
260 final Object state,
261 final Timeout requestTimeout,
262 final FutureCallback<AsyncConnectionEndpoint> callback) {
263 if (LOG.isDebugEnabled()) {
264 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
265 }
266 return new Future<AsyncConnectionEndpoint>() {
267
268 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
269 final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
270
271 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
272 route,
273 state,
274 requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
275
276 @Override
277 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
278 if (poolEntry.hasConnection()) {
279 final TimeValue timeToLive = connectionConfig.getTimeToLive();
280 if (TimeValue.isNonNegative(timeToLive)) {
281 if (timeToLive.getDuration() == 0
282 || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
283 poolEntry.discardConnection(CloseMode.GRACEFUL);
284 }
285 }
286 }
287 if (poolEntry.hasConnection()) {
288 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
289 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
290 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
291 if (timeValue.getDuration() == 0
292 || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
293 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
294 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
295 connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
296 if (result == null || !result) {
297 if (LOG.isDebugEnabled()) {
298 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
299 }
300 poolEntry.discardConnection(CloseMode.GRACEFUL);
301 }
302 leaseCompleted(poolEntry);
303 })), Command.Priority.IMMEDIATE);
304 return;
305 }
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
308 }
309 poolEntry.discardConnection(CloseMode.IMMEDIATE);
310 }
311 }
312 }
313 leaseCompleted(poolEntry);
314 }
315
316 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
317 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
318 if (connection != null) {
319 connection.activate();
320 }
321 if (LOG.isDebugEnabled()) {
322 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
323 }
324 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
325 if (LOG.isDebugEnabled()) {
326 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
327 }
328 resultFuture.completed(endpoint);
329 }
330
331 @Override
332 public void failed(final Exception ex) {
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("{} endpoint lease failed", id);
335 }
336 resultFuture.failed(ex);
337 }
338
339 @Override
340 public void cancelled() {
341 if (LOG.isDebugEnabled()) {
342 LOG.debug("{} endpoint lease cancelled", id);
343 }
344 resultFuture.cancel();
345 }
346
347 });
348
349 @Override
350 public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
351 return resultFuture.get();
352 }
353
354 @Override
355 public AsyncConnectionEndpoint get(
356 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
357 return resultFuture.get(timeout, unit);
358 }
359
360 @Override
361 public boolean cancel(final boolean mayInterruptIfRunning) {
362 return leaseFuture.cancel(mayInterruptIfRunning);
363 }
364
365 @Override
366 public boolean isDone() {
367 return resultFuture.isDone();
368 }
369
370 @Override
371 public boolean isCancelled() {
372 return resultFuture.isCancelled();
373 }
374
375 };
376 }
377
378 @Override
379 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
380 Args.notNull(endpoint, "Managed endpoint");
381 Args.notNull(keepAlive, "Keep-alive time");
382 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
383 if (entry == null) {
384 return;
385 }
386 if (LOG.isDebugEnabled()) {
387 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
388 }
389 final ManagedAsyncClientConnection connection = entry.getConnection();
390 boolean reusable = connection != null && connection.isOpen();
391 try {
392 if (reusable) {
393 entry.updateState(state);
394 entry.updateExpiry(keepAlive);
395 connection.passivate();
396 if (LOG.isDebugEnabled()) {
397 final String s;
398 if (TimeValue.isPositive(keepAlive)) {
399 s = "for " + keepAlive;
400 } else {
401 s = "indefinitely";
402 }
403 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
404 }
405 }
406 } catch (final RuntimeException ex) {
407 reusable = false;
408 throw ex;
409 } finally {
410 pool.release(entry, reusable);
411 if (LOG.isDebugEnabled()) {
412 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
413 }
414 }
415 }
416
417 @Override
418 public Future<AsyncConnectionEndpoint> connect(
419 final AsyncConnectionEndpoint endpoint,
420 final ConnectionInitiator connectionInitiator,
421 final Timeout timeout,
422 final Object attachment,
423 final HttpContext context,
424 final FutureCallback<AsyncConnectionEndpoint> callback) {
425 Args.notNull(endpoint, "Endpoint");
426 Args.notNull(connectionInitiator, "Connection initiator");
427 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
428 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
429 if (internalEndpoint.isConnected()) {
430 resultFuture.completed(endpoint);
431 return resultFuture;
432 }
433 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
434 final HttpRoute route = poolEntry.getRoute();
435 final HttpHost host;
436 if (route.getProxyHost() != null) {
437 host = route.getProxyHost();
438 } else {
439 host = route.getTargetHost();
440 }
441 final InetSocketAddress localAddress = route.getLocalSocketAddress();
442 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
443 final TlsConfig tlsConfig = resolveTlsConfig(host, attachment);
444 final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
445
446 if (LOG.isDebugEnabled()) {
447 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
448 }
449 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
450 connectionInitiator,
451 host,
452 localAddress,
453 connectTimeout,
454 route.isTunnelled() ? TlsConfig.copy(tlsConfig)
455 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
456 .build() : tlsConfig,
457 context,
458 new FutureCallback<ManagedAsyncClientConnection>() {
459
460 @Override
461 public void completed(final ManagedAsyncClientConnection connection) {
462 try {
463 if (LOG.isDebugEnabled()) {
464 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
465 }
466 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
467 if (socketTimeout != null) {
468 connection.setSocketTimeout(socketTimeout);
469 }
470 poolEntry.assignConnection(connection);
471 resultFuture.completed(internalEndpoint);
472 } catch (final RuntimeException ex) {
473 resultFuture.failed(ex);
474 }
475 }
476
477 @Override
478 public void failed(final Exception ex) {
479 resultFuture.failed(ex);
480 }
481
482 @Override
483 public void cancelled() {
484 resultFuture.cancel();
485 }
486 });
487 resultFuture.setDependency(connectFuture);
488 return resultFuture;
489 }
490
491 @Override
492 public void upgrade(
493 final AsyncConnectionEndpoint endpoint,
494 final Object attachment,
495 final HttpContext context,
496 final FutureCallback<AsyncConnectionEndpoint> callback) {
497 Args.notNull(endpoint, "Managed endpoint");
498 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
499 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
500 final HttpRoute route = poolEntry.getRoute();
501 final HttpHost host = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
502 final TlsConfig tlsConfig = resolveTlsConfig(host, attachment);
503 connectionOperator.upgrade(
504 poolEntry.getConnection(),
505 route.getTargetHost(),
506 attachment != null ? attachment : tlsConfig,
507 context,
508 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
509
510 @Override
511 public void completed(final ManagedAsyncClientConnection connection) {
512 if (LOG.isDebugEnabled()) {
513 LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
514 }
515 final TlsDetails tlsDetails = connection.getTlsDetails();
516 if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
517 connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
518
519 @Override
520 public void completed(final ProtocolIOSession protocolIOSession) {
521 if (callback != null) {
522 callback.completed(endpoint);
523 }
524 }
525
526 });
527 } else {
528 if (callback != null) {
529 callback.completed(endpoint);
530 }
531 }
532 }
533 });
534 }
535
536 @Override
537 public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
538 upgrade(endpoint, attachment, context, null);
539 }
540
541 @Override
542 public Set<HttpRoute> getRoutes() {
543 return pool.getRoutes();
544 }
545
546 @Override
547 public void setMaxTotal(final int max) {
548 pool.setMaxTotal(max);
549 }
550
551 @Override
552 public int getMaxTotal() {
553 return pool.getMaxTotal();
554 }
555
556 @Override
557 public void setDefaultMaxPerRoute(final int max) {
558 pool.setDefaultMaxPerRoute(max);
559 }
560
561 @Override
562 public int getDefaultMaxPerRoute() {
563 return pool.getDefaultMaxPerRoute();
564 }
565
566 @Override
567 public void setMaxPerRoute(final HttpRoute route, final int max) {
568 pool.setMaxPerRoute(route, max);
569 }
570
571 @Override
572 public int getMaxPerRoute(final HttpRoute route) {
573 return pool.getMaxPerRoute(route);
574 }
575
576 @Override
577 public void closeIdle(final TimeValue idletime) {
578 pool.closeIdle(idletime);
579 }
580
581 @Override
582 public void closeExpired() {
583 pool.closeExpired();
584 }
585
586 @Override
587 public PoolStats getTotalStats() {
588 return pool.getTotalStats();
589 }
590
591 @Override
592 public PoolStats getStats(final HttpRoute route) {
593 return pool.getStats(route);
594 }
595
596
597
598
599
600
601 public void setDefaultConnectionConfig(final ConnectionConfig config) {
602 this.connectionConfigResolver = (route) -> config;
603 }
604
605
606
607
608
609
610 public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
611 this.connectionConfigResolver = connectionConfigResolver;
612 }
613
614
615
616
617
618
619 public void setDefaultTlsConfig(final TlsConfig config) {
620 this.tlsConfigResolver = (host) -> config;
621 }
622
623
624
625
626
627
628 public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
629 this.tlsConfigResolver = tlsConfigResolver;
630 }
631
632 void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection > entry) {
633 final long now = System.currentTimeMillis();
634 if (entry.getExpiryDeadline().isBefore(now)) {
635 entry.discardConnection(CloseMode.GRACEFUL);
636 } else {
637 final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
638 final TimeValue timeToLive = connectionConfig.getTimeToLive();
639 if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
640 entry.discardConnection(CloseMode.GRACEFUL);
641 }
642 }
643 }
644
645
646
647
648 @Deprecated
649 public TimeValue getValidateAfterInactivity() {
650 return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
651 }
652
653
654
655
656
657
658
659
660
661
662 @Deprecated
663 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
664 setDefaultConnectionConfig(ConnectionConfig.custom()
665 .setValidateAfterInactivity(validateAfterInactivity)
666 .build());
667 }
668
669 private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
670
671 static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
672
673 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
674 private final String id;
675
676 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
677 this.poolEntryRef = new AtomicReference<>(poolEntry);
678 this.id = INCREMENTING_ID.getNextId();
679 }
680
681 @Override
682 public String getId() {
683 return id;
684 }
685
686 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
687 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
688 if (poolEntry == null) {
689 throw new ConnectionShutdownException();
690 }
691 return poolEntry;
692 }
693
694 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
695 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
696 if (poolEntry.getConnection() == null) {
697 throw new ConnectionShutdownException();
698 }
699 return poolEntry;
700 }
701
702 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
703 return poolEntryRef.getAndSet(null);
704 }
705
706 @Override
707 public void close(final CloseMode closeMode) {
708 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
709 if (poolEntry != null) {
710 if (LOG.isDebugEnabled()) {
711 LOG.debug("{} close {}", id, closeMode);
712 }
713 poolEntry.discardConnection(closeMode);
714 }
715 }
716
717 @Override
718 public boolean isConnected() {
719 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
720 if (poolEntry == null) {
721 return false;
722 }
723 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
724 if (connection == null) {
725 return false;
726 }
727 if (!connection.isOpen()) {
728 poolEntry.discardConnection(CloseMode.IMMEDIATE);
729 return false;
730 }
731 return true;
732 }
733
734 @Override
735 public void setSocketTimeout(final Timeout timeout) {
736 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
737 }
738
739 @Override
740 public void execute(
741 final String exchangeId,
742 final AsyncClientExchangeHandler exchangeHandler,
743 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
744 final HttpContext context) {
745 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
746 if (LOG.isDebugEnabled()) {
747 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
748 }
749 context.setProtocolVersion(connection.getProtocolVersion());
750 connection.submitCommand(
751 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
752 Command.Priority.NORMAL);
753 }
754
755 }
756
757
758
759
760
761
762
763
764 boolean isClosed() {
765 return this.closed.get();
766 }
767
768 }