View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.util.Date;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.client5.http.DnsResolver;
39  import org.apache.hc.client5.http.HttpRoute;
40  import org.apache.hc.client5.http.SchemePortResolver;
41  import org.apache.hc.client5.http.impl.ConnPoolSupport;
42  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
43  import org.apache.hc.client5.http.io.ConnectionEndpoint;
44  import org.apache.hc.client5.http.io.HttpClientConnectionManager;
45  import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
46  import org.apache.hc.client5.http.io.LeaseRequest;
47  import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
48  import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
49  import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
50  import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
51  import org.apache.hc.core5.annotation.Contract;
52  import org.apache.hc.core5.annotation.ThreadingBehavior;
53  import org.apache.hc.core5.http.ClassicHttpRequest;
54  import org.apache.hc.core5.http.ClassicHttpResponse;
55  import org.apache.hc.core5.http.HttpException;
56  import org.apache.hc.core5.http.HttpHost;
57  import org.apache.hc.core5.http.URIScheme;
58  import org.apache.hc.core5.http.config.Lookup;
59  import org.apache.hc.core5.http.config.Registry;
60  import org.apache.hc.core5.http.config.RegistryBuilder;
61  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
62  import org.apache.hc.core5.http.io.HttpConnectionFactory;
63  import org.apache.hc.core5.http.io.SocketConfig;
64  import org.apache.hc.core5.http.protocol.HttpContext;
65  import org.apache.hc.core5.io.CloseMode;
66  import org.apache.hc.core5.util.Args;
67  import org.apache.hc.core5.util.Asserts;
68  import org.apache.hc.core5.util.LangUtils;
69  import org.apache.hc.core5.util.TimeValue;
70  import org.apache.hc.core5.util.Timeout;
71  import org.slf4j.Logger;
72  import org.slf4j.LoggerFactory;
73  
74  /**
75   * A connection manager for a single connection. This connection manager maintains only one active
76   * connection. Even though this class is fully thread-safe it ought to be used by one execution
77   * thread only, as only one thread can lease the connection at a time.
78   * <p>
79   * This connection manager will make an effort to reuse the connection for subsequent requests
80   * with the same {@link HttpRoute route}. It will, however, close the existing connection and
81   * open it for the given route, if the route of the persistent connection does not match that
82   * of the connection request. If the connection has already been allocated,
83   * an {@link IllegalStateException} is thrown.
84   * </p>
85   * <p>
86   * This connection manager implementation should be used inside an EJB container instead of
87   * {@link PoolingHttpClientConnectionManager}.
88   * </p>
89   *
90   * @since 4.3
91   */
92  @Contract(threading = ThreadingBehavior.SAFE)
93  public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
94  
    private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);

    // Source of the sequence number embedded in each manager's id.
    private static final AtomicLong COUNT = new AtomicLong(0);

    // Performs the actual connect / upgrade of the managed connection.
    private final HttpClientConnectionOperator connectionOperator;
    // Creates the managed connection when none is currently held.
    private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
    // Identifier of this manager instance; used as a prefix in log messages.
    private final String id;

    // The single connection managed by this instance; null when no connection is held.
    private ManagedHttpClientConnection conn;
    // Route and state the connection was most recently requested for.
    private HttpRoute route;
    private Object state;
    // Value of System.currentTimeMillis() taken at the most recent release.
    private long updated;
    // Point in time (milliseconds, same clock as 'updated') at which the connection
    // is considered expired; Long.MAX_VALUE means it never expires.
    private long expiry;
    // True from the moment a connection is handed out until it is released.
    private boolean leased;
    // Socket configuration passed to the connection operator on connect; never null.
    private SocketConfig socketConfig;

    // Set exactly once, by close(CloseMode); lease and release consult it afterwards.
    private final AtomicBoolean closed;

    // Inactivity period after which the connection is checked for staleness on lease.
    private volatile TimeValue validateAfterInactivity;
114 
115     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
116         return RegistryBuilder.<ConnectionSocketFactory>create()
117                 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
118                 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
119                 .build();
120     }
121 
122     public BasicHttpClientConnectionManager(
123             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
124             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
125             final SchemePortResolver schemePortResolver,
126             final DnsResolver dnsResolver) {
127       this(new DefaultHttpClientConnectionOperator(
128               socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
129     }
130 
131     /**
132      * @since 4.4
133      */
134     public BasicHttpClientConnectionManager(
135             final HttpClientConnectionOperator httpClientConnectionOperator,
136             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
137         super();
138         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
139         this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
140         this.id = String.format("ep-%010d", COUNT.getAndIncrement());
141         this.expiry = Long.MAX_VALUE;
142         this.socketConfig = SocketConfig.DEFAULT;
143         this.closed = new AtomicBoolean(false);
144         this.validateAfterInactivity = TimeValue.ofSeconds(2L);
145     }
146 
147     public BasicHttpClientConnectionManager(
148             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
149             final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
150         this(socketFactoryRegistry, connFactory, null, null);
151     }
152 
153     public BasicHttpClientConnectionManager(
154             final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
155         this(socketFactoryRegistry, null, null, null);
156     }
157 
158     public BasicHttpClientConnectionManager() {
159         this(getDefaultRegistry(), null, null, null);
160     }
161 
162     @Override
163     public void close() {
164         close(CloseMode.GRACEFUL);
165     }
166 
167     @Override
168     public void close(final CloseMode closeMode) {
169         if (this.closed.compareAndSet(false, true)) {
170             closeConnection(closeMode);
171         }
172     }
173 
174     HttpRoute getRoute() {
175         return route;
176     }
177 
178     Object getState() {
179         return state;
180     }
181 
182     public synchronized SocketConfig getSocketConfig() {
183         return socketConfig;
184     }
185 
186     public synchronized void setSocketConfig(final SocketConfig socketConfig) {
187         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
188     }
189 
190     public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
191         return lease(id, route, Timeout.DISABLED, state);
192     }
193 
194     @Override
195     public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
196         return new LeaseRequest() {
197 
198             @Override
199             public ConnectionEndpoint get(
200                     final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
201                 try {
202                     return new InternalConnectionEndpoint(route, getConnection(route, state));
203                 } catch (final IOException ex) {
204                     throw new ExecutionException(ex.getMessage(), ex);
205                 }
206             }
207 
208             @Override
209             public boolean cancel() {
210                 return false;
211             }
212 
213         };
214     }
215 
216     private synchronized void closeConnection(final CloseMode closeMode) {
217         if (this.conn != null) {
218             if (LOG.isDebugEnabled()) {
219                 LOG.debug("{} Closing connection {}", id, closeMode);
220             }
221             this.conn.close(closeMode);
222             this.conn = null;
223         }
224     }
225 
226     private void checkExpiry() {
227         if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
228             if (LOG.isDebugEnabled()) {
229                 LOG.debug("{} Connection expired @ {}", id, new Date(this.expiry));
230             }
231             closeConnection(CloseMode.GRACEFUL);
232         }
233     }
234 
235     private void validate() {
236         final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
237         if (this.conn != null
238                 && TimeValue.isNonNegative(validateAfterInactivitySnapshot)
239                 && updated + validateAfterInactivitySnapshot.toMilliseconds() <= System.currentTimeMillis()) {
240             boolean stale;
241             try {
242                 stale = conn.isStale();
243             } catch (final IOException ignore) {
244                 stale = true;
245             }
246             if (stale) {
247                 if (LOG.isDebugEnabled()) {
248                     LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
249                 }
250                 closeConnection(CloseMode.GRACEFUL);
251             }
252         }
253     }
254 
255     synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
256         Asserts.check(!this.closed.get(), "Connection manager has been shut down");
257         if (LOG.isDebugEnabled()) {
258             LOG.debug("{} Get connection for route {}", id, route);
259         }
260         Asserts.check(!this.leased, "Connection is still allocated");
261         if (!LangUtils.equals(this.route, route) || !LangUtils.equals(this.state, state)) {
262             closeConnection(CloseMode.GRACEFUL);
263         }
264         this.route = route;
265         this.state = state;
266         checkExpiry();
267         validate();
268         if (this.conn == null) {
269             this.conn = this.connFactory.createConnection(null);
270         } else {
271             this.conn.activate();
272         }
273         this.leased = true;
274         return this.conn;
275     }
276 
277     private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
278         if (endpoint instanceof InternalConnectionEndpoint) {
279             return (InternalConnectionEndpoint) endpoint;
280         }
281         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
282     }
283 
284     @Override
285     public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
286         Args.notNull(endpoint, "Managed endpoint");
287         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
288         final ManagedHttpClientConnection conn = internalEndpoint.detach();
289         if (LOG.isDebugEnabled()) {
290             LOG.debug("{} Releasing connection {}", id, conn);
291         }
292         if (this.closed.get()) {
293             return;
294         }
295         try {
296             if (keepAlive == null) {
297                 this.conn.close(CloseMode.GRACEFUL);
298             }
299             this.updated = System.currentTimeMillis();
300             if (!this.conn.isOpen() && !this.conn.isConsistent()) {
301                 this.conn = null;
302                 this.route = null;
303                 this.conn = null;
304                 this.expiry = Long.MAX_VALUE;
305                 if (LOG.isDebugEnabled()) {
306                     LOG.debug("{} Connection is not kept alive", id);
307                 }
308             } else {
309                 this.state = state;
310                 if (conn != null) {
311                     conn.passivate();
312                 }
313                 if (TimeValue.isPositive(keepAlive)) {
314                     if (LOG.isDebugEnabled()) {
315                         LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
316                     }
317                     this.expiry = this.updated + keepAlive.toMilliseconds();
318                 } else {
319                     if (LOG.isDebugEnabled()) {
320                         LOG.debug("{} Connection can be kept alive indefinitely", id);
321                     }
322                     this.expiry = Long.MAX_VALUE;
323                 }
324             }
325         } finally {
326             this.leased = false;
327         }
328     }
329 
330     @Override
331     public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
332         Args.notNull(endpoint, "Endpoint");
333 
334         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
335         if (internalEndpoint.isConnected()) {
336             return;
337         }
338         final HttpRoute route = internalEndpoint.getRoute();
339         final HttpHost host;
340         if (route.getProxyHost() != null) {
341             host = route.getProxyHost();
342         } else {
343             host = route.getTargetHost();
344         }
345         this.connectionOperator.connect(
346                 internalEndpoint.getConnection(),
347                 host,
348                 route.getLocalSocketAddress(),
349                 connectTimeout,
350                 this.socketConfig,
351                 context);
352     }
353 
354     @Override
355     public void upgrade(
356             final ConnectionEndpoint endpoint,
357             final HttpContext context) throws IOException {
358         Args.notNull(endpoint, "Endpoint");
359         Args.notNull(route, "HTTP route");
360         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
361         this.connectionOperator.upgrade(
362                 internalEndpoint.getConnection(),
363                 internalEndpoint.getRoute().getTargetHost(),
364                 context);
365     }
366 
367     public synchronized void closeExpired() {
368         if (this.closed.get()) {
369             return;
370         }
371         if (!this.leased) {
372             checkExpiry();
373         }
374     }
375 
376     public synchronized void closeIdle(final TimeValue idleTime) {
377         Args.notNull(idleTime, "Idle time");
378         if (this.closed.get()) {
379             return;
380         }
381         if (!this.leased) {
382             long time = idleTime.toMilliseconds();
383             if (time < 0) {
384                 time = 0;
385             }
386             final long deadline = System.currentTimeMillis() - time;
387             if (this.updated <= deadline) {
388                 closeConnection(CloseMode.GRACEFUL);
389             }
390         }
391     }
392 
393     /**
394      * @see #setValidateAfterInactivity(TimeValue)
395      *
396      * @since 5.1
397      */
398     public TimeValue getValidateAfterInactivity() {
399         return validateAfterInactivity;
400     }
401 
402     /**
403      * Defines period of inactivity after which persistent connections must
404      * be re-validated prior to being {@link #lease(String, HttpRoute, Object)} leased} to the consumer.
405      * Negative values passed to this method disable connection validation. This check helps
406      * detect connections that have become stale (half-closed) while kept inactive in the pool.
407      *
408      * @since 5.1
409      */
410     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
411         this.validateAfterInactivity = validateAfterInactivity;
412     }
413 
414     class InternalConnectionEndpoint extends ConnectionEndpoint {
415 
416         private final HttpRoute route;
417         private final AtomicReference<ManagedHttpClientConnection> connRef;
418 
419         public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
420             this.route = route;
421             this.connRef = new AtomicReference<>(conn);
422         }
423 
424         HttpRoute getRoute() {
425             return route;
426         }
427 
428         ManagedHttpClientConnection getConnection() {
429             final ManagedHttpClientConnection conn = this.connRef.get();
430             if (conn == null) {
431                 throw new ConnectionShutdownException();
432             }
433             return conn;
434         }
435 
436         ManagedHttpClientConnection getValidatedConnection() {
437             final ManagedHttpClientConnection conn = getConnection();
438             Asserts.check(conn.isOpen(), "Endpoint is not connected");
439             return conn;
440         }
441 
442         ManagedHttpClientConnection detach() {
443             return this.connRef.getAndSet(null);
444         }
445 
446         @Override
447         public boolean isConnected() {
448             final ManagedHttpClientConnection conn = getConnection();
449             return conn != null && conn.isOpen();
450         }
451 
452         @Override
453         public void close(final CloseMode closeMode) {
454             final ManagedHttpClientConnection conn = detach();
455             if (conn != null) {
456                 conn.close(closeMode);
457             }
458         }
459 
460         @Override
461         public void close() throws IOException {
462             final ManagedHttpClientConnection conn = detach();
463             if (conn != null) {
464                 conn.close();
465             }
466         }
467 
468         @Override
469         public void setSocketTimeout(final Timeout timeout) {
470             getValidatedConnection().setSocketTimeout(timeout);
471         }
472 
473         @Override
474         public ClassicHttpResponse execute(
475                 final String exchangeId,
476                 final ClassicHttpRequest request,
477                 final HttpRequestExecutor requestExecutor,
478                 final HttpContext context) throws IOException, HttpException {
479             Args.notNull(request, "HTTP request");
480             Args.notNull(requestExecutor, "Request executor");
481             if (LOG.isDebugEnabled()) {
482                 LOG.debug("{} Executing exchange {}", id, exchangeId);
483             }
484             return requestExecutor.execute(request, getValidatedConnection(), context);
485         }
486 
487     }
488 
489 }