View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.time.Instant;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.client5.http.DnsResolver;
39  import org.apache.hc.client5.http.HttpRoute;
40  import org.apache.hc.client5.http.SchemePortResolver;
41  import org.apache.hc.client5.http.config.ConnectionConfig;
42  import org.apache.hc.client5.http.config.TlsConfig;
43  import org.apache.hc.client5.http.impl.ConnPoolSupport;
44  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
45  import org.apache.hc.client5.http.io.ConnectionEndpoint;
46  import org.apache.hc.client5.http.io.HttpClientConnectionManager;
47  import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
48  import org.apache.hc.client5.http.io.LeaseRequest;
49  import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
50  import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
51  import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
52  import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
53  import org.apache.hc.core5.annotation.Contract;
54  import org.apache.hc.core5.annotation.ThreadingBehavior;
55  import org.apache.hc.core5.http.ClassicHttpRequest;
56  import org.apache.hc.core5.http.ClassicHttpResponse;
57  import org.apache.hc.core5.http.HttpException;
58  import org.apache.hc.core5.http.HttpHost;
59  import org.apache.hc.core5.http.URIScheme;
60  import org.apache.hc.core5.http.config.Lookup;
61  import org.apache.hc.core5.http.config.Registry;
62  import org.apache.hc.core5.http.config.RegistryBuilder;
63  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
64  import org.apache.hc.core5.http.io.HttpConnectionFactory;
65  import org.apache.hc.core5.http.io.SocketConfig;
66  import org.apache.hc.core5.http.protocol.HttpContext;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Deadline;
71  import org.apache.hc.core5.util.LangUtils;
72  import org.apache.hc.core5.util.TimeValue;
73  import org.apache.hc.core5.util.Timeout;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  /**
78   * A connection manager for a single connection. This connection manager maintains only one active
79   * connection. Even though this class is fully thread-safe it ought to be used by one execution
80   * thread only, as only one thread a time can lease the connection at a time.
81   * <p>
82   * This connection manager will make an effort to reuse the connection for subsequent requests
83   * with the same {@link HttpRoute route}. It will, however, close the existing connection and
84   * open it for the given route, if the route of the persistent connection does not match that
85   * of the connection request. If the connection has been already been allocated
86   * {@link IllegalStateException} is thrown.
87   * </p>
88   * <p>
89   * This connection manager implementation should be used inside an EJB container instead of
90   * {@link PoolingHttpClientConnectionManager}.
91   * </p>
92   *
93   * @since 4.3
94   */
95  @Contract(threading = ThreadingBehavior.SAFE)
96  public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
97  
98      private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
99  
100     private static final AtomicLong COUNT = new AtomicLong(0);
101 
102     private final HttpClientConnectionOperator connectionOperator;
103     private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
104     private final String id;
105 
106     private ManagedHttpClientConnection conn;
107     private HttpRoute route;
108     private Object state;
109     private long created;
110     private long updated;
111     private long expiry;
112     private boolean leased;
113     private SocketConfig socketConfig;
114     private ConnectionConfig connectionConfig;
115     private TlsConfig tlsConfig;
116 
117     private final AtomicBoolean closed;
118 
119     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
120         return RegistryBuilder.<ConnectionSocketFactory>create()
121                 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
122                 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
123                 .build();
124     }
125 
126     public BasicHttpClientConnectionManager(
127             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
128             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
129             final SchemePortResolver schemePortResolver,
130             final DnsResolver dnsResolver) {
131       this(new DefaultHttpClientConnectionOperator(
132               socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
133     }
134 
135     /**
136      * @since 4.4
137      */
138     public BasicHttpClientConnectionManager(
139             final HttpClientConnectionOperator httpClientConnectionOperator,
140             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
141         super();
142         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
143         this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
144         this.id = String.format("ep-%010d", COUNT.getAndIncrement());
145         this.expiry = Long.MAX_VALUE;
146         this.socketConfig = SocketConfig.DEFAULT;
147         this.connectionConfig = ConnectionConfig.DEFAULT;
148         this.tlsConfig = TlsConfig.DEFAULT;
149         this.closed = new AtomicBoolean(false);
150     }
151 
152     public BasicHttpClientConnectionManager(
153             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
154             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
155         this(socketFactoryRegistry, connFactory, null, null);
156     }
157 
158     public BasicHttpClientConnectionManager(
159             final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
160         this(socketFactoryRegistry, null, null, null);
161     }
162 
163     public BasicHttpClientConnectionManager() {
164         this(getDefaultRegistry(), null, null, null);
165     }
166 
167     @Override
168     public void close() {
169         close(CloseMode.GRACEFUL);
170     }
171 
172     @Override
173     public void close(final CloseMode closeMode) {
174         if (this.closed.compareAndSet(false, true)) {
175             closeConnection(closeMode);
176         }
177     }
178 
179     HttpRoute getRoute() {
180         return route;
181     }
182 
183     Object getState() {
184         return state;
185     }
186 
187     public synchronized SocketConfig getSocketConfig() {
188         return socketConfig;
189     }
190 
191     public synchronized void setSocketConfig(final SocketConfig socketConfig) {
192         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
193     }
194 
195     /**
196      * @since 5.2
197      */
198     public synchronized ConnectionConfig getConnectionConfig() {
199         return connectionConfig;
200     }
201 
202     /**
203      * @since 5.2
204      */
205     public synchronized void setConnectionConfig(final ConnectionConfig connectionConfig) {
206         this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
207     }
208 
209     /**
210      * @since 5.2
211      */
212     public synchronized TlsConfig getTlsConfig() {
213         return tlsConfig;
214     }
215 
216     /**
217      * @since 5.2
218      */
219     public synchronized void setTlsConfig(final TlsConfig tlsConfig) {
220         this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
221     }
222 
223     public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
224         return lease(id, route, Timeout.DISABLED, state);
225     }
226 
227     @Override
228     public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
229         return new LeaseRequest() {
230 
231             @Override
232             public ConnectionEndpoint get(
233                     final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
234                 try {
235                     return new InternalConnectionEndpoint(route, getConnection(route, state));
236                 } catch (final IOException ex) {
237                     throw new ExecutionException(ex.getMessage(), ex);
238                 }
239             }
240 
241             @Override
242             public boolean cancel() {
243                 return false;
244             }
245 
246         };
247     }
248 
249     private synchronized void closeConnection(final CloseMode closeMode) {
250         if (this.conn != null) {
251             if (LOG.isDebugEnabled()) {
252                 LOG.debug("{} Closing connection {}", id, closeMode);
253             }
254             this.conn.close(closeMode);
255             this.conn = null;
256         }
257     }
258 
259     private void checkExpiry() {
260         if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
261             if (LOG.isDebugEnabled()) {
262                 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
263             }
264             closeConnection(CloseMode.GRACEFUL);
265         }
266     }
267 
268     private void validate() {
269         if (this.conn != null) {
270             final TimeValue timeToLive = connectionConfig.getTimeToLive();
271             if (TimeValue.isNonNegative(timeToLive)) {
272                 final Deadline deadline = Deadline.calculate(created, timeToLive);
273                 if (deadline.isExpired()) {
274                     closeConnection(CloseMode.GRACEFUL);
275                 }
276             }
277         }
278         if (this.conn != null) {
279             final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
280                     connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
281             if (TimeValue.isNonNegative(timeValue)) {
282                 final Deadline deadline = Deadline.calculate(updated, timeValue);
283                 if (deadline.isExpired()) {
284                     boolean stale;
285                     try {
286                         stale = conn.isStale();
287                     } catch (final IOException ignore) {
288                         stale = true;
289                     }
290                     if (stale) {
291                         if (LOG.isDebugEnabled()) {
292                             LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
293                         }
294                         closeConnection(CloseMode.GRACEFUL);
295                     }
296                 }
297             }
298         }
299     }
300 
301     synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
302         Asserts.check(!isClosed(), "Connection manager has been shut down");
303         if (LOG.isDebugEnabled()) {
304             LOG.debug("{} Get connection for route {}", id, route);
305         }
306         Asserts.check(!this.leased, "Connection %s is still allocated", conn);
307         if (!LangUtils.equals(this.route, route) || !LangUtils.equals(this.state, state)) {
308             closeConnection(CloseMode.GRACEFUL);
309         }
310         this.route = route;
311         this.state = state;
312         checkExpiry();
313         validate();
314         if (this.conn == null) {
315             this.conn = this.connFactory.createConnection(null);
316             this.created = System.currentTimeMillis();
317         } else {
318             this.conn.activate();
319         }
320         this.leased = true;
321         if (LOG.isDebugEnabled()) {
322             LOG.debug("{} Using connection {}", id, conn);
323         }
324         return this.conn;
325     }
326 
327     private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
328         if (endpoint instanceof InternalConnectionEndpoint) {
329             return (InternalConnectionEndpoint) endpoint;
330         }
331         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
332     }
333 
334     @Override
335     public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
336         Args.notNull(endpoint, "Managed endpoint");
337         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
338         final ManagedHttpClientConnection conn = internalEndpoint.detach();
339         if (LOG.isDebugEnabled()) {
340             LOG.debug("{} Releasing connection {}", id, conn);
341         }
342         if (isClosed()) {
343             return;
344         }
345         try {
346             if (keepAlive == null) {
347                 this.conn.close(CloseMode.GRACEFUL);
348             }
349             this.updated = System.currentTimeMillis();
350             if (!this.conn.isOpen() && !this.conn.isConsistent()) {
351                 this.route = null;
352                 this.conn = null;
353                 this.expiry = Long.MAX_VALUE;
354                 if (LOG.isDebugEnabled()) {
355                     LOG.debug("{} Connection is not kept alive", id);
356                 }
357             } else {
358                 this.state = state;
359                 if (conn != null) {
360                     conn.passivate();
361                 }
362                 if (TimeValue.isPositive(keepAlive)) {
363                     if (LOG.isDebugEnabled()) {
364                         LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
365                     }
366                     this.expiry = this.updated + keepAlive.toMilliseconds();
367                 } else {
368                     if (LOG.isDebugEnabled()) {
369                         LOG.debug("{} Connection can be kept alive indefinitely", id);
370                     }
371                     this.expiry = Long.MAX_VALUE;
372                 }
373             }
374         } finally {
375             this.leased = false;
376         }
377     }
378 
379     @Override
380     public synchronized void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
381         Args.notNull(endpoint, "Endpoint");
382 
383         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
384         if (internalEndpoint.isConnected()) {
385             return;
386         }
387         final HttpRoute route = internalEndpoint.getRoute();
388         final HttpHost host;
389         if (route.getProxyHost() != null) {
390             host = route.getProxyHost();
391         } else {
392             host = route.getTargetHost();
393         }
394         final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
395         final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
396         if (LOG.isDebugEnabled()) {
397             LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
398         }
399         this.connectionOperator.connect(
400                 connection,
401                 host,
402                 route.getLocalSocketAddress(),
403                 connectTimeout,
404                 socketConfig,
405                 tlsConfig,
406                 context);
407         if (LOG.isDebugEnabled()) {
408             LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
409         }
410         final Timeout socketTimeout = connectionConfig.getSocketTimeout();
411         if (socketTimeout != null) {
412             connection.setSocketTimeout(socketTimeout);
413         }
414     }
415 
416     @Override
417     public synchronized void upgrade(
418             final ConnectionEndpoint endpoint,
419             final HttpContext context) throws IOException {
420         Args.notNull(endpoint, "Endpoint");
421         Args.notNull(route, "HTTP route");
422         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
423         this.connectionOperator.upgrade(
424                 internalEndpoint.getConnection(),
425                 internalEndpoint.getRoute().getTargetHost(),
426                 tlsConfig,
427                 context);
428     }
429 
430     public synchronized void closeExpired() {
431         if (isClosed()) {
432             return;
433         }
434         if (!this.leased) {
435             checkExpiry();
436         }
437     }
438 
439     public synchronized void closeIdle(final TimeValue idleTime) {
440         Args.notNull(idleTime, "Idle time");
441         if (isClosed()) {
442             return;
443         }
444         if (!this.leased) {
445             long time = idleTime.toMilliseconds();
446             if (time < 0) {
447                 time = 0;
448             }
449             final long deadline = System.currentTimeMillis() - time;
450             if (this.updated <= deadline) {
451                 closeConnection(CloseMode.GRACEFUL);
452             }
453         }
454     }
455 
456     /**
457      * @see #setValidateAfterInactivity(TimeValue)
458      *
459      * @since 5.1
460      *
461      * @deprecated Use {@link #getConnectionConfig()}
462      */
463     @Deprecated
464     public TimeValue getValidateAfterInactivity() {
465         return connectionConfig.getValidateAfterInactivity();
466     }
467 
468     /**
469      * Defines period of inactivity after which persistent connections must
470      * be re-validated prior to being {@link #lease(String, HttpRoute, Object)} leased} to the consumer.
471      * Negative values passed to this method disable connection validation. This check helps
472      * detect connections that have become stale (half-closed) while kept inactive in the pool.
473      *
474      * @since 5.1
475      *
476      * @deprecated Use {@link #setConnectionConfig(ConnectionConfig)}
477      */
478     @Deprecated
479     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
480         this.connectionConfig = ConnectionConfig.custom()
481                 .setValidateAfterInactivity(validateAfterInactivity)
482                 .build();
483     }
484 
485     class InternalConnectionEndpoint extends ConnectionEndpoint {
486 
487         private final HttpRoute route;
488         private final AtomicReference<ManagedHttpClientConnection> connRef;
489 
490         public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
491             this.route = route;
492             this.connRef = new AtomicReference<>(conn);
493         }
494 
495         HttpRoute getRoute() {
496             return route;
497         }
498 
499         ManagedHttpClientConnection getConnection() {
500             final ManagedHttpClientConnection conn = this.connRef.get();
501             if (conn == null) {
502                 throw new ConnectionShutdownException();
503             }
504             return conn;
505         }
506 
507         ManagedHttpClientConnection getValidatedConnection() {
508             final ManagedHttpClientConnection conn = getConnection();
509             Asserts.check(conn.isOpen(), "Endpoint is not connected");
510             return conn;
511         }
512 
513         ManagedHttpClientConnection detach() {
514             return this.connRef.getAndSet(null);
515         }
516 
517         @Override
518         public boolean isConnected() {
519             final ManagedHttpClientConnection conn = getConnection();
520             return conn != null && conn.isOpen();
521         }
522 
523         @Override
524         public void close(final CloseMode closeMode) {
525             final ManagedHttpClientConnection conn = detach();
526             if (conn != null) {
527                 conn.close(closeMode);
528             }
529         }
530 
531         @Override
532         public void close() throws IOException {
533             final ManagedHttpClientConnection conn = detach();
534             if (conn != null) {
535                 conn.close();
536             }
537         }
538 
539         @Override
540         public void setSocketTimeout(final Timeout timeout) {
541             getValidatedConnection().setSocketTimeout(timeout);
542         }
543 
544         @Override
545         public ClassicHttpResponse execute(
546                 final String exchangeId,
547                 final ClassicHttpRequest request,
548                 final HttpRequestExecutor requestExecutor,
549                 final HttpContext context) throws IOException, HttpException {
550             Args.notNull(request, "HTTP request");
551             Args.notNull(requestExecutor, "Request executor");
552             if (LOG.isDebugEnabled()) {
553                 LOG.debug("{} Executing exchange {}", id, exchangeId);
554             }
555             return requestExecutor.execute(request, getValidatedConnection(), context);
556         }
557 
558     }
559 
560     /**
561      * Method that can be called to determine whether the connection manager has been shut down and
562      * is closed or not.
563      *
564      * @return {@code true} if the connection manager has been shut down and is closed, otherwise
565      * return {@code false}.
566      */
567     boolean isClosed() {
568         return this.closed.get();
569     }
570 
571 }