View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
37  import org.apache.hc.client5.http.HttpRoute;
38  import org.apache.hc.client5.http.UserTokenHandler;
39  import org.apache.hc.client5.http.async.AsyncExecCallback;
40  import org.apache.hc.client5.http.async.AsyncExecChain;
41  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
42  import org.apache.hc.client5.http.async.AsyncExecRuntime;
43  import org.apache.hc.client5.http.impl.ProtocolSwitchStrategy;
44  import org.apache.hc.client5.http.protocol.HttpClientContext;
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.Internal;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.CancellableDependency;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.http.EntityDetails;
51  import org.apache.hc.core5.http.Header;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpRequest;
54  import org.apache.hc.core5.http.HttpResponse;
55  import org.apache.hc.core5.http.HttpStatus;
56  import org.apache.hc.core5.http.ProtocolException;
57  import org.apache.hc.core5.http.ProtocolVersion;
58  import org.apache.hc.core5.http.message.RequestLine;
59  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
60  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
61  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
62  import org.apache.hc.core5.http.nio.CapacityChannel;
63  import org.apache.hc.core5.http.nio.DataStreamChannel;
64  import org.apache.hc.core5.http.nio.RequestChannel;
65  import org.apache.hc.core5.http.protocol.HttpContext;
66  import org.apache.hc.core5.http.protocol.HttpProcessor;
67  import org.apache.hc.core5.util.Args;
68  import org.apache.hc.core5.util.TimeValue;
69  import org.slf4j.Logger;
70  import org.slf4j.LoggerFactory;
71  
72  /**
73   * Usually the last HTTP/1.1 request execution handler in the asynchronous
74   * request execution chain that is responsible for execution of
75   * request/response exchanges with the opposite endpoint.
76   *
77   * @since 5.0
78   */
79  @Contract(threading = ThreadingBehavior.STATELESS)
80  @Internal
81  class HttpAsyncMainClientExec implements AsyncExecChainHandler {
82  
83      private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
84  
85      private final HttpProcessor httpProcessor;
86      private final ConnectionKeepAliveStrategy keepAliveStrategy;
87      private final UserTokenHandler userTokenHandler;
88      private final ProtocolSwitchStrategy protocolSwitchStrategy;
89  
90      HttpAsyncMainClientExec(final HttpProcessor httpProcessor,
91                              final ConnectionKeepAliveStrategy keepAliveStrategy,
92                              final UserTokenHandler userTokenHandler) {
93          this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
94          this.keepAliveStrategy = keepAliveStrategy;
95          this.userTokenHandler = userTokenHandler;
96          this.protocolSwitchStrategy = new ProtocolSwitchStrategy();
97      }
98  
99      @Override
100     public void execute(
101             final HttpRequest request,
102             final AsyncEntityProducer entityProducer,
103             final AsyncExecChain.Scope scope,
104             final AsyncExecChain chain,
105             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
106         final String exchangeId = scope.exchangeId;
107         final HttpRoute route = scope.route;
108         final CancellableDependency operation = scope.cancellableDependency;
109         final HttpClientContext clientContext = scope.clientContext;
110         final AsyncExecRuntime execRuntime = scope.execRuntime;
111 
112         if (LOG.isDebugEnabled()) {
113             LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
114         }
115 
116         final AtomicInteger messageCountDown = new AtomicInteger(2);
117         final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
118 
119             private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
120 
121             @Override
122             public void releaseResources() {
123                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
124                 if (entityConsumer != null) {
125                     entityConsumer.releaseResources();
126                 }
127             }
128 
129             @Override
130             public void failed(final Exception cause) {
131                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
132                 if (entityConsumer != null) {
133                     entityConsumer.releaseResources();
134                 }
135                 execRuntime.markConnectionNonReusable();
136                 asyncExecCallback.failed(cause);
137             }
138 
139             @Override
140             public void cancel() {
141                 if (messageCountDown.get() > 0) {
142                     failed(new InterruptedIOException());
143                 }
144             }
145 
146             @Override
147             public void produceRequest(
148                     final RequestChannel channel,
149                     final HttpContext context) throws HttpException, IOException {
150 
151                 clientContext.setRoute(route);
152                 clientContext.setRequest(request);
153                 httpProcessor.process(request, entityProducer, clientContext);
154 
155                 channel.sendRequest(request, entityProducer, context);
156                 if (entityProducer == null) {
157                     messageCountDown.decrementAndGet();
158                 }
159             }
160 
161             @Override
162             public int available() {
163                 return entityProducer.available();
164             }
165 
166             @Override
167             public void produce(final DataStreamChannel channel) throws IOException {
168                 entityProducer.produce(new DataStreamChannel() {
169 
170                     @Override
171                     public void requestOutput() {
172                         channel.requestOutput();
173                     }
174 
175                     @Override
176                     public int write(final ByteBuffer src) throws IOException {
177                         return channel.write(src);
178                     }
179 
180                     @Override
181                     public void endStream(final List<? extends Header> trailers) throws IOException {
182                         channel.endStream(trailers);
183                         if (messageCountDown.decrementAndGet() <= 0) {
184                             asyncExecCallback.completed();
185                         }
186                     }
187 
188                     @Override
189                     public void endStream() throws IOException {
190                         channel.endStream();
191                         if (messageCountDown.decrementAndGet() <= 0) {
192                             asyncExecCallback.completed();
193                         }
194                     }
195 
196                 });
197             }
198 
199             @Override
200             public void consumeInformation(
201                     final HttpResponse response,
202                     final HttpContext context) throws HttpException, IOException {
203                 if (response.getCode() == HttpStatus.SC_SWITCHING_PROTOCOLS) {
204                     final ProtocolVersion upgradeProtocol = protocolSwitchStrategy.switchProtocol(response);
205                     if (upgradeProtocol == null || !upgradeProtocol.getProtocol().equals("TLS")) {
206                         throw new ProtocolException("Failure switching protocols");
207                     }
208                     if (LOG.isDebugEnabled()) {
209                         LOG.debug("Switching to {}", upgradeProtocol);
210                     }
211                     execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
212 
213                         @Override
214                         public void completed(final AsyncExecRuntime result) {
215                             LOG.debug("Successfully switched to {}", upgradeProtocol);
216                         }
217 
218                         @Override
219                         public void failed(final Exception ex) {
220                             asyncExecCallback.failed(ex);
221                         }
222 
223                         @Override
224                         public void cancelled() {
225                             asyncExecCallback.failed(new InterruptedIOException());
226                         }
227 
228                     });
229                 } else {
230                     asyncExecCallback.handleInformationResponse(response);
231                 }
232             }
233 
234             @Override
235             public void consumeResponse(
236                     final HttpResponse response,
237                     final EntityDetails entityDetails,
238                     final HttpContext context) throws HttpException, IOException {
239 
240                 clientContext.setResponse(response);
241                 httpProcessor.process(response, entityDetails, clientContext);
242 
243                 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
244                 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
245                     messageCountDown.decrementAndGet();
246                 }
247                 final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
248                 Object userToken = clientContext.getUserToken();
249                 if (userToken == null) {
250                     userToken = userTokenHandler.getUserToken(route, request, clientContext);
251                     clientContext.setUserToken(userToken);
252                 }
253                 execRuntime.markConnectionReusable(userToken, keepAliveDuration);
254                 if (entityDetails == null) {
255                     execRuntime.validateConnection();
256                     if (messageCountDown.decrementAndGet() <= 0) {
257                         asyncExecCallback.completed();
258                     }
259                 }
260             }
261 
262             @Override
263             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
264                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
265                 if (entityConsumer != null) {
266                     entityConsumer.updateCapacity(capacityChannel);
267                 } else {
268                     capacityChannel.update(Integer.MAX_VALUE);
269                 }
270             }
271 
272             @Override
273             public void consume(final ByteBuffer src) throws IOException {
274                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
275                 if (entityConsumer != null) {
276                     entityConsumer.consume(src);
277                 }
278             }
279 
280             @Override
281             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
282                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
283                 if (entityConsumer != null) {
284                     entityConsumer.streamEnd(trailers);
285                 } else {
286                     execRuntime.validateConnection();
287                 }
288                 if (messageCountDown.decrementAndGet() <= 0) {
289                     asyncExecCallback.completed();
290                 }
291             }
292 
293         };
294 
295         if (LOG.isDebugEnabled()) {
296             operation.setDependency(execRuntime.execute(
297                     exchangeId,
298                     new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
299                     clientContext));
300         } else {
301             operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
302         }
303     }
304 
305 }