View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.impl.ConnPoolSupport;
37  import org.apache.hc.client5.http.impl.Operations;
38  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
39  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
40  import org.apache.hc.client5.http.protocol.HttpClientContext;
41  import org.apache.hc.core5.concurrent.Cancellable;
42  import org.apache.hc.core5.concurrent.FutureCallback;
43  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
44  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45  import org.apache.hc.core5.http.nio.HandlerFactory;
46  import org.apache.hc.core5.http2.HttpVersionPolicy;
47  import org.apache.hc.core5.io.CloseMode;
48  import org.apache.hc.core5.reactor.ConnectionInitiator;
49  import org.apache.hc.core5.util.TimeValue;
50  import org.apache.hc.core5.util.Timeout;
51  import org.slf4j.Logger;
52  
53  class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
54  
55      private final Logger log;
56      private final AsyncClientConnectionManager manager;
57      private final ConnectionInitiator connectionInitiator;
58      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
59      private final HttpVersionPolicy versionPolicy;
60      private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
61      private volatile boolean reusable;
62      private volatile Object state;
63      private volatile TimeValue validDuration;
64  
65      InternalHttpAsyncExecRuntime(
66              final Logger log,
67              final AsyncClientConnectionManager manager,
68              final ConnectionInitiator connectionInitiator,
69              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
70              final HttpVersionPolicy versionPolicy) {
71          super();
72          this.log = log;
73          this.manager = manager;
74          this.connectionInitiator = connectionInitiator;
75          this.pushHandlerFactory = pushHandlerFactory;
76          this.versionPolicy = versionPolicy;
77          this.endpointRef = new AtomicReference<>();
78          this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
79      }
80  
81      @Override
82      public boolean isEndpointAcquired() {
83          return endpointRef.get() != null;
84      }
85  
86      @Override
87      public Cancellable acquireEndpoint(
88              final String id,
89              final HttpRoute route,
90              final Object object,
91              final HttpClientContext context,
92              final FutureCallback<AsyncExecRuntime> callback) {
93          if (endpointRef.get() == null) {
94              state = object;
95              final RequestConfig requestConfig = context.getRequestConfig();
96              final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
97              if (log.isDebugEnabled()) {
98                  log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
99              }
100             return Operations.cancellable(manager.lease(
101                     id,
102                     route,
103                     object,
104                     connectionRequestTimeout,
105                     new FutureCallback<AsyncConnectionEndpoint>() {
106 
107                         @Override
108                         public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
109                             endpointRef.set(connectionEndpoint);
110                             reusable = connectionEndpoint.isConnected();
111                             if (log.isDebugEnabled()) {
112                                 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
113                             }
114                             callback.completed(InternalHttpAsyncExecRuntime.this);
115                         }
116 
117                         @Override
118                         public void failed(final Exception ex) {
119                             callback.failed(ex);
120                         }
121 
122                         @Override
123                         public void cancelled() {
124                             callback.cancelled();
125                         }
126                     }));
127         }
128         callback.completed(this);
129         return Operations.nonCancellable();
130     }
131 
132     private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
133         try {
134             endpoint.close(CloseMode.IMMEDIATE);
135             if (log.isDebugEnabled()) {
136                 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
137             }
138         } finally {
139             if (log.isDebugEnabled()) {
140                 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
141             }
142             manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
143         }
144     }
145 
146     @Override
147     public void releaseEndpoint() {
148         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
149         if (endpoint != null) {
150             if (reusable) {
151                 if (log.isDebugEnabled()) {
152                     log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
153                 }
154                 manager.release(endpoint, state, validDuration);
155             } else {
156                 discardEndpoint(endpoint);
157             }
158         }
159     }
160 
161     @Override
162     public void discardEndpoint() {
163         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
164         if (endpoint != null) {
165             discardEndpoint(endpoint);
166         }
167     }
168 
169     @Override
170     public boolean validateConnection() {
171         if (reusable) {
172             final AsyncConnectionEndpoint endpoint = endpointRef.get();
173             return endpoint != null && endpoint.isConnected();
174         }
175         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
176         if (endpoint != null) {
177             discardEndpoint(endpoint);
178         }
179         return false;
180     }
181 
182     AsyncConnectionEndpoint ensureValid() {
183         final AsyncConnectionEndpoint endpoint = endpointRef.get();
184         if (endpoint == null) {
185             throw new IllegalStateException("Endpoint not acquired / already released");
186         }
187         return endpoint;
188     }
189 
190     @Override
191     public boolean isEndpointConnected() {
192         final AsyncConnectionEndpoint endpoint = endpointRef.get();
193         return endpoint != null && endpoint.isConnected();
194     }
195 
196     @Override
197     public Cancellable connectEndpoint(
198             final HttpClientContext context,
199             final FutureCallback<AsyncExecRuntime> callback) {
200         final AsyncConnectionEndpoint endpoint = ensureValid();
201         if (endpoint.isConnected()) {
202             callback.completed(this);
203             return Operations.nonCancellable();
204         }
205         final RequestConfig requestConfig = context.getRequestConfig();
206         final Timeout connectTimeout = requestConfig.getConnectTimeout();
207         if (log.isDebugEnabled()) {
208             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
209         }
210         return Operations.cancellable(manager.connect(
211                 endpoint,
212                 connectionInitiator,
213                 connectTimeout,
214                 versionPolicy,
215                 context,
216                 new FutureCallback<AsyncConnectionEndpoint>() {
217 
218                     @Override
219                     public void completed(final AsyncConnectionEndpoint endpoint) {
220                         if (log.isDebugEnabled()) {
221                             log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
222                         }
223                         callback.completed(InternalHttpAsyncExecRuntime.this);
224                     }
225 
226                     @Override
227                     public void failed(final Exception ex) {
228                         callback.failed(ex);
229                     }
230 
231                     @Override
232                     public void cancelled() {
233                         callback.cancelled();
234                     }
235 
236         }));
237 
238     }
239 
240     @Override
241     public void upgradeTls(final HttpClientContext context) {
242         final AsyncConnectionEndpoint endpoint = ensureValid();
243         if (log.isDebugEnabled()) {
244             log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
245         }
246         manager.upgrade(endpoint, versionPolicy, context);
247     }
248 
249     @Override
250     public Cancellable execute(
251             final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
252         final AsyncConnectionEndpoint endpoint = ensureValid();
253         if (endpoint.isConnected()) {
254             if (log.isDebugEnabled()) {
255                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
256             }
257             final RequestConfig requestConfig = context.getRequestConfig();
258             final Timeout responseTimeout = requestConfig.getResponseTimeout();
259             if (responseTimeout != null) {
260                 endpoint.setSocketTimeout(responseTimeout);
261             }
262             endpoint.execute(id, exchangeHandler, context);
263             if (context.getRequestConfig().isHardCancellationEnabled()) {
264                 return new Cancellable() {
265                     @Override
266                     public boolean cancel() {
267                         exchangeHandler.cancel();
268                         return true;
269                     }
270                 };
271             }
272         } else {
273             connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
274 
275                 @Override
276                 public void completed(final AsyncExecRuntime runtime) {
277                     if (log.isDebugEnabled()) {
278                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
279                     }
280                     try {
281                         endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
282                     } catch (final RuntimeException ex) {
283                         failed(ex);
284                     }
285                 }
286 
287                 @Override
288                 public void failed(final Exception ex) {
289                     exchangeHandler.failed(ex);
290                 }
291 
292                 @Override
293                 public void cancelled() {
294                     exchangeHandler.failed(new InterruptedIOException());
295                 }
296 
297             });
298         }
299         return Operations.nonCancellable();
300     }
301 
302     @Override
303     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
304         reusable = true;
305         state = newState;
306         validDuration = newValidDuration;
307     }
308 
309     @Override
310     public void markConnectionNonReusable() {
311         reusable = false;
312         state = null;
313         validDuration = null;
314     }
315 
316     @Override
317     public AsyncExecRuntime fork() {
318         return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy);
319     }
320 
321 }