View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.time.Instant;
32  import java.util.Objects;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.TimeoutException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicLong;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.hc.client5.http.DnsResolver;
41  import org.apache.hc.client5.http.HttpRoute;
42  import org.apache.hc.client5.http.SchemePortResolver;
43  import org.apache.hc.client5.http.config.ConnectionConfig;
44  import org.apache.hc.client5.http.config.TlsConfig;
45  import org.apache.hc.client5.http.impl.ConnPoolSupport;
46  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
47  import org.apache.hc.client5.http.io.ConnectionEndpoint;
48  import org.apache.hc.client5.http.io.HttpClientConnectionManager;
49  import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
50  import org.apache.hc.client5.http.io.LeaseRequest;
51  import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
52  import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
53  import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
54  import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
55  import org.apache.hc.core5.annotation.Contract;
56  import org.apache.hc.core5.annotation.ThreadingBehavior;
57  import org.apache.hc.core5.http.ClassicHttpRequest;
58  import org.apache.hc.core5.http.ClassicHttpResponse;
59  import org.apache.hc.core5.http.HttpException;
60  import org.apache.hc.core5.http.HttpHost;
61  import org.apache.hc.core5.http.URIScheme;
62  import org.apache.hc.core5.http.config.Lookup;
63  import org.apache.hc.core5.http.config.Registry;
64  import org.apache.hc.core5.http.config.RegistryBuilder;
65  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
66  import org.apache.hc.core5.http.io.HttpConnectionFactory;
67  import org.apache.hc.core5.http.io.SocketConfig;
68  import org.apache.hc.core5.http.protocol.HttpContext;
69  import org.apache.hc.core5.io.CloseMode;
70  import org.apache.hc.core5.util.Args;
71  import org.apache.hc.core5.util.Asserts;
72  import org.apache.hc.core5.util.Deadline;
73  import org.apache.hc.core5.util.TimeValue;
74  import org.apache.hc.core5.util.Timeout;
75  import org.slf4j.Logger;
76  import org.slf4j.LoggerFactory;
77  
78  /**
79   * A connection manager for a single connection. This connection manager maintains only one active
80   * connection. Even though this class is fully thread-safe it ought to be used by one execution
81   * thread only, as only one thread a time can lease the connection at a time.
82   * <p>
83   * This connection manager will make an effort to reuse the connection for subsequent requests
84   * with the same {@link HttpRoute route}. It will, however, close the existing connection and
85   * open it for the given route, if the route of the persistent connection does not match that
86   * of the connection request. If the connection has been already been allocated
87   * {@link IllegalStateException} is thrown.
88   * </p>
89   * <p>
90   * This connection manager implementation should be used inside an EJB container instead of
91   * {@link PoolingHttpClientConnectionManager}.
92   * </p>
93   *
94   * @since 4.3
95   */
96  @Contract(threading = ThreadingBehavior.SAFE)
97  public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
98  
99      private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
100 
101     private static final AtomicLong COUNT = new AtomicLong(0);
102 
103     private final HttpClientConnectionOperator connectionOperator;
104     private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
105     private final String id;
106 
107     private final ReentrantLock lock;
108 
109     private ManagedHttpClientConnection conn;
110     private HttpRoute route;
111     private Object state;
112     private long created;
113     private long updated;
114     private long expiry;
115     private boolean leased;
116     private SocketConfig socketConfig;
117     private ConnectionConfig connectionConfig;
118     private TlsConfig tlsConfig;
119 
120     private final AtomicBoolean closed;
121 
122     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
123         return RegistryBuilder.<ConnectionSocketFactory>create()
124                 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
125                 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
126                 .build();
127     }
128 
129     public BasicHttpClientConnectionManager(
130             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
131             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
132             final SchemePortResolver schemePortResolver,
133             final DnsResolver dnsResolver) {
134       this(new DefaultHttpClientConnectionOperator(
135               socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
136     }
137 
138     /**
139      * @since 4.4
140      */
141     public BasicHttpClientConnectionManager(
142             final HttpClientConnectionOperator httpClientConnectionOperator,
143             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
144         super();
145         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
146         this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
147         this.id = String.format("ep-%010d", COUNT.getAndIncrement());
148         this.expiry = Long.MAX_VALUE;
149         this.socketConfig = SocketConfig.DEFAULT;
150         this.connectionConfig = ConnectionConfig.DEFAULT;
151         this.tlsConfig = TlsConfig.DEFAULT;
152         this.closed = new AtomicBoolean(false);
153         this.lock = new ReentrantLock();
154     }
155 
156     public BasicHttpClientConnectionManager(
157             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
158             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
159         this(socketFactoryRegistry, connFactory, null, null);
160     }
161 
162     public BasicHttpClientConnectionManager(
163             final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
164         this(socketFactoryRegistry, null, null, null);
165     }
166 
167     public BasicHttpClientConnectionManager() {
168         this(getDefaultRegistry(), null, null, null);
169     }
170 
171     @Override
172     public void close() {
173         close(CloseMode.GRACEFUL);
174     }
175 
176     @Override
177     public void close(final CloseMode closeMode) {
178         if (this.closed.compareAndSet(false, true)) {
179             closeConnection(closeMode);
180         }
181     }
182 
183     HttpRoute getRoute() {
184         return route;
185     }
186 
187     Object getState() {
188         return state;
189     }
190 
191     public SocketConfig getSocketConfig() {
192         lock.lock();
193         try {
194             return socketConfig;
195         } finally {
196             lock.unlock();
197         }
198     }
199 
200     public void setSocketConfig(final SocketConfig socketConfig) {
201         lock.lock();
202         try {
203             this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
204         } finally {
205             lock.unlock();
206         }
207     }
208 
209     /**
210      * @since 5.2
211      */
212     public ConnectionConfig getConnectionConfig() {
213         lock.lock();
214         try {
215             return connectionConfig;
216         } finally {
217             lock.unlock();
218         }
219 
220     }
221 
222     /**
223      * @since 5.2
224      */
225     public void setConnectionConfig(final ConnectionConfig connectionConfig) {
226         lock.lock();
227         try {
228             this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
229         } finally {
230             lock.unlock();
231         }
232     }
233 
234     /**
235      * @since 5.2
236      */
237     public TlsConfig getTlsConfig() {
238         lock.lock();
239         try {
240             return tlsConfig;
241         } finally {
242             lock.unlock();
243         }
244     }
245 
246     /**
247      * @since 5.2
248      */
249     public void setTlsConfig(final TlsConfig tlsConfig) {
250         lock.lock();
251         try {
252             this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
253         } finally {
254             lock.unlock();
255         }
256     }
257 
258     public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
259         return lease(id, route, Timeout.DISABLED, state);
260     }
261 
262     @Override
263     public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
264         return new LeaseRequest() {
265 
266             @Override
267             public ConnectionEndpoint get(
268                     final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
269                 try {
270                     return new InternalConnectionEndpoint(route, getConnection(route, state));
271                 } catch (final IOException ex) {
272                     throw new ExecutionException(ex.getMessage(), ex);
273                 }
274             }
275 
276             @Override
277             public boolean cancel() {
278                 return false;
279             }
280 
281         };
282     }
283 
284     private void closeConnection(final CloseMode closeMode) {
285         lock.lock();
286         try {
287             if (this.conn != null) {
288                 if (LOG.isDebugEnabled()) {
289                     LOG.debug("{} Closing connection {}", id, closeMode);
290                 }
291                 this.conn.close(closeMode);
292                 this.conn = null;
293             }
294         } finally {
295             lock.unlock();
296         }
297     }
298 
299     private void checkExpiry() {
300         if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
301             if (LOG.isDebugEnabled()) {
302                 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
303             }
304             closeConnection(CloseMode.GRACEFUL);
305         }
306     }
307 
308     private void validate() {
309         if (this.conn != null) {
310             final TimeValue timeToLive = connectionConfig.getTimeToLive();
311             if (TimeValue.isNonNegative(timeToLive)) {
312                 final Deadline deadline = Deadline.calculate(created, timeToLive);
313                 if (deadline.isExpired()) {
314                     closeConnection(CloseMode.GRACEFUL);
315                 }
316             }
317         }
318         if (this.conn != null) {
319             final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
320                     connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
321             if (TimeValue.isNonNegative(timeValue)) {
322                 final Deadline deadline = Deadline.calculate(updated, timeValue);
323                 if (deadline.isExpired()) {
324                     boolean stale;
325                     try {
326                         stale = conn.isStale();
327                     } catch (final IOException ignore) {
328                         stale = true;
329                     }
330                     if (stale) {
331                         if (LOG.isDebugEnabled()) {
332                             LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
333                         }
334                         closeConnection(CloseMode.GRACEFUL);
335                     }
336                 }
337             }
338         }
339     }
340 
341     ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
342         lock.lock();
343         try {
344             Asserts.check(!isClosed(), "Connection manager has been shut down");
345             if (LOG.isDebugEnabled()) {
346                 LOG.debug("{} Get connection for route {}", id, route);
347             }
348             Asserts.check(!this.leased, "Connection %s is still allocated", conn);
349             if (!Objects.equals(this.route, route) || !Objects.equals(this.state, state)) {
350                 closeConnection(CloseMode.GRACEFUL);
351             }
352             this.route = route;
353             this.state = state;
354             checkExpiry();
355             validate();
356             if (this.conn == null) {
357                 this.conn = this.connFactory.createConnection(null);
358                 this.created = System.currentTimeMillis();
359             } else {
360                 this.conn.activate();
361             }
362             this.leased = true;
363             if (LOG.isDebugEnabled()) {
364                 LOG.debug("{} Using connection {}", id, conn);
365             }
366             return this.conn;
367         } finally {
368             lock.unlock();
369         }
370     }
371 
372 
373     private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
374         if (endpoint instanceof InternalConnectionEndpoint) {
375             return (InternalConnectionEndpoint) endpoint;
376         }
377         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
378     }
379 
380     @Override
381     public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
382         lock.lock();
383         try {
384             Args.notNull(endpoint, "Managed endpoint");
385             final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
386             final ManagedHttpClientConnection conn = internalEndpoint.detach();
387             if (LOG.isDebugEnabled()) {
388                 LOG.debug("{} Releasing connection {}", id, conn);
389             }
390             if (isClosed()) {
391                 return;
392             }
393             try {
394                 if (keepAlive == null && conn != null) {
395                     conn.close(CloseMode.GRACEFUL);
396                 }
397                 this.updated = System.currentTimeMillis();
398                 if (conn != null && conn.isOpen() && conn.isConsistent()) {
399                     this.state = state;
400                     conn.passivate();
401                     if (TimeValue.isPositive(keepAlive)) {
402                         if (LOG.isDebugEnabled()) {
403                             LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
404                         }
405                         this.expiry = this.updated + keepAlive.toMilliseconds();
406                     } else {
407                         if (LOG.isDebugEnabled()) {
408                             LOG.debug("{} Connection can be kept alive indefinitely", id);
409                         }
410                         this.expiry = Long.MAX_VALUE;
411                     }
412                 } else {
413                     this.route = null;
414                     this.conn = null;
415                     this.expiry = Long.MAX_VALUE;
416                     if (LOG.isDebugEnabled()) {
417                         LOG.debug("{} Connection is not kept alive", id);
418                     }
419                 }
420             } finally {
421                 this.leased = false;
422             }
423         } finally {
424             lock.unlock();
425         }
426     }
427 
428     @Override
429     public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
430         lock.lock();
431         try {
432             Args.notNull(endpoint, "Endpoint");
433 
434             final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
435             if (internalEndpoint.isConnected()) {
436                 return;
437             }
438             final HttpRoute route = internalEndpoint.getRoute();
439             final HttpHost host;
440             if (route.getProxyHost() != null) {
441                 host = route.getProxyHost();
442             } else {
443                 host = route.getTargetHost();
444             }
445             final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
446             final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
447             if (LOG.isDebugEnabled()) {
448                 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
449             }
450             this.connectionOperator.connect(
451                     connection,
452                     host,
453                     route.getLocalSocketAddress(),
454                     connectTimeout,
455                     socketConfig,
456                     tlsConfig,
457                     context);
458             if (LOG.isDebugEnabled()) {
459                 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
460             }
461             final Timeout socketTimeout = connectionConfig.getSocketTimeout();
462             if (socketTimeout != null) {
463                 connection.setSocketTimeout(socketTimeout);
464             }
465         } finally {
466             lock.unlock();
467         }
468     }
469 
470     @Override
471     public void upgrade(
472             final ConnectionEndpoint endpoint,
473             final HttpContext context) throws IOException {
474         lock.lock();
475         try {
476             Args.notNull(endpoint, "Endpoint");
477             Args.notNull(route, "HTTP route");
478             final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
479             this.connectionOperator.upgrade(
480                     internalEndpoint.getConnection(),
481                     internalEndpoint.getRoute().getTargetHost(),
482                     tlsConfig,
483                     context);
484         } finally {
485             lock.unlock();
486         }
487     }
488 
489     public void closeExpired() {
490         lock.lock();
491         try {
492             if (isClosed()) {
493                 return;
494             }
495             if (!this.leased) {
496                 checkExpiry();
497             }
498         } finally {
499             lock.unlock();
500         }
501     }
502 
503     public void closeIdle(final TimeValue idleTime) {
504         lock.lock();
505         try {
506             Args.notNull(idleTime, "Idle time");
507             if (isClosed()) {
508                 return;
509             }
510             if (!this.leased) {
511                 long time = idleTime.toMilliseconds();
512                 if (time < 0) {
513                     time = 0;
514                 }
515                 final long deadline = System.currentTimeMillis() - time;
516                 if (this.updated <= deadline) {
517                     closeConnection(CloseMode.GRACEFUL);
518                 }
519             }
520         } finally {
521             lock.unlock();
522         }
523     }
524 
525     /**
526      * @see #setValidateAfterInactivity(TimeValue)
527      *
528      * @since 5.1
529      *
530      * @deprecated Use {@link #getConnectionConfig()}
531      */
532     @Deprecated
533     public TimeValue getValidateAfterInactivity() {
534         return connectionConfig.getValidateAfterInactivity();
535     }
536 
537     /**
538      * Defines period of inactivity after which persistent connections must
539      * be re-validated prior to being {@link #lease(String, HttpRoute, Object)} leased} to the consumer.
540      * Negative values passed to this method disable connection validation. This check helps
541      * detect connections that have become stale (half-closed) while kept inactive in the pool.
542      *
543      * @since 5.1
544      *
545      * @deprecated Use {@link #setConnectionConfig(ConnectionConfig)}
546      */
547     @Deprecated
548     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
549         this.connectionConfig = ConnectionConfig.custom()
550                 .setValidateAfterInactivity(validateAfterInactivity)
551                 .build();
552     }
553 
554     class InternalConnectionEndpoint extends ConnectionEndpoint {
555 
556         private final HttpRoute route;
557         private final AtomicReference<ManagedHttpClientConnection> connRef;
558 
559         public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
560             this.route = route;
561             this.connRef = new AtomicReference<>(conn);
562         }
563 
564         HttpRoute getRoute() {
565             return route;
566         }
567 
568         ManagedHttpClientConnection getConnection() {
569             final ManagedHttpClientConnection conn = this.connRef.get();
570             if (conn == null) {
571                 throw new ConnectionShutdownException();
572             }
573             return conn;
574         }
575 
576         ManagedHttpClientConnection getValidatedConnection() {
577             final ManagedHttpClientConnection conn = getConnection();
578             Asserts.check(conn.isOpen(), "Endpoint is not connected");
579             return conn;
580         }
581 
582         ManagedHttpClientConnection detach() {
583             return this.connRef.getAndSet(null);
584         }
585 
586         @Override
587         public boolean isConnected() {
588             final ManagedHttpClientConnection conn = getConnection();
589             return conn != null && conn.isOpen();
590         }
591 
592         @Override
593         public void close(final CloseMode closeMode) {
594             final ManagedHttpClientConnection conn = detach();
595             if (conn != null) {
596                 conn.close(closeMode);
597             }
598         }
599 
600         @Override
601         public void close() throws IOException {
602             final ManagedHttpClientConnection conn = detach();
603             if (conn != null) {
604                 conn.close();
605             }
606         }
607 
608         @Override
609         public void setSocketTimeout(final Timeout timeout) {
610             getValidatedConnection().setSocketTimeout(timeout);
611         }
612 
613         @Override
614         public ClassicHttpResponse execute(
615                 final String exchangeId,
616                 final ClassicHttpRequest request,
617                 final HttpRequestExecutor requestExecutor,
618                 final HttpContext context) throws IOException, HttpException {
619             Args.notNull(request, "HTTP request");
620             Args.notNull(requestExecutor, "Request executor");
621             if (LOG.isDebugEnabled()) {
622                 LOG.debug("{} Executing exchange {}", id, exchangeId);
623             }
624             return requestExecutor.execute(request, getValidatedConnection(), context);
625         }
626 
627     }
628 
629     /**
630      * Method that can be called to determine whether the connection manager has been shut down and
631      * is closed or not.
632      *
633      * @return {@code true} if the connection manager has been shut down and is closed, otherwise
634      * return {@code false}.
635      */
636     boolean isClosed() {
637         return this.closed.get();
638     }
639 
640 }