View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.impl.ConnPoolSupport;
37  import org.apache.hc.client5.http.impl.Operations;
38  import org.apache.hc.client5.http.protocol.HttpClientContext;
39  import org.apache.hc.core5.concurrent.Cancellable;
40  import org.apache.hc.core5.concurrent.ComplexCancellable;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.http.HttpHost;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46  import org.apache.hc.core5.http.nio.HandlerFactory;
47  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.util.Identifiable;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  import org.slf4j.Logger;
55  
56  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57  
58      private final Logger log;
59      private final InternalH2ConnPool connPool;
60      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61      private final AtomicReference<Endpoint> sessionRef;
62      private volatile boolean reusable;
63  
64      InternalH2AsyncExecRuntime(
65              final Logger log,
66              final InternalH2ConnPool connPool,
67              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68          super();
69          this.log = log;
70          this.connPool = connPool;
71          this.pushHandlerFactory = pushHandlerFactory;
72          this.sessionRef = new AtomicReference<>();
73      }
74  
75      @Override
76      public boolean isEndpointAcquired() {
77          return sessionRef.get() != null;
78      }
79  
80      @Override
81      public Cancellable acquireEndpoint(
82              final String id,
83              final HttpRoute route,
84              final Object object,
85              final HttpClientContext context,
86              final FutureCallback<AsyncExecRuntime> callback) {
87          if (sessionRef.get() == null) {
88              final HttpHost target = route.getTargetHost();
89              final RequestConfig requestConfig = context.getRequestConfig();
90              @SuppressWarnings("deprecation")
91              final Timeout connectTimeout = requestConfig.getConnectTimeout();
92              if (log.isDebugEnabled()) {
93                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
94              }
95              return Operations.cancellable(connPool.getSession(target, connectTimeout,
96                      new FutureCallback<IOSession>() {
97  
98                          @Override
99                          public void completed(final IOSession ioSession) {
100                             sessionRef.set(new Endpoint(target, ioSession));
101                             reusable = true;
102                             if (log.isDebugEnabled()) {
103                                 log.debug("{} acquired endpoint", id);
104                             }
105                             callback.completed(InternalH2AsyncExecRuntime.this);
106                         }
107 
108                         @Override
109                         public void failed(final Exception ex) {
110                             callback.failed(ex);
111                         }
112 
113                         @Override
114                         public void cancelled() {
115                             callback.cancelled();
116                         }
117 
118                     }));
119         }
120         callback.completed(this);
121         return Operations.nonCancellable();
122     }
123 
124     private void closeEndpoint(final Endpoint endpoint) {
125         endpoint.session.close(CloseMode.GRACEFUL);
126         if (log.isDebugEnabled()) {
127             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
128         }
129     }
130 
131     @Override
132     public void releaseEndpoint() {
133         final Endpoint endpoint = sessionRef.getAndSet(null);
134         if (endpoint != null && !reusable) {
135             closeEndpoint(endpoint);
136         }
137     }
138 
139     @Override
140     public void discardEndpoint() {
141         final Endpoint endpoint = sessionRef.getAndSet(null);
142         if (endpoint != null) {
143             closeEndpoint(endpoint);
144         }
145     }
146 
147     @Override
148     public boolean validateConnection() {
149         if (reusable) {
150             final Endpoint endpoint = sessionRef.get();
151             return endpoint != null && endpoint.session.isOpen();
152         }
153         final Endpoint endpoint = sessionRef.getAndSet(null);
154         if (endpoint != null) {
155             closeEndpoint(endpoint);
156         }
157         return false;
158     }
159 
160     @Override
161     public boolean isEndpointConnected() {
162         final Endpoint endpoint = sessionRef.get();
163         return endpoint != null && endpoint.session.isOpen();
164     }
165 
166 
167     Endpoint ensureValid() {
168         final Endpoint endpoint = sessionRef.get();
169         if (endpoint == null) {
170             throw new IllegalStateException("I/O session not acquired / already released");
171         }
172         return endpoint;
173     }
174 
175     @Override
176     public Cancellable connectEndpoint(
177             final HttpClientContext context,
178             final FutureCallback<AsyncExecRuntime> callback) {
179         final Endpoint endpoint = ensureValid();
180         if (endpoint.session.isOpen()) {
181             callback.completed(this);
182             return Operations.nonCancellable();
183         }
184         final HttpHost target = endpoint.target;
185         final RequestConfig requestConfig = context.getRequestConfig();
186         @SuppressWarnings("deprecation")
187         final Timeout connectTimeout = requestConfig.getConnectTimeout();
188         if (log.isDebugEnabled()) {
189             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
190         }
191         return Operations.cancellable(connPool.getSession(target, connectTimeout,
192             new FutureCallback<IOSession>() {
193 
194             @Override
195             public void completed(final IOSession ioSession) {
196                 sessionRef.set(new Endpoint(target, ioSession));
197                 reusable = true;
198                 if (log.isDebugEnabled()) {
199                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
200                 }
201                 callback.completed(InternalH2AsyncExecRuntime.this);
202             }
203 
204             @Override
205             public void failed(final Exception ex) {
206                 callback.failed(ex);
207             }
208 
209             @Override
210             public void cancelled() {
211                 callback.cancelled();
212             }
213 
214         }));
215 
216     }
217 
218     @Override
219     public void disconnectEndpoint() {
220         final Endpoint endpoint = sessionRef.get();
221         if (endpoint != null) {
222             endpoint.session.close(CloseMode.GRACEFUL);
223         }
224     }
225 
226     @Override
227     public void upgradeTls(final HttpClientContext context) {
228         throw new UnsupportedOperationException();
229     }
230 
231     @Override
232     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
233         throw new UnsupportedOperationException();
234     }
235 
236     @Override
237     public Cancellable execute(
238             final String id,
239             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
240         final ComplexCancellable complexCancellable = new ComplexCancellable();
241         final Endpoint endpoint = ensureValid();
242         final IOSession session = endpoint.session;
243         if (session.isOpen()) {
244             if (log.isDebugEnabled()) {
245                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
246             }
247             context.setProtocolVersion(HttpVersion.HTTP_2);
248             session.enqueue(
249                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
250                     Command.Priority.NORMAL);
251         } else {
252             final HttpHost target = endpoint.target;
253             final RequestConfig requestConfig = context.getRequestConfig();
254             @SuppressWarnings("deprecation")
255             final Timeout connectTimeout = requestConfig.getConnectTimeout();
256             connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
257 
258                 @Override
259                 public void completed(final IOSession ioSession) {
260                     sessionRef.set(new Endpoint(target, ioSession));
261                     reusable = true;
262                     if (log.isDebugEnabled()) {
263                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
264                     }
265                     context.setProtocolVersion(HttpVersion.HTTP_2);
266                     session.enqueue(
267                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
268                             Command.Priority.NORMAL);
269                 }
270 
271                 @Override
272                 public void failed(final Exception ex) {
273                     exchangeHandler.failed(ex);
274                 }
275 
276                 @Override
277                 public void cancelled() {
278                     exchangeHandler.failed(new InterruptedIOException());
279                 }
280 
281             });
282         }
283         return complexCancellable;
284     }
285 
286     @Override
287     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
288         throw new UnsupportedOperationException();
289     }
290 
291     @Override
292     public void markConnectionNonReusable() {
293         reusable = false;
294     }
295 
296     static class Endpoint implements Identifiable {
297 
298         final HttpHost target;
299         final IOSession session;
300 
301         Endpoint(final HttpHost target, final IOSession session) {
302             this.target = target;
303             this.session = session;
304         }
305 
306         @Override
307         public String getId() {
308             return session.getId();
309         }
310 
311     }
312 
313     @Override
314     public AsyncExecRuntime fork() {
315         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
316     }
317 
318 }