View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.util.Set;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.client5.http.DnsResolver;
39  import org.apache.hc.client5.http.EndpointInfo;
40  import org.apache.hc.client5.http.HttpRoute;
41  import org.apache.hc.client5.http.SchemePortResolver;
42  import org.apache.hc.client5.http.config.ConnectionConfig;
43  import org.apache.hc.client5.http.config.TlsConfig;
44  import org.apache.hc.client5.http.impl.ConnPoolSupport;
45  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46  import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
47  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
49  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
50  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
51  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
52  import org.apache.hc.core5.annotation.Contract;
53  import org.apache.hc.core5.annotation.Internal;
54  import org.apache.hc.core5.annotation.ThreadingBehavior;
55  import org.apache.hc.core5.concurrent.BasicFuture;
56  import org.apache.hc.core5.concurrent.CallbackContribution;
57  import org.apache.hc.core5.concurrent.ComplexFuture;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.function.Resolver;
60  import org.apache.hc.core5.http.HttpHost;
61  import org.apache.hc.core5.http.HttpVersion;
62  import org.apache.hc.core5.http.ProtocolVersion;
63  import org.apache.hc.core5.http.URIScheme;
64  import org.apache.hc.core5.http.config.Lookup;
65  import org.apache.hc.core5.http.config.RegistryBuilder;
66  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68  import org.apache.hc.core5.http.nio.HandlerFactory;
69  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
71  import org.apache.hc.core5.http.protocol.HttpContext;
72  import org.apache.hc.core5.http2.HttpVersionPolicy;
73  import org.apache.hc.core5.http2.nio.command.PingCommand;
74  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
75  import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.pool.ConnPoolControl;
78  import org.apache.hc.core5.pool.LaxConnPool;
79  import org.apache.hc.core5.pool.ManagedConnPool;
80  import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
81  import org.apache.hc.core5.pool.PoolEntry;
82  import org.apache.hc.core5.pool.PoolReusePolicy;
83  import org.apache.hc.core5.pool.PoolStats;
84  import org.apache.hc.core5.pool.StrictConnPool;
85  import org.apache.hc.core5.reactor.Command;
86  import org.apache.hc.core5.reactor.ConnectionInitiator;
87  import org.apache.hc.core5.reactor.ProtocolIOSession;
88  import org.apache.hc.core5.reactor.ssl.TlsDetails;
89  import org.apache.hc.core5.util.Args;
90  import org.apache.hc.core5.util.Deadline;
91  import org.apache.hc.core5.util.Identifiable;
92  import org.apache.hc.core5.util.TimeValue;
93  import org.apache.hc.core5.util.Timeout;
94  import org.slf4j.Logger;
95  import org.slf4j.LoggerFactory;
96  
97  /**
98   * {@code PoolingAsyncClientConnectionManager} maintains a pool of non-blocking
99   * {@link org.apache.hc.core5.http.HttpConnection}s and is able to service
100  * connection requests from multiple execution threads. Connections are pooled
101  * on a per route basis. A request for a route which already the manager has
102  * persistent connections for available in the pool will be services by leasing
103  * a connection from the pool rather than creating a new connection.
104  * <p>
105  * {@code PoolingAsyncClientConnectionManager} maintains a maximum limit
106  * of connection on a per route basis and in total. Connection limits
107  * can be adjusted using {@link ConnPoolControl} methods.
108  * <p>
109  * Total time to live (TTL) set at construction time defines maximum life span
110  * of persistent connections regardless of their expiration setting. No persistent
111  * connection will be re-used past its TTL value.
112  *
113  * @since 5.0
114  */
115 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
116 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
117 
118     private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
119 
120     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
121     public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
122 
123     private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
124     private final AsyncClientConnectionOperator connectionOperator;
125     private final AtomicBoolean closed;
126 
127     private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
128     private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
129 
130     public PoolingAsyncClientConnectionManager() {
131         this(RegistryBuilder.<TlsStrategy>create()
132                 .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
133                 .build());
134     }
135 
136     public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
137         this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
138     }
139 
140     public PoolingAsyncClientConnectionManager(
141             final Lookup<TlsStrategy> tlsStrategyLookup,
142             final PoolConcurrencyPolicy poolConcurrencyPolicy,
143             final TimeValue timeToLive) {
144         this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
145     }
146 
147     public PoolingAsyncClientConnectionManager(
148             final Lookup<TlsStrategy> tlsStrategyLookup,
149             final PoolConcurrencyPolicy poolConcurrencyPolicy,
150             final PoolReusePolicy poolReusePolicy,
151             final TimeValue timeToLive) {
152         this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
153     }
154 
155     public PoolingAsyncClientConnectionManager(
156             final Lookup<TlsStrategy> tlsStrategyLookup,
157             final PoolConcurrencyPolicy poolConcurrencyPolicy,
158             final PoolReusePolicy poolReusePolicy,
159             final TimeValue timeToLive,
160             final SchemePortResolver schemePortResolver,
161             final DnsResolver dnsResolver) {
162         this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
163                 poolConcurrencyPolicy, poolReusePolicy, timeToLive);
164     }
165 
166     @Internal
167     public PoolingAsyncClientConnectionManager(
168             final AsyncClientConnectionOperator connectionOperator,
169             final PoolConcurrencyPolicy poolConcurrencyPolicy,
170             final PoolReusePolicy poolReusePolicy,
171             final TimeValue timeToLive) {
172         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
173         switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
174             case STRICT:
175                 this.pool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
176                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
177                         DEFAULT_MAX_TOTAL_CONNECTIONS,
178                         timeToLive,
179                         poolReusePolicy,
180                         null) {
181 
182                     @Override
183                     public void closeExpired() {
184                         enumAvailable(e -> closeIfExpired(e));
185                     }
186 
187                 };
188                 break;
189             case LAX:
190                 this.pool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
191                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
192                         timeToLive,
193                         poolReusePolicy,
194                         null) {
195 
196                     @Override
197                     public void closeExpired() {
198                         enumAvailable(e -> closeIfExpired(e));
199                     }
200 
201                 };
202                 break;
203             default:
204                 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
205         }
206         this.closed = new AtomicBoolean(false);
207     }
208 
209     @Internal
210     protected PoolingAsyncClientConnectionManager(
211             final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
212             final AsyncClientConnectionOperator connectionOperator) {
213         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
214         this.pool = Args.notNull(pool, "Connection pool");
215         this.closed = new AtomicBoolean(false);
216     }
217 
218     @Override
219     public void close() {
220         close(CloseMode.GRACEFUL);
221     }
222 
223     @Override
224     public void close(final CloseMode closeMode) {
225         if (this.closed.compareAndSet(false, true)) {
226             if (LOG.isDebugEnabled()) {
227                 LOG.debug("Shutdown connection pool {}", closeMode);
228             }
229             this.pool.close(closeMode);
230             LOG.debug("Connection pool shut down");
231         }
232     }
233 
234     private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
235         if (endpoint instanceof InternalConnectionEndpoint) {
236             return (InternalConnectionEndpoint) endpoint;
237         }
238         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
239     }
240 
241     private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
242         final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
243         final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
244         return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
245     }
246 
247     private TlsConfig resolveTlsConfig(final HttpHost host) {
248         final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
249         TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
250         if (tlsConfig == null) {
251             tlsConfig = TlsConfig.DEFAULT;
252         }
253         if (URIScheme.HTTP.same(host.getSchemeName())
254                 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
255             // Plain HTTP does not support protocol negotiation.
256             // Fall back to HTTP/1.1
257             tlsConfig = TlsConfig.copy(tlsConfig)
258                     .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
259                     .build();
260         }
261         return tlsConfig;
262     }
263 
264     @Override
265     public Future<AsyncConnectionEndpoint> lease(
266             final String id,
267             final HttpRoute route,
268             final Object state,
269             final Timeout requestTimeout,
270             final FutureCallback<AsyncConnectionEndpoint> callback) {
271         if (LOG.isDebugEnabled()) {
272             LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
273         }
274         return new Future<AsyncConnectionEndpoint>() {
275 
276             final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
277             final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
278 
279             final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
280                     route,
281                     state,
282                     requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
283 
284                         @Override
285                         public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
286                             if (poolEntry.hasConnection()) {
287                                 final TimeValue timeToLive = connectionConfig.getTimeToLive();
288                                 if (TimeValue.isNonNegative(timeToLive)) {
289                                     if (timeToLive.getDuration() == 0
290                                             || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
291                                         poolEntry.discardConnection(CloseMode.GRACEFUL);
292                                     }
293                                 }
294                             }
295                             if (poolEntry.hasConnection()) {
296                                 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
297                                 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
298                                 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
299                                     if (timeValue.getDuration() == 0
300                                             || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
301                                         final ProtocolVersion protocolVersion = connection.getProtocolVersion();
302                                         if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
303                                             connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
304                                                 if (result == null || !result)  {
305                                                     if (LOG.isDebugEnabled()) {
306                                                         LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
307                                                     }
308                                                     poolEntry.discardConnection(CloseMode.GRACEFUL);
309                                                 }
310                                                 leaseCompleted(poolEntry);
311                                             })), Command.Priority.IMMEDIATE);
312                                             return;
313                                         }
314                                         if (LOG.isDebugEnabled()) {
315                                             LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
316                                         }
317                                         poolEntry.discardConnection(CloseMode.IMMEDIATE);
318                                     }
319                                 }
320                             }
321                             leaseCompleted(poolEntry);
322                         }
323 
324                         void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
325                             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
326                             if (connection != null) {
327                                 connection.activate();
328                             }
329                             if (LOG.isDebugEnabled()) {
330                                 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
331                             }
332                             final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
333                             if (LOG.isDebugEnabled()) {
334                                 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
335                             }
336                             resultFuture.completed(endpoint);
337                         }
338 
339                         @Override
340                         public void failed(final Exception ex) {
341                             if (LOG.isDebugEnabled()) {
342                                 LOG.debug("{} endpoint lease failed", id);
343                             }
344                             resultFuture.failed(ex);
345                         }
346 
347                         @Override
348                         public void cancelled() {
349                             if (LOG.isDebugEnabled()) {
350                                 LOG.debug("{} endpoint lease cancelled", id);
351                             }
352                             resultFuture.cancel();
353                         }
354 
355                     });
356 
357             @Override
358             public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
359                 return resultFuture.get();
360             }
361 
362             @Override
363             public AsyncConnectionEndpoint get(
364                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
365                 return resultFuture.get(timeout, unit);
366             }
367 
368             @Override
369             public boolean cancel(final boolean mayInterruptIfRunning) {
370                 return leaseFuture.cancel(mayInterruptIfRunning);
371             }
372 
373             @Override
374             public boolean isDone() {
375                 return resultFuture.isDone();
376             }
377 
378             @Override
379             public boolean isCancelled() {
380                 return resultFuture.isCancelled();
381             }
382 
383         };
384     }
385 
386     @Override
387     public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
388         Args.notNull(endpoint, "Managed endpoint");
389         Args.notNull(keepAlive, "Keep-alive time");
390         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
391         if (entry == null) {
392             return;
393         }
394         if (LOG.isDebugEnabled()) {
395             LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
396         }
397         if (this.isClosed()) {
398             return;
399         }
400         final ManagedAsyncClientConnection connection = entry.getConnection();
401         boolean reusable = connection != null && connection.isOpen();
402         try {
403             if (reusable) {
404                 entry.updateState(state);
405                 entry.updateExpiry(keepAlive);
406                 connection.passivate();
407                 if (LOG.isDebugEnabled()) {
408                     final String s;
409                     if (TimeValue.isPositive(keepAlive)) {
410                         s = "for " + keepAlive;
411                     } else {
412                         s = "indefinitely";
413                     }
414                     LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
415                 }
416             }
417         } catch (final RuntimeException ex) {
418             reusable = false;
419             throw ex;
420         } finally {
421             pool.release(entry, reusable);
422             if (LOG.isDebugEnabled()) {
423                 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
424             }
425         }
426     }
427 
428     @Override
429     public Future<AsyncConnectionEndpoint> connect(
430             final AsyncConnectionEndpoint endpoint,
431             final ConnectionInitiator connectionInitiator,
432             final Timeout timeout,
433             final Object attachment,
434             final HttpContext context,
435             final FutureCallback<AsyncConnectionEndpoint> callback) {
436         Args.notNull(endpoint, "Endpoint");
437         Args.notNull(connectionInitiator, "Connection initiator");
438         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
439         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
440         if (internalEndpoint.isConnected()) {
441             resultFuture.completed(endpoint);
442             return resultFuture;
443         }
444         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
445         final HttpRoute route = poolEntry.getRoute();
446         final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost(): route.getTargetHost();
447         final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
448         final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
449 
450         if (LOG.isDebugEnabled()) {
451             LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
452         }
453         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
454                 connectionInitiator,
455                 firstHop,
456                 route.getTargetName(),
457                 route.getLocalSocketAddress(),
458                 connectTimeout,
459                 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
460                 context,
461                 new FutureCallback<ManagedAsyncClientConnection>() {
462 
463                     @Override
464                     public void completed(final ManagedAsyncClientConnection connection) {
465                         try {
466                             if (LOG.isDebugEnabled()) {
467                                 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
468                             }
469                             final Timeout socketTimeout = connectionConfig.getSocketTimeout();
470                             if (socketTimeout != null) {
471                                 connection.setSocketTimeout(socketTimeout);
472                             }
473                             poolEntry.assignConnection(connection);
474                             resultFuture.completed(internalEndpoint);
475                         } catch (final RuntimeException ex) {
476                             resultFuture.failed(ex);
477                         }
478                     }
479 
480                     @Override
481                     public void failed(final Exception ex) {
482                         resultFuture.failed(ex);
483                     }
484 
485                     @Override
486                     public void cancelled() {
487                         resultFuture.cancel();
488                     }
489                 });
490         resultFuture.setDependency(connectFuture);
491         return resultFuture;
492     }
493 
494     @Override
495     public void upgrade(
496             final AsyncConnectionEndpoint endpoint,
497             final Object attachment,
498             final HttpContext context,
499             final FutureCallback<AsyncConnectionEndpoint> callback) {
500         Args.notNull(endpoint, "Managed endpoint");
501         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
502         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
503         final HttpRoute route = poolEntry.getRoute();
504         final HttpHost target = route.getTargetHost();
505         connectionOperator.upgrade(
506                 poolEntry.getConnection(),
507                 target,
508                 route.getTargetName(),
509                 attachment != null ? attachment : resolveTlsConfig(target),
510                 context,
511                 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
512 
513                     @Override
514                     public void completed(final ManagedAsyncClientConnection connection) {
515                         if (LOG.isDebugEnabled()) {
516                             LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
517                         }
518                         final TlsDetails tlsDetails = connection.getTlsDetails();
519                         if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
520                             connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
521 
522                                 @Override
523                                 public void completed(final ProtocolIOSession protocolIOSession) {
524                                     if (callback != null) {
525                                         callback.completed(endpoint);
526                                     }
527                                 }
528 
529                             });
530                         } else {
531                             if (callback != null) {
532                                 callback.completed(endpoint);
533                             }
534                         }
535                     }
536                 });
537     }
538 
539     @Override
540     public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
541         upgrade(endpoint, attachment, context, null);
542     }
543 
544     @Override
545     public Set<HttpRoute> getRoutes() {
546         return pool.getRoutes();
547     }
548 
549     @Override
550     public void setMaxTotal(final int max) {
551         pool.setMaxTotal(max);
552     }
553 
554     @Override
555     public int getMaxTotal() {
556         return pool.getMaxTotal();
557     }
558 
559     @Override
560     public void setDefaultMaxPerRoute(final int max) {
561         pool.setDefaultMaxPerRoute(max);
562     }
563 
564     @Override
565     public int getDefaultMaxPerRoute() {
566         return pool.getDefaultMaxPerRoute();
567     }
568 
569     @Override
570     public void setMaxPerRoute(final HttpRoute route, final int max) {
571         pool.setMaxPerRoute(route, max);
572     }
573 
574     @Override
575     public int getMaxPerRoute(final HttpRoute route) {
576         return pool.getMaxPerRoute(route);
577     }
578 
579     @Override
580     public void closeIdle(final TimeValue idletime) {
581         if (isClosed()) {
582             return;
583         }
584         pool.closeIdle(idletime);
585     }
586 
587     @Override
588     public void closeExpired() {
589         if (isClosed()) {
590             return;
591         }
592         pool.closeExpired();
593     }
594 
595     @Override
596     public PoolStats getTotalStats() {
597         return pool.getTotalStats();
598     }
599 
600     @Override
601     public PoolStats getStats(final HttpRoute route) {
602         return pool.getStats(route);
603     }
604 
605     /**
606      * Sets the same {@link ConnectionConfig} for all routes
607      *
608      * @since 5.2
609      */
610     public void setDefaultConnectionConfig(final ConnectionConfig config) {
611         this.connectionConfigResolver = (route) -> config;
612     }
613 
614     /**
615      * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis.
616      *
617      * @since 5.2
618      */
619     public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
620         this.connectionConfigResolver = connectionConfigResolver;
621     }
622 
623     /**
624      * Sets the same {@link ConnectionConfig} for all hosts
625      *
626      * @since 5.2
627      */
628     public void setDefaultTlsConfig(final TlsConfig config) {
629         this.tlsConfigResolver = (host) -> config;
630     }
631 
632     /**
633      * Sets {@link Resolver} of {@link TlsConfig} on a per host basis.
634      *
635      * @since 5.2
636      */
637     public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
638         this.tlsConfigResolver = tlsConfigResolver;
639     }
640 
641     void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection > entry) {
642         final long now = System.currentTimeMillis();
643         if (entry.getExpiryDeadline().isBefore(now)) {
644             entry.discardConnection(CloseMode.GRACEFUL);
645         } else {
646             final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
647             final TimeValue timeToLive = connectionConfig.getTimeToLive();
648             if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
649                 entry.discardConnection(CloseMode.GRACEFUL);
650             }
651         }
652     }
653 
654     /**
655      * @deprecated Use custom {@link #setConnectionConfigResolver(Resolver)}
656      */
657     @Deprecated
658     public TimeValue getValidateAfterInactivity() {
659         return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
660     }
661 
662     /**
663      * Defines period of inactivity after which persistent connections must
664      * be re-validated prior to being {@link #lease(String, HttpRoute, Object, Timeout,
665      * FutureCallback)} leased} to the consumer. Negative values passed
666      * to this method disable connection validation. This check helps detect connections
667      * that have become stale (half-closed) while kept inactive in the pool.
668      *
669      * @deprecated Use {@link #setConnectionConfigResolver(Resolver)}.
670      */
671     @Deprecated
672     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
673         setDefaultConnectionConfig(ConnectionConfig.custom()
674                 .setValidateAfterInactivity(validateAfterInactivity)
675                 .build());
676     }
677 
678     private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
679 
680     static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
681 
682         private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
683         private final String id;
684 
685         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
686             this.poolEntryRef = new AtomicReference<>(poolEntry);
687             this.id = INCREMENTING_ID.getNextId();
688         }
689 
690         @Override
691         public String getId() {
692             return id;
693         }
694 
695         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
696             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
697             if (poolEntry == null) {
698                 throw new ConnectionShutdownException();
699             }
700             return poolEntry;
701         }
702 
703         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
704             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
705             if (poolEntry.getConnection() == null) {
706                 throw new ConnectionShutdownException();
707             }
708             return poolEntry;
709         }
710 
711         PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
712             return poolEntryRef.getAndSet(null);
713         }
714 
715         @Override
716         public void close(final CloseMode closeMode) {
717             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
718             if (poolEntry != null) {
719                 if (LOG.isDebugEnabled()) {
720                     LOG.debug("{} close {}", id, closeMode);
721                 }
722                 poolEntry.discardConnection(closeMode);
723             }
724         }
725 
726         @Override
727         public boolean isConnected() {
728             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
729             if (poolEntry == null) {
730                 return false;
731             }
732             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
733             if (connection == null) {
734                 return false;
735             }
736             if (!connection.isOpen()) {
737                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
738                 return false;
739             }
740             return true;
741         }
742 
743         @Override
744         public void setSocketTimeout(final Timeout timeout) {
745             getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
746         }
747 
748         @Override
749         public void execute(
750                 final String exchangeId,
751                 final AsyncClientExchangeHandler exchangeHandler,
752                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
753                 final HttpContext context) {
754             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
755             if (LOG.isDebugEnabled()) {
756                 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
757             }
758             context.setProtocolVersion(connection.getProtocolVersion());
759             connection.submitCommand(
760                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
761                     Command.Priority.NORMAL);
762         }
763 
764         @Override
765         public EndpointInfo getInfo() {
766             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
767             if (poolEntry != null) {
768                 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
769                 if (connection != null && connection.isOpen()) {
770                     final TlsDetails tlsDetails = connection.getTlsDetails();
771                     return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
772                 }
773             }
774             return null;
775         }
776 
777     }
778 
779     /**
780      * Method that can be called to determine whether the connection manager has been shut down and
781      * is closed or not.
782      *
783      * @return {@code true} if the connection manager has been shut down and is closed, otherwise
784      * return {@code false}.
785      * @since 5.4
786      */
787     public boolean isClosed() {
788         return this.closed.get();
789     }
790 
791 }