View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.EndpointInfo;
34  import org.apache.hc.client5.http.HttpRoute;
35  import org.apache.hc.client5.http.async.AsyncExecRuntime;
36  import org.apache.hc.client5.http.config.RequestConfig;
37  import org.apache.hc.client5.http.config.TlsConfig;
38  import org.apache.hc.client5.http.impl.ConnPoolSupport;
39  import org.apache.hc.client5.http.impl.Operations;
40  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
41  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
42  import org.apache.hc.client5.http.protocol.HttpClientContext;
43  import org.apache.hc.core5.concurrent.CallbackContribution;
44  import org.apache.hc.core5.concurrent.Cancellable;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
47  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48  import org.apache.hc.core5.http.nio.HandlerFactory;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.reactor.ConnectionInitiator;
51  import org.apache.hc.core5.util.TimeValue;
52  import org.apache.hc.core5.util.Timeout;
53  import org.slf4j.Logger;
54  
55  class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
56  
57      private final Logger log;
58      private final AsyncClientConnectionManager manager;
59      private final ConnectionInitiator connectionInitiator;
60      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61      /**
62       * @deprecated TLS should be configured by the connection manager
63       */
64      @Deprecated
65      private final TlsConfig tlsConfig;
66      private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
67      private volatile boolean reusable;
68      private volatile Object state;
69      private volatile TimeValue validDuration;
70  
71      InternalHttpAsyncExecRuntime(
72              final Logger log,
73              final AsyncClientConnectionManager manager,
74              final ConnectionInitiator connectionInitiator,
75              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
76              final TlsConfig tlsConfig) {
77          super();
78          this.log = log;
79          this.manager = manager;
80          this.connectionInitiator = connectionInitiator;
81          this.pushHandlerFactory = pushHandlerFactory;
82          this.tlsConfig = tlsConfig;
83          this.endpointRef = new AtomicReference<>();
84          this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
85      }
86  
87      @Override
88      public boolean isEndpointAcquired() {
89          return endpointRef.get() != null;
90      }
91  
92      @Override
93      public Cancellable acquireEndpoint(
94              final String id,
95              final HttpRoute route,
96              final Object object,
97              final HttpClientContext context,
98              final FutureCallback<AsyncExecRuntime> callback) {
99          if (endpointRef.get() == null) {
100             state = object;
101             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
102             final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
103             if (log.isDebugEnabled()) {
104                 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
105             }
106             return Operations.cancellable(manager.lease(
107                     id,
108                     route,
109                     object,
110                     connectionRequestTimeout,
111                     new FutureCallback<AsyncConnectionEndpoint>() {
112 
113                         @Override
114                         public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
115                             endpointRef.set(connectionEndpoint);
116                             reusable = connectionEndpoint.isConnected();
117                             if (log.isDebugEnabled()) {
118                                 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
119                             }
120                             callback.completed(InternalHttpAsyncExecRuntime.this);
121                         }
122 
123                         @Override
124                         public void failed(final Exception ex) {
125                             callback.failed(ex);
126                         }
127 
128                         @Override
129                         public void cancelled() {
130                             callback.cancelled();
131                         }
132                     }));
133         }
134         callback.completed(this);
135         return Operations.nonCancellable();
136     }
137 
138     private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
139         try {
140             endpoint.close(CloseMode.IMMEDIATE);
141             if (log.isDebugEnabled()) {
142                 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
143             }
144         } finally {
145             if (log.isDebugEnabled()) {
146                 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
147             }
148             manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
149         }
150     }
151 
152     @Override
153     public void releaseEndpoint() {
154         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
155         if (endpoint != null) {
156             if (reusable) {
157                 if (log.isDebugEnabled()) {
158                     log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
159                 }
160                 manager.release(endpoint, state, validDuration);
161             } else {
162                 discardEndpoint(endpoint);
163             }
164         }
165     }
166 
167     @Override
168     public void discardEndpoint() {
169         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
170         if (endpoint != null) {
171             discardEndpoint(endpoint);
172         }
173     }
174 
175     @Override
176     public boolean validateConnection() {
177         if (reusable) {
178             final AsyncConnectionEndpoint endpoint = endpointRef.get();
179             return endpoint != null && endpoint.isConnected();
180         }
181         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
182         if (endpoint != null) {
183             discardEndpoint(endpoint);
184         }
185         return false;
186     }
187 
188     AsyncConnectionEndpoint ensureValid() {
189         final AsyncConnectionEndpoint endpoint = endpointRef.get();
190         if (endpoint == null) {
191             throw new IllegalStateException("Endpoint not acquired / already released");
192         }
193         return endpoint;
194     }
195 
196     @Override
197     public boolean isEndpointConnected() {
198         final AsyncConnectionEndpoint endpoint = endpointRef.get();
199         return endpoint != null && endpoint.isConnected();
200     }
201 
202     @Override
203     public Cancellable connectEndpoint(
204             final HttpClientContext context,
205             final FutureCallback<AsyncExecRuntime> callback) {
206         final AsyncConnectionEndpoint endpoint = ensureValid();
207         if (endpoint.isConnected()) {
208             callback.completed(this);
209             return Operations.nonCancellable();
210         }
211         final RequestConfig requestConfig = context.getRequestConfigOrDefault();
212         @SuppressWarnings("deprecation")
213         final Timeout connectTimeout = requestConfig.getConnectTimeout();
214         if (log.isDebugEnabled()) {
215             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
216         }
217         return Operations.cancellable(manager.connect(
218                 endpoint,
219                 connectionInitiator,
220                 connectTimeout,
221                 tlsConfig,
222                 context,
223                 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
224 
225                     @Override
226                     public void completed(final AsyncConnectionEndpoint endpoint) {
227                         if (log.isDebugEnabled()) {
228                             log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
229                         }
230                         if (callback != null) {
231                             callback.completed(InternalHttpAsyncExecRuntime.this);
232                         }
233                     }
234 
235         }));
236 
237     }
238 
239     @Override
240     public void disconnectEndpoint() {
241         final AsyncConnectionEndpoint endpoint = endpointRef.get();
242         if (endpoint != null) {
243             endpoint.close(CloseMode.GRACEFUL);
244         }
245     }
246 
247     @Override
248     public void upgradeTls(final HttpClientContext context) {
249         upgradeTls(context, null);
250     }
251 
252     @Override
253     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
254         final AsyncConnectionEndpoint endpoint = ensureValid();
255         if (log.isDebugEnabled()) {
256             log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
257         }
258         manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
259 
260             @Override
261             public void completed(final AsyncConnectionEndpoint endpoint) {
262                 if (callback != null) {
263                     callback.completed(InternalHttpAsyncExecRuntime.this);
264                 }
265             }
266 
267         });
268     }
269 
270     public EndpointInfo getEndpointInfo() {
271         final AsyncConnectionEndpoint endpoint = endpointRef.get();
272         return endpoint != null ? endpoint.getInfo() : null;
273     }
274 
275     @Override
276     public Cancellable execute(
277             final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
278         final AsyncConnectionEndpoint endpoint = ensureValid();
279         if (endpoint.isConnected()) {
280             if (log.isDebugEnabled()) {
281                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
282             }
283             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
284             final Timeout responseTimeout = requestConfig.getResponseTimeout();
285             if (responseTimeout != null) {
286                 endpoint.setSocketTimeout(responseTimeout);
287             }
288             endpoint.execute(id, exchangeHandler, context);
289             if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
290                 return () -> {
291                     exchangeHandler.cancel();
292                     return true;
293                 };
294             }
295         } else {
296             connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
297 
298                 @Override
299                 public void completed(final AsyncExecRuntime runtime) {
300                     if (log.isDebugEnabled()) {
301                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
302                     }
303                     try {
304                         endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
305                     } catch (final RuntimeException ex) {
306                         failed(ex);
307                     }
308                 }
309 
310                 @Override
311                 public void failed(final Exception ex) {
312                     exchangeHandler.failed(ex);
313                 }
314 
315                 @Override
316                 public void cancelled() {
317                     exchangeHandler.failed(new InterruptedIOException());
318                 }
319 
320             });
321         }
322         return Operations.nonCancellable();
323     }
324 
325     @Override
326     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
327         reusable = true;
328         state = newState;
329         validDuration = newValidDuration;
330     }
331 
332     @Override
333     public void markConnectionNonReusable() {
334         reusable = false;
335         state = null;
336         validDuration = null;
337     }
338 
339     @Override
340     public AsyncExecRuntime fork() {
341         return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
342     }
343 
344 }