View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.hc.client5.http.AuthenticationStrategy;
37  import org.apache.hc.client5.http.EndpointInfo;
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.RouteTracker;
40  import org.apache.hc.client5.http.SchemePortResolver;
41  import org.apache.hc.client5.http.async.AsyncExecCallback;
42  import org.apache.hc.client5.http.async.AsyncExecChain;
43  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
44  import org.apache.hc.client5.http.async.AsyncExecRuntime;
45  import org.apache.hc.client5.http.auth.AuthExchange;
46  import org.apache.hc.client5.http.auth.ChallengeType;
47  import org.apache.hc.client5.http.config.RequestConfig;
48  import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
49  import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
50  import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
51  import org.apache.hc.client5.http.protocol.HttpClientContext;
52  import org.apache.hc.client5.http.routing.HttpRouteDirector;
53  import org.apache.hc.core5.annotation.Contract;
54  import org.apache.hc.core5.annotation.Internal;
55  import org.apache.hc.core5.annotation.ThreadingBehavior;
56  import org.apache.hc.core5.concurrent.CancellableDependency;
57  import org.apache.hc.core5.concurrent.FutureCallback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.HttpVersion;
66  import org.apache.hc.core5.http.Method;
67  import org.apache.hc.core5.http.message.BasicHttpRequest;
68  import org.apache.hc.core5.http.message.StatusLine;
69  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
71  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
72  import org.apache.hc.core5.http.nio.CapacityChannel;
73  import org.apache.hc.core5.http.nio.DataStreamChannel;
74  import org.apache.hc.core5.http.nio.RequestChannel;
75  import org.apache.hc.core5.http.protocol.HttpContext;
76  import org.apache.hc.core5.http.protocol.HttpProcessor;
77  import org.apache.hc.core5.util.Args;
78  import org.slf4j.Logger;
79  import org.slf4j.LoggerFactory;
80  
81  /**
82   * Request execution handler in the asynchronous request execution chain
83   * that is responsible for establishing connection to the target
84   * origin server as specified by the current connection route.
85   *
86   * @since 5.0
87   */
88  @Contract(threading = ThreadingBehavior.STATELESS)
89  @Internal
90  public final class AsyncConnectExec implements AsyncExecChainHandler {
91  
92      private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
93  
94      private final HttpProcessor proxyHttpProcessor;
95      private final AuthenticationStrategy proxyAuthStrategy;
96      private final HttpAuthenticator authenticator;
97      private final AuthCacheKeeper authCacheKeeper;
98      private final HttpRouteDirector routeDirector;
99  
100     public AsyncConnectExec(
101             final HttpProcessor proxyHttpProcessor,
102             final AuthenticationStrategy proxyAuthStrategy,
103             final SchemePortResolver schemePortResolver,
104             final boolean authCachingDisabled) {
105         Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
106         Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
107         this.proxyHttpProcessor = proxyHttpProcessor;
108         this.proxyAuthStrategy  = proxyAuthStrategy;
109         this.authenticator = new HttpAuthenticator();
110         this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
111         this.routeDirector = BasicRouteDirector.INSTANCE;
112     }
113 
114     static class State {
115 
116         State(final HttpRoute route) {
117             tracker = new RouteTracker(route);
118         }
119 
120         final RouteTracker tracker;
121 
122         volatile boolean challenged;
123         volatile HttpResponse response;
124         volatile boolean tunnelRefused;
125 
126     }
127 
128     @Override
129     public void execute(
130             final HttpRequest request,
131             final AsyncEntityProducer entityProducer,
132             final AsyncExecChain.Scope scope,
133             final AsyncExecChain chain,
134             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
135         Args.notNull(request, "HTTP request");
136         Args.notNull(scope, "Scope");
137 
138         final String exchangeId = scope.exchangeId;
139         final HttpRoute route = scope.route;
140         final CancellableDependency cancellableDependency = scope.cancellableDependency;
141         final HttpClientContext clientContext = scope.clientContext;
142         final AsyncExecRuntime execRuntime = scope.execRuntime;
143         final State state = new State(route);
144 
145         if (!execRuntime.isEndpointAcquired()) {
146             final Object userToken = clientContext.getUserToken();
147             if (LOG.isDebugEnabled()) {
148                 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
149             }
150             cancellableDependency.setDependency(execRuntime.acquireEndpoint(
151                     exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
152 
153                         @Override
154                         public void completed(final AsyncExecRuntime execRuntime) {
155                             if (execRuntime.isEndpointConnected()) {
156                                 try {
157                                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
158                                 } catch (final HttpException | IOException ex) {
159                                     asyncExecCallback.failed(ex);
160                                 }
161                             } else {
162                                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
163                             }
164                         }
165 
166                         @Override
167                         public void failed(final Exception ex) {
168                             asyncExecCallback.failed(ex);
169                         }
170 
171                         @Override
172                         public void cancelled() {
173                             asyncExecCallback.failed(new InterruptedIOException());
174                         }
175 
176                     }));
177         } else {
178             if (execRuntime.isEndpointConnected()) {
179                 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
180             } else {
181                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
182             }
183         }
184 
185     }
186 
187     private void proceedToNextHop(
188             final State state,
189             final HttpRequest request,
190             final AsyncEntityProducer entityProducer,
191             final AsyncExecChain.Scope scope,
192             final AsyncExecChain chain,
193             final AsyncExecCallback asyncExecCallback) {
194         final RouteTracker tracker = state.tracker;
195         final String exchangeId = scope.exchangeId;
196         final HttpRoute route = scope.route;
197         final AsyncExecRuntime execRuntime = scope.execRuntime;
198         final CancellableDependency operation = scope.cancellableDependency;
199         final HttpClientContext clientContext = scope.clientContext;
200 
201         final HttpRoute fact = tracker.toRoute();
202         final int step = routeDirector.nextStep(route, fact);
203 
204         switch (step) {
205             case HttpRouteDirector.CONNECT_TARGET:
206                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
207 
208                     @Override
209                     public void completed(final AsyncExecRuntime execRuntime) {
210                         tracker.connectTarget(route.isSecure());
211                         if (LOG.isDebugEnabled()) {
212                             LOG.debug("{} connected to target", exchangeId);
213                         }
214                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
215                     }
216 
217                     @Override
218                     public void failed(final Exception ex) {
219                         asyncExecCallback.failed(ex);
220                     }
221 
222                     @Override
223                     public void cancelled() {
224                         asyncExecCallback.failed(new InterruptedIOException());
225                     }
226 
227                 }));
228                 break;
229 
230             case HttpRouteDirector.CONNECT_PROXY:
231                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
232 
233                     @Override
234                     public void completed(final AsyncExecRuntime execRuntime) {
235                         final HttpHost proxy  = route.getProxyHost();
236                         tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
237                         if (LOG.isDebugEnabled()) {
238                             LOG.debug("{} connected to proxy", exchangeId);
239                         }
240                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
241                     }
242 
243                     @Override
244                     public void failed(final Exception ex) {
245                         asyncExecCallback.failed(ex);
246                     }
247 
248                     @Override
249                     public void cancelled() {
250                         asyncExecCallback.failed(new InterruptedIOException());
251                     }
252 
253                 }));
254                 break;
255 
256                 case HttpRouteDirector.TUNNEL_TARGET:
257                     try {
258                         final HttpHost proxy = route.getProxyHost();
259                         final HttpHost target = route.getTargetHost();
260                         if (LOG.isDebugEnabled()) {
261                             LOG.debug("{} create tunnel", exchangeId);
262                         }
263                         createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
264 
265                             @Override
266                             public AsyncDataConsumer handleResponse(
267                                     final HttpResponse response,
268                                     final EntityDetails entityDetails) throws HttpException, IOException {
269                                 return asyncExecCallback.handleResponse(response, entityDetails);
270                             }
271 
272                             @Override
273                             public void handleInformationResponse(
274                                     final HttpResponse response) throws HttpException, IOException {
275                                 asyncExecCallback.handleInformationResponse(response);
276                             }
277 
278                             @Override
279                             public void completed() {
280                                 if (!execRuntime.isEndpointConnected()) {
281                                     // Remote endpoint disconnected. Need to start over
282                                     if (LOG.isDebugEnabled()) {
283                                         LOG.debug("{} proxy disconnected", exchangeId);
284                                     }
285                                     state.tracker.reset();
286                                 }
287                                 if (state.challenged) {
288                                     if (LOG.isDebugEnabled()) {
289                                         LOG.debug("{} proxy authentication required", exchangeId);
290                                     }
291                                     proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
292                                 } else {
293                                     if (state.tunnelRefused) {
294                                         if (LOG.isDebugEnabled()) {
295                                             LOG.debug("{} tunnel refused", exchangeId);
296                                         }
297                                         asyncExecCallback.completed();
298                                     } else {
299                                         if (LOG.isDebugEnabled()) {
300                                             LOG.debug("{} tunnel to target created", exchangeId);
301                                         }
302                                         tracker.tunnelTarget(false);
303                                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
304                                     }
305                                 }
306                             }
307 
308                         @Override
309                         public void failed(final Exception cause) {
310                             execRuntime.markConnectionNonReusable();
311                             asyncExecCallback.failed(cause);
312                         }
313 
314                     });
315                 } catch (final HttpException | IOException ex) {
316                     asyncExecCallback.failed(ex);
317                 }
318                 break;
319 
320             case HttpRouteDirector.TUNNEL_PROXY:
321                 // The most simple example for this case is a proxy chain
322                 // of two proxies, where P1 must be tunnelled to P2.
323                 // route: Source -> P1 -> P2 -> Target (3 hops)
324                 // fact:  Source -> P1 -> Target       (2 hops)
325                 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
326                 break;
327 
328             case HttpRouteDirector.LAYER_PROTOCOL:
329                 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
330 
331                     @Override
332                     public void completed(final AsyncExecRuntime asyncExecRuntime) {
333                         if (LOG.isDebugEnabled()) {
334                             LOG.debug("{} upgraded to TLS", exchangeId);
335                         }
336                         tracker.layerProtocol(route.isSecure());
337                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
338                     }
339 
340                     @Override
341                     public void failed(final Exception ex) {
342                         asyncExecCallback.failed(ex);
343                     }
344 
345                     @Override
346                     public void cancelled() {
347                         asyncExecCallback.failed(new InterruptedIOException());
348                     }
349 
350                 });
351                 break;
352 
353             case HttpRouteDirector.UNREACHABLE:
354                 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
355                         "planned = " + route + "; current = " + fact));
356                 break;
357 
358             case HttpRouteDirector.COMPLETE:
359                 if (LOG.isDebugEnabled()) {
360                     LOG.debug("{} route fully established", exchangeId);
361                 }
362                 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
363                 break;
364 
365             default:
366                 throw new IllegalStateException("Unknown step indicator "  + step + " from RouteDirector.");
367         }
368     }
369 
370     private void createTunnel(
371             final State state,
372             final HttpHost proxy,
373             final HttpHost nextHop,
374             final AsyncExecChain.Scope scope,
375             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
376 
377         final CancellableDependency operation = scope.cancellableDependency;
378         final HttpClientContext clientContext = scope.clientContext;
379         final AsyncExecRuntime execRuntime = scope.execRuntime;
380         final String exchangeId = scope.exchangeId;
381 
382         final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
383 
384         if (authCacheKeeper != null) {
385             authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
386         }
387 
388         final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
389 
390             private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
391 
392             @Override
393             public void releaseResources() {
394                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
395                 if (entityConsumer != null) {
396                     entityConsumer.releaseResources();
397                 }
398             }
399 
400             @Override
401             public void failed(final Exception cause) {
402                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
403                 if (entityConsumer != null) {
404                     entityConsumer.releaseResources();
405                 }
406                 asyncExecCallback.failed(cause);
407             }
408 
409             @Override
410             public void cancel() {
411                 failed(new InterruptedIOException());
412             }
413 
414             @Override
415             public void produceRequest(final RequestChannel requestChannel,
416                                        final HttpContext httpContext) throws HttpException, IOException {
417                 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
418                 connect.setVersion(HttpVersion.HTTP_1_1);
419 
420                 proxyHttpProcessor.process(connect, null, clientContext);
421                 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
422 
423                 requestChannel.sendRequest(connect, null, clientContext);
424             }
425 
426             @Override
427             public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
428             }
429 
430             @Override
431             public int available() {
432                 return 0;
433             }
434 
435             @Override
436             public void consumeInformation(final HttpResponse httpResponse,
437                                            final HttpContext httpContext) throws HttpException, IOException {
438             }
439 
440             @Override
441             public void consumeResponse(final HttpResponse response,
442                                         final EntityDetails entityDetails,
443                                         final HttpContext httpContext) throws HttpException, IOException {
444                 clientContext.setResponse(response);
445                 proxyHttpProcessor.process(response, entityDetails, clientContext);
446 
447                 final int status = response.getCode();
448                 if (status < HttpStatus.SC_SUCCESS) {
449                     throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
450                 }
451 
452                 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
453                     state.challenged = true;
454                 } else {
455                     state.challenged = false;
456                     if (status >= HttpStatus.SC_REDIRECTION) {
457                         state.tunnelRefused = true;
458                         entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
459                     } else if (status == HttpStatus.SC_OK) {
460                         asyncExecCallback.completed();
461                     } else {
462                         throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
463                     }
464                 }
465             }
466 
467             @Override
468             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
469                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
470                 if (entityConsumer != null) {
471                     entityConsumer.updateCapacity(capacityChannel);
472                 } else {
473                     capacityChannel.update(Integer.MAX_VALUE);
474                 }
475             }
476 
477             @Override
478             public void consume(final ByteBuffer src) throws IOException {
479                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
480                 if (entityConsumer != null) {
481                     entityConsumer.consume(src);
482                 }
483             }
484 
485             @Override
486             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
487                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
488                 if (entityConsumer != null) {
489                     entityConsumer.streamEnd(trailers);
490                 }
491                 asyncExecCallback.completed();
492             }
493 
494         };
495 
496         if (LOG.isDebugEnabled()) {
497             operation.setDependency(execRuntime.execute(
498                     exchangeId,
499                     new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
500                     clientContext));
501         } else {
502             operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
503         }
504 
505     }
506 
507     private boolean needAuthentication(
508             final AuthExchange proxyAuthExchange,
509             final HttpHost proxy,
510             final HttpResponse response,
511             final HttpClientContext context) {
512         final RequestConfig config = context.getRequestConfigOrDefault();
513         if (config.isAuthenticationEnabled()) {
514             final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
515 
516             if (authCacheKeeper != null) {
517                 if (proxyAuthRequested) {
518                     authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
519                 } else {
520                     authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
521                 }
522             }
523 
524             if (proxyAuthRequested) {
525                 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
526                         proxyAuthStrategy, proxyAuthExchange, context);
527 
528                 if (authCacheKeeper != null) {
529                     authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
530                 }
531 
532                 return updated;
533             }
534         }
535         return false;
536     }
537 
538     private void proceedConnected(
539             final HttpRequest request,
540             final AsyncEntityProducer entityProducer,
541             final AsyncExecChain.Scope scope,
542             final AsyncExecChain chain,
543             final AsyncExecCallback asyncExecCallback) {
544         final AsyncExecRuntime execRuntime = scope.execRuntime;
545         final HttpClientContext clientContext = scope.clientContext;
546         final EndpointInfo endpointInfo = execRuntime.getEndpointInfo();
547         if (endpointInfo != null) {
548             clientContext.setProtocolVersion(endpointInfo.getProtocol());
549             clientContext.setSSLSession(endpointInfo.getSslSession());
550         }
551         try {
552             chain.proceed(request, entityProducer, scope, asyncExecCallback);
553         } catch (final HttpException | IOException ex) {
554             asyncExecCallback.failed(ex);
555         }
556     }
557 
558 }