View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.EndpointInfo;
34  import org.apache.hc.client5.http.HttpRoute;
35  import org.apache.hc.client5.http.async.AsyncExecRuntime;
36  import org.apache.hc.client5.http.config.RequestConfig;
37  import org.apache.hc.client5.http.impl.ConnPoolSupport;
38  import org.apache.hc.client5.http.impl.Operations;
39  import org.apache.hc.client5.http.protocol.HttpClientContext;
40  import org.apache.hc.core5.concurrent.Cancellable;
41  import org.apache.hc.core5.concurrent.ComplexCancellable;
42  import org.apache.hc.core5.concurrent.FutureCallback;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46  import org.apache.hc.core5.http.nio.HandlerFactory;
47  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.reactor.ssl.TlsDetails;
52  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
53  import org.apache.hc.core5.util.Identifiable;
54  import org.apache.hc.core5.util.TimeValue;
55  import org.apache.hc.core5.util.Timeout;
56  import org.slf4j.Logger;
57  
58  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
59  
60      private final Logger log;
61      private final InternalH2ConnPool connPool;
62      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
63      private final AtomicReference<Endpoint> sessionRef;
64      private volatile boolean reusable;
65  
66      InternalH2AsyncExecRuntime(
67              final Logger log,
68              final InternalH2ConnPool connPool,
69              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
70          super();
71          this.log = log;
72          this.connPool = connPool;
73          this.pushHandlerFactory = pushHandlerFactory;
74          this.sessionRef = new AtomicReference<>();
75      }
76  
77      @Override
78      public boolean isEndpointAcquired() {
79          return sessionRef.get() != null;
80      }
81  
82      @Override
83      public Cancellable acquireEndpoint(
84              final String id,
85              final HttpRoute route,
86              final Object object,
87              final HttpClientContext context,
88              final FutureCallback<AsyncExecRuntime> callback) {
89          if (sessionRef.get() == null) {
90              final RequestConfig requestConfig = context.getRequestConfigOrDefault();
91              @SuppressWarnings("deprecation")
92              final Timeout connectTimeout = requestConfig.getConnectTimeout();
93              if (log.isDebugEnabled()) {
94                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
95              }
96              return Operations.cancellable(connPool.getSession(route, connectTimeout,
97                      new FutureCallback<IOSession>() {
98  
99                          @Override
100                         public void completed(final IOSession ioSession) {
101                             sessionRef.set(new Endpoint(route, ioSession));
102                             reusable = true;
103                             if (log.isDebugEnabled()) {
104                                 log.debug("{} acquired endpoint", id);
105                             }
106                             callback.completed(InternalH2AsyncExecRuntime.this);
107                         }
108 
109                         @Override
110                         public void failed(final Exception ex) {
111                             callback.failed(ex);
112                         }
113 
114                         @Override
115                         public void cancelled() {
116                             callback.cancelled();
117                         }
118 
119                     }));
120         }
121         callback.completed(this);
122         return Operations.nonCancellable();
123     }
124 
125     private void closeEndpoint(final Endpoint endpoint) {
126         endpoint.session.close(CloseMode.GRACEFUL);
127         if (log.isDebugEnabled()) {
128             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129         }
130     }
131 
132     @Override
133     public void releaseEndpoint() {
134         final Endpoint endpoint = sessionRef.getAndSet(null);
135         if (endpoint != null && !reusable) {
136             closeEndpoint(endpoint);
137         }
138     }
139 
140     @Override
141     public void discardEndpoint() {
142         final Endpoint endpoint = sessionRef.getAndSet(null);
143         if (endpoint != null) {
144             closeEndpoint(endpoint);
145         }
146     }
147 
148     @Override
149     public boolean validateConnection() {
150         if (reusable) {
151             final Endpoint endpoint = sessionRef.get();
152             return endpoint != null && endpoint.session.isOpen();
153         }
154         final Endpoint endpoint = sessionRef.getAndSet(null);
155         if (endpoint != null) {
156             closeEndpoint(endpoint);
157         }
158         return false;
159     }
160 
161     @Override
162     public boolean isEndpointConnected() {
163         final Endpoint endpoint = sessionRef.get();
164         return endpoint != null && endpoint.session.isOpen();
165     }
166 
167 
168     Endpoint ensureValid() {
169         final Endpoint endpoint = sessionRef.get();
170         if (endpoint == null) {
171             throw new IllegalStateException("I/O session not acquired / already released");
172         }
173         return endpoint;
174     }
175 
176     @Override
177     public Cancellable connectEndpoint(
178             final HttpClientContext context,
179             final FutureCallback<AsyncExecRuntime> callback) {
180         final Endpoint endpoint = ensureValid();
181         if (endpoint.session.isOpen()) {
182             callback.completed(this);
183             return Operations.nonCancellable();
184         }
185         final HttpRoute route = endpoint.route;
186         final RequestConfig requestConfig = context.getRequestConfigOrDefault();
187         @SuppressWarnings("deprecation")
188         final Timeout connectTimeout = requestConfig.getConnectTimeout();
189         if (log.isDebugEnabled()) {
190             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
191         }
192         return Operations.cancellable(connPool.getSession(route, connectTimeout,
193             new FutureCallback<IOSession>() {
194 
195             @Override
196             public void completed(final IOSession ioSession) {
197                 sessionRef.set(new Endpoint(route, ioSession));
198                 reusable = true;
199                 if (log.isDebugEnabled()) {
200                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
201                 }
202                 callback.completed(InternalH2AsyncExecRuntime.this);
203             }
204 
205             @Override
206             public void failed(final Exception ex) {
207                 callback.failed(ex);
208             }
209 
210             @Override
211             public void cancelled() {
212                 callback.cancelled();
213             }
214 
215         }));
216 
217     }
218 
219     @Override
220     public void disconnectEndpoint() {
221         final Endpoint endpoint = sessionRef.get();
222         if (endpoint != null) {
223             endpoint.session.close(CloseMode.GRACEFUL);
224         }
225     }
226 
227     @Override
228     public void upgradeTls(final HttpClientContext context) {
229         throw new UnsupportedOperationException();
230     }
231 
232     @Override
233     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
234         throw new UnsupportedOperationException();
235     }
236 
237     public EndpointInfo getEndpointInfo() {
238         final Endpoint endpoint = sessionRef.get();
239         if (endpoint != null && endpoint.session.isOpen()) {
240             if (endpoint.session instanceof TransportSecurityLayer) {
241                 final TlsDetails tlsDetails = ((TransportSecurityLayer) endpoint.session).getTlsDetails();
242                 return new EndpointInfo(HttpVersion.HTTP_2, tlsDetails != null ? tlsDetails.getSSLSession() : null);
243             }
244         }
245         return null;
246     }
247 
248     @Override
249     public Cancellable execute(
250             final String id,
251             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
252         final ComplexCancellable complexCancellable = new ComplexCancellable();
253         final Endpoint endpoint = ensureValid();
254         final IOSession session = endpoint.session;
255         if (session.isOpen()) {
256             if (log.isDebugEnabled()) {
257                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
258             }
259             context.setProtocolVersion(HttpVersion.HTTP_2);
260             session.enqueue(
261                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
262                     Command.Priority.NORMAL);
263         } else {
264             final HttpRoute route = endpoint.route;
265             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
266             @SuppressWarnings("deprecation")
267             final Timeout connectTimeout = requestConfig.getConnectTimeout();
268             connPool.getSession(route, connectTimeout, new FutureCallback<IOSession>() {
269 
270                 @Override
271                 public void completed(final IOSession ioSession) {
272                     sessionRef.set(new Endpoint(route, ioSession));
273                     reusable = true;
274                     if (log.isDebugEnabled()) {
275                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
276                     }
277                     context.setProtocolVersion(HttpVersion.HTTP_2);
278                     session.enqueue(
279                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
280                             Command.Priority.NORMAL);
281                 }
282 
283                 @Override
284                 public void failed(final Exception ex) {
285                     exchangeHandler.failed(ex);
286                 }
287 
288                 @Override
289                 public void cancelled() {
290                     exchangeHandler.failed(new InterruptedIOException());
291                 }
292 
293             });
294         }
295         return complexCancellable;
296     }
297 
298     @Override
299     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
300         throw new UnsupportedOperationException();
301     }
302 
303     @Override
304     public void markConnectionNonReusable() {
305         reusable = false;
306     }
307 
308     static class Endpoint implements Identifiable {
309 
310         final HttpRoute route;
311         final IOSession session;
312 
313         Endpoint(final HttpRoute route, final IOSession session) {
314             this.route = route;
315             this.session = session;
316         }
317 
318         @Override
319         public String getId() {
320             return session.getId();
321         }
322 
323     }
324 
325     @Override
326     public AsyncExecRuntime fork() {
327         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
328     }
329 
330 }