View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.impl.ConnPoolSupport;
37  import org.apache.hc.client5.http.impl.Operations;
38  import org.apache.hc.client5.http.protocol.HttpClientContext;
39  import org.apache.hc.core5.concurrent.Cancellable;
40  import org.apache.hc.core5.concurrent.ComplexCancellable;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.http.HttpHost;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46  import org.apache.hc.core5.http.nio.HandlerFactory;
47  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.util.Identifiable;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  import org.slf4j.Logger;
55  
56  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57  
58      private final Logger log;
59      private final InternalH2ConnPool connPool;
60      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61      private final AtomicReference<Endpoint> sessionRef;
62      private volatile boolean reusable;
63  
64      InternalH2AsyncExecRuntime(
65              final Logger log,
66              final InternalH2ConnPool connPool,
67              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68          super();
69          this.log = log;
70          this.connPool = connPool;
71          this.pushHandlerFactory = pushHandlerFactory;
72          this.sessionRef = new AtomicReference<>();
73      }
74  
75      @Override
76      public boolean isEndpointAcquired() {
77          return sessionRef.get() != null;
78      }
79  
80      @Override
81      public Cancellable acquireEndpoint(
82              final String id,
83              final HttpRoute route,
84              final Object object,
85              final HttpClientContext context,
86              final FutureCallback<AsyncExecRuntime> callback) {
87          if (sessionRef.get() == null) {
88              final HttpHost target = route.getTargetHost();
89              final RequestConfig requestConfig = context.getRequestConfig();
90              @SuppressWarnings("deprecation")
91              final Timeout connectTimeout = requestConfig.getConnectTimeout();
92              if (log.isDebugEnabled()) {
93                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
94              }
95              return Operations.cancellable(connPool.getSession(target, connectTimeout,
96                      new FutureCallback<IOSession>() {
97  
98                          @Override
99                          public void completed(final IOSession ioSession) {
100                             sessionRef.set(new Endpoint(target, ioSession));
101                             reusable = true;
102                             if (log.isDebugEnabled()) {
103                                 log.debug("{} acquired endpoint", id);
104                             }
105                             context.setProtocolVersion(HttpVersion.HTTP_2);
106                             callback.completed(InternalH2AsyncExecRuntime.this);
107                         }
108 
109                         @Override
110                         public void failed(final Exception ex) {
111                             callback.failed(ex);
112                         }
113 
114                         @Override
115                         public void cancelled() {
116                             callback.cancelled();
117                         }
118 
119                     }));
120         }
121         callback.completed(this);
122         return Operations.nonCancellable();
123     }
124 
125     private void closeEndpoint(final Endpoint endpoint) {
126         endpoint.session.close(CloseMode.GRACEFUL);
127         if (log.isDebugEnabled()) {
128             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129         }
130     }
131 
132     @Override
133     public void releaseEndpoint() {
134         final Endpoint endpoint = sessionRef.getAndSet(null);
135         if (endpoint != null && !reusable) {
136             closeEndpoint(endpoint);
137         }
138     }
139 
140     @Override
141     public void discardEndpoint() {
142         final Endpoint endpoint = sessionRef.getAndSet(null);
143         if (endpoint != null) {
144             closeEndpoint(endpoint);
145         }
146     }
147 
148     @Override
149     public boolean validateConnection() {
150         if (reusable) {
151             final Endpoint endpoint = sessionRef.get();
152             return endpoint != null && endpoint.session.isOpen();
153         }
154         final Endpoint endpoint = sessionRef.getAndSet(null);
155         if (endpoint != null) {
156             closeEndpoint(endpoint);
157         }
158         return false;
159     }
160 
161     @Override
162     public boolean isEndpointConnected() {
163         final Endpoint endpoint = sessionRef.get();
164         return endpoint != null && endpoint.session.isOpen();
165     }
166 
167 
168     Endpoint ensureValid() {
169         final Endpoint endpoint = sessionRef.get();
170         if (endpoint == null) {
171             throw new IllegalStateException("I/O session not acquired / already released");
172         }
173         return endpoint;
174     }
175 
176     @Override
177     public Cancellable connectEndpoint(
178             final HttpClientContext context,
179             final FutureCallback<AsyncExecRuntime> callback) {
180         final Endpoint endpoint = ensureValid();
181         if (endpoint.session.isOpen()) {
182             callback.completed(this);
183             return Operations.nonCancellable();
184         }
185         final HttpHost target = endpoint.target;
186         final RequestConfig requestConfig = context.getRequestConfig();
187         @SuppressWarnings("deprecation")
188         final Timeout connectTimeout = requestConfig.getConnectTimeout();
189         if (log.isDebugEnabled()) {
190             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
191         }
192         return Operations.cancellable(connPool.getSession(target, connectTimeout,
193             new FutureCallback<IOSession>() {
194 
195             @Override
196             public void completed(final IOSession ioSession) {
197                 sessionRef.set(new Endpoint(target, ioSession));
198                 reusable = true;
199                 if (log.isDebugEnabled()) {
200                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
201                 }
202                 callback.completed(InternalH2AsyncExecRuntime.this);
203             }
204 
205             @Override
206             public void failed(final Exception ex) {
207                 callback.failed(ex);
208             }
209 
210             @Override
211             public void cancelled() {
212                 callback.cancelled();
213             }
214 
215         }));
216 
217     }
218 
219     @Override
220     public void upgradeTls(final HttpClientContext context) {
221         throw new UnsupportedOperationException();
222     }
223 
224     @Override
225     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
226         throw new UnsupportedOperationException();
227     }
228 
229     @Override
230     public Cancellable execute(
231             final String id,
232             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
233         final ComplexCancellable complexCancellable = new ComplexCancellable();
234         final Endpoint endpoint = ensureValid();
235         final IOSession session = endpoint.session;
236         if (session.isOpen()) {
237             if (log.isDebugEnabled()) {
238                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
239             }
240             session.enqueue(
241                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
242                     Command.Priority.NORMAL);
243         } else {
244             final HttpHost target = endpoint.target;
245             final RequestConfig requestConfig = context.getRequestConfig();
246             @SuppressWarnings("deprecation")
247             final Timeout connectTimeout = requestConfig.getConnectTimeout();
248             connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
249 
250                 @Override
251                 public void completed(final IOSession ioSession) {
252                     sessionRef.set(new Endpoint(target, ioSession));
253                     reusable = true;
254                     if (log.isDebugEnabled()) {
255                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
256                     }
257                     session.enqueue(
258                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
259                             Command.Priority.NORMAL);
260                 }
261 
262                 @Override
263                 public void failed(final Exception ex) {
264                     exchangeHandler.failed(ex);
265                 }
266 
267                 @Override
268                 public void cancelled() {
269                     exchangeHandler.failed(new InterruptedIOException());
270                 }
271 
272             });
273         }
274         return complexCancellable;
275     }
276 
277     @Override
278     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
279         throw new UnsupportedOperationException();
280     }
281 
282     @Override
283     public void markConnectionNonReusable() {
284         reusable = false;
285     }
286 
287     static class Endpoint implements Identifiable {
288 
289         final HttpHost target;
290         final IOSession session;
291 
292         Endpoint(final HttpHost target, final IOSession session) {
293             this.target = target;
294             this.session = session;
295         }
296 
297         @Override
298         public String getId() {
299             return session.getId();
300         }
301 
302     }
303 
304     @Override
305     public AsyncExecRuntime fork() {
306         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
307     }
308 
309 }