View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.impl.ConnPoolSupport;
37  import org.apache.hc.client5.http.impl.Operations;
38  import org.apache.hc.client5.http.protocol.HttpClientContext;
39  import org.apache.hc.core5.concurrent.Cancellable;
40  import org.apache.hc.core5.concurrent.ComplexCancellable;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.http.HttpHost;
43  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
44  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45  import org.apache.hc.core5.http.nio.HandlerFactory;
46  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
47  import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.util.Identifiable;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  import org.slf4j.Logger;
55  
56  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57  
58      private final Logger log;
59      private final H2ConnPool connPool;
60      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61      private final AtomicReference<Endpoint> sessionRef;
62      private volatile boolean reusable;
63  
64      InternalH2AsyncExecRuntime(
65              final Logger log,
66              final H2ConnPool connPool,
67              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68          super();
69          this.log = log;
70          this.connPool = connPool;
71          this.pushHandlerFactory = pushHandlerFactory;
72          this.sessionRef = new AtomicReference<>();
73      }
74  
75      @Override
76      public boolean isEndpointAcquired() {
77          return sessionRef.get() != null;
78      }
79  
80      @Override
81      public Cancellable acquireEndpoint(
82              final String id,
83              final HttpRoute route,
84              final Object object,
85              final HttpClientContext context,
86              final FutureCallback<AsyncExecRuntime> callback) {
87          if (sessionRef.get() == null) {
88              final HttpHost target = route.getTargetHost();
89              final RequestConfig requestConfig = context.getRequestConfig();
90              final Timeout connectTimeout = requestConfig.getConnectTimeout();
91              if (log.isDebugEnabled()) {
92                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
93              }
94              return Operations.cancellable(connPool.getSession(
95                      target,
96                      connectTimeout,
97                      new FutureCallback<IOSession>() {
98  
99                          @Override
100                         public void completed(final IOSession ioSession) {
101                             sessionRef.set(new Endpoint(target, ioSession));
102                             reusable = true;
103                             if (log.isDebugEnabled()) {
104                                 log.debug("{} acquired endpoint", id);
105                             }
106                             callback.completed(InternalH2AsyncExecRuntime.this);
107                         }
108 
109                         @Override
110                         public void failed(final Exception ex) {
111                             callback.failed(ex);
112                         }
113 
114                         @Override
115                         public void cancelled() {
116                             callback.cancelled();
117                         }
118 
119                     }));
120         }
121         callback.completed(this);
122         return Operations.nonCancellable();
123     }
124 
125     private void closeEndpoint(final Endpoint endpoint) {
126         endpoint.session.close(CloseMode.GRACEFUL);
127         if (log.isDebugEnabled()) {
128             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129         }
130     }
131 
132     @Override
133     public void releaseEndpoint() {
134         final Endpoint endpoint = sessionRef.getAndSet(null);
135         if (endpoint != null && !reusable) {
136             closeEndpoint(endpoint);
137         }
138     }
139 
140     @Override
141     public void discardEndpoint() {
142         final Endpoint endpoint = sessionRef.getAndSet(null);
143         if (endpoint != null) {
144             closeEndpoint(endpoint);
145         }
146     }
147 
148     @Override
149     public boolean validateConnection() {
150         if (reusable) {
151             final Endpoint endpoint = sessionRef.get();
152             return endpoint != null && endpoint.session.isOpen();
153         }
154         final Endpoint endpoint = sessionRef.getAndSet(null);
155         if (endpoint != null) {
156             closeEndpoint(endpoint);
157         }
158         return false;
159     }
160 
161     @Override
162     public boolean isEndpointConnected() {
163         final Endpoint endpoint = sessionRef.get();
164         return endpoint != null && endpoint.session.isOpen();
165     }
166 
167 
168     Endpoint ensureValid() {
169         final Endpoint endpoint = sessionRef.get();
170         if (endpoint == null) {
171             throw new IllegalStateException("I/O session not acquired / already released");
172         }
173         return endpoint;
174     }
175 
176     @Override
177     public Cancellable connectEndpoint(
178             final HttpClientContext context,
179             final FutureCallback<AsyncExecRuntime> callback) {
180         final Endpoint endpoint = ensureValid();
181         if (endpoint.session.isOpen()) {
182             callback.completed(this);
183             return Operations.nonCancellable();
184         }
185         final HttpHost target = endpoint.target;
186         final RequestConfig requestConfig = context.getRequestConfig();
187         final Timeout connectTimeout = requestConfig.getConnectTimeout();
188         if (log.isDebugEnabled()) {
189             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
190         }
191         return Operations.cancellable(connPool.getSession(target, connectTimeout,
192             new FutureCallback<IOSession>() {
193 
194             @Override
195             public void completed(final IOSession ioSession) {
196                 sessionRef.set(new Endpoint(target, ioSession));
197                 reusable = true;
198                 if (log.isDebugEnabled()) {
199                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
200                 }
201                 callback.completed(InternalH2AsyncExecRuntime.this);
202             }
203 
204             @Override
205             public void failed(final Exception ex) {
206                 callback.failed(ex);
207             }
208 
209             @Override
210             public void cancelled() {
211                 callback.cancelled();
212             }
213 
214         }));
215 
216     }
217 
218     @Override
219     public void upgradeTls(final HttpClientContext context) {
220         throw new UnsupportedOperationException();
221     }
222 
223     @Override
224     public Cancellable execute(
225             final String id,
226             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
227         final ComplexCancellable complexCancellable = new ComplexCancellable();
228         final Endpoint endpoint = ensureValid();
229         final IOSession session = endpoint.session;
230         if (session.isOpen()) {
231             if (log.isDebugEnabled()) {
232                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
233             }
234             session.enqueue(
235                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
236                     Command.Priority.NORMAL);
237         } else {
238             final HttpHost target = endpoint.target;
239             final RequestConfig requestConfig = context.getRequestConfig();
240             final Timeout connectTimeout = requestConfig.getConnectTimeout();
241             connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
242 
243                 @Override
244                 public void completed(final IOSession ioSession) {
245                     sessionRef.set(new Endpoint(target, ioSession));
246                     reusable = true;
247                     if (log.isDebugEnabled()) {
248                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
249                     }
250                     session.enqueue(
251                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
252                             Command.Priority.NORMAL);
253                 }
254 
255                 @Override
256                 public void failed(final Exception ex) {
257                     exchangeHandler.failed(ex);
258                 }
259 
260                 @Override
261                 public void cancelled() {
262                     exchangeHandler.failed(new InterruptedIOException());
263                 }
264 
265             });
266         }
267         return complexCancellable;
268     }
269 
270     @Override
271     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
272         throw new UnsupportedOperationException();
273     }
274 
275     @Override
276     public void markConnectionNonReusable() {
277         reusable = false;
278     }
279 
280     static class Endpoint implements Identifiable {
281 
282         final HttpHost target;
283         final IOSession session;
284 
285         Endpoint(final HttpHost target, final IOSession session) {
286             this.target = target;
287             this.session = session;
288         }
289 
290         @Override
291         public String getId() {
292             return session.getId();
293         }
294 
295     }
296 
297     @Override
298     public AsyncExecRuntime fork() {
299         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
300     }
301 
302 }