View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  
33  import org.apache.hc.client5.http.AuthenticationStrategy;
34  import org.apache.hc.client5.http.HttpRoute;
35  import org.apache.hc.client5.http.RouteTracker;
36  import org.apache.hc.client5.http.SchemePortResolver;
37  import org.apache.hc.client5.http.async.AsyncExecCallback;
38  import org.apache.hc.client5.http.async.AsyncExecChain;
39  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
40  import org.apache.hc.client5.http.async.AsyncExecRuntime;
41  import org.apache.hc.client5.http.auth.AuthExchange;
42  import org.apache.hc.client5.http.auth.ChallengeType;
43  import org.apache.hc.client5.http.config.RequestConfig;
44  import org.apache.hc.client5.http.impl.TunnelRefusedException;
45  import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
46  import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
47  import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
48  import org.apache.hc.client5.http.protocol.HttpClientContext;
49  import org.apache.hc.client5.http.routing.HttpRouteDirector;
50  import org.apache.hc.core5.annotation.Contract;
51  import org.apache.hc.core5.annotation.Internal;
52  import org.apache.hc.core5.annotation.ThreadingBehavior;
53  import org.apache.hc.core5.concurrent.CancellableDependency;
54  import org.apache.hc.core5.concurrent.FutureCallback;
55  import org.apache.hc.core5.http.EntityDetails;
56  import org.apache.hc.core5.http.HttpException;
57  import org.apache.hc.core5.http.HttpHost;
58  import org.apache.hc.core5.http.HttpRequest;
59  import org.apache.hc.core5.http.HttpResponse;
60  import org.apache.hc.core5.http.HttpStatus;
61  import org.apache.hc.core5.http.HttpVersion;
62  import org.apache.hc.core5.http.Method;
63  import org.apache.hc.core5.http.message.BasicHttpRequest;
64  import org.apache.hc.core5.http.message.StatusLine;
65  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
66  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.util.Args;
70  import org.slf4j.Logger;
71  import org.slf4j.LoggerFactory;
72  
73  /**
74   * Request execution handler in the asynchronous request execution chain
75   * that is responsible for establishing connection to the target
76   * origin server as specified by the current connection route.
77   *
78   * @since 5.0
79   */
80  @Contract(threading = ThreadingBehavior.STATELESS)
81  @Internal
82  public final class AsyncConnectExec implements AsyncExecChainHandler {
83  
84      private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
85  
86      private final HttpProcessor proxyHttpProcessor;
87      private final AuthenticationStrategy proxyAuthStrategy;
88      private final HttpAuthenticator authenticator;
89      private final AuthCacheKeeper authCacheKeeper;
90      private final HttpRouteDirector routeDirector;
91  
92      public AsyncConnectExec(
93              final HttpProcessor proxyHttpProcessor,
94              final AuthenticationStrategy proxyAuthStrategy,
95              final SchemePortResolver schemePortResolver,
96              final boolean authCachingDisabled) {
97          Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
98          Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
99          this.proxyHttpProcessor = proxyHttpProcessor;
100         this.proxyAuthStrategy  = proxyAuthStrategy;
101         this.authenticator = new HttpAuthenticator();
102         this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
103         this.routeDirector = BasicRouteDirector.INSTANCE;
104     }
105 
106     static class State {
107 
108         State(final HttpRoute route) {
109             tracker = new RouteTracker(route);
110         }
111 
112         final RouteTracker tracker;
113 
114         volatile boolean challenged;
115         volatile boolean tunnelRefused;
116 
117     }
118 
119     @Override
120     public void execute(
121             final HttpRequest request,
122             final AsyncEntityProducer entityProducer,
123             final AsyncExecChain.Scope scope,
124             final AsyncExecChain chain,
125             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
126         Args.notNull(request, "HTTP request");
127         Args.notNull(scope, "Scope");
128 
129         final String exchangeId = scope.exchangeId;
130         final HttpRoute route = scope.route;
131         final CancellableDependency cancellableDependency = scope.cancellableDependency;
132         final HttpClientContext clientContext = scope.clientContext;
133         final AsyncExecRuntime execRuntime = scope.execRuntime;
134         final State state = new State(route);
135 
136         if (!execRuntime.isEndpointAcquired()) {
137             final Object userToken = clientContext.getUserToken();
138             if (LOG.isDebugEnabled()) {
139                 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
140             }
141             cancellableDependency.setDependency(execRuntime.acquireEndpoint(
142                     exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
143 
144                         @Override
145                         public void completed(final AsyncExecRuntime execRuntime) {
146                             if (execRuntime.isEndpointConnected()) {
147                                 try {
148                                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
149                                 } catch (final HttpException | IOException ex) {
150                                     asyncExecCallback.failed(ex);
151                                 }
152                             } else {
153                                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
154                             }
155                         }
156 
157                         @Override
158                         public void failed(final Exception ex) {
159                             asyncExecCallback.failed(ex);
160                         }
161 
162                         @Override
163                         public void cancelled() {
164                             asyncExecCallback.failed(new InterruptedIOException());
165                         }
166 
167                     }));
168         } else {
169             if (execRuntime.isEndpointConnected()) {
170                 try {
171                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
172                 } catch (final HttpException | IOException ex) {
173                     asyncExecCallback.failed(ex);
174                 }
175             } else {
176                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
177             }
178         }
179 
180     }
181 
182     private void proceedToNextHop(
183             final State state,
184             final HttpRequest request,
185             final AsyncEntityProducer entityProducer,
186             final AsyncExecChain.Scope scope,
187             final AsyncExecChain chain,
188             final AsyncExecCallback asyncExecCallback) {
189         final RouteTracker tracker = state.tracker;
190         final String exchangeId = scope.exchangeId;
191         final HttpRoute route = scope.route;
192         final AsyncExecRuntime execRuntime = scope.execRuntime;
193         final CancellableDependency operation = scope.cancellableDependency;
194         final HttpClientContext clientContext = scope.clientContext;
195 
196         final HttpRoute fact = tracker.toRoute();
197         final int step = routeDirector.nextStep(route, fact);
198 
199         switch (step) {
200             case HttpRouteDirector.CONNECT_TARGET:
201                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
202 
203                     @Override
204                     public void completed(final AsyncExecRuntime execRuntime) {
205                         tracker.connectTarget(route.isSecure());
206                         if (LOG.isDebugEnabled()) {
207                             LOG.debug("{} connected to target", exchangeId);
208                         }
209                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
210                     }
211 
212                     @Override
213                     public void failed(final Exception ex) {
214                         asyncExecCallback.failed(ex);
215                     }
216 
217                     @Override
218                     public void cancelled() {
219                         asyncExecCallback.failed(new InterruptedIOException());
220                     }
221 
222                 }));
223                 break;
224 
225             case HttpRouteDirector.CONNECT_PROXY:
226                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
227 
228                     @Override
229                     public void completed(final AsyncExecRuntime execRuntime) {
230                         final HttpHost proxy  = route.getProxyHost();
231                         tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
232                         if (LOG.isDebugEnabled()) {
233                             LOG.debug("{} connected to proxy", exchangeId);
234                         }
235                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
236                     }
237 
238                     @Override
239                     public void failed(final Exception ex) {
240                         asyncExecCallback.failed(ex);
241                     }
242 
243                     @Override
244                     public void cancelled() {
245                         asyncExecCallback.failed(new InterruptedIOException());
246                     }
247 
248                 }));
249                 break;
250 
251                 case HttpRouteDirector.TUNNEL_TARGET:
252                     try {
253                         final HttpHost proxy = route.getProxyHost();
254                         final HttpHost target = route.getTargetHost();
255                         if (LOG.isDebugEnabled()) {
256                             LOG.debug("{} create tunnel", exchangeId);
257                         }
258                         createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() {
259 
260                         @Override
261                         public AsyncDataConsumer handleResponse(
262                                 final HttpResponse response,
263                                 final EntityDetails entityDetails) throws HttpException, IOException {
264                             return asyncExecCallback.handleResponse(response, entityDetails);
265                         }
266 
267                         @Override
268                         public void handleInformationResponse(
269                                 final HttpResponse response) throws HttpException, IOException {
270                             asyncExecCallback.handleInformationResponse(response);
271                         }
272 
273                             @Override
274                             public void completed() {
275                                 if (!execRuntime.isEndpointConnected()) {
276                                     // Remote endpoint disconnected. Need to start over
277                                     if (LOG.isDebugEnabled()) {
278                                         LOG.debug("{} proxy disconnected", exchangeId);
279                                     }
280                                     state.tracker.reset();
281                                 }
282                                 if (state.challenged) {
283                                     if (LOG.isDebugEnabled()) {
284                                         LOG.debug("{} proxy authentication required", exchangeId);
285                                     }
286                                     proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
287                                 } else {
288                                     if (state.tunnelRefused) {
289                                         if (LOG.isDebugEnabled()) {
290                                             LOG.debug("{} tunnel refused", exchangeId);
291                                         }
292                                         asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
293                                     } else {
294                                         if (LOG.isDebugEnabled()) {
295                                             LOG.debug("{} tunnel to target created", exchangeId);
296                                         }
297                                         tracker.tunnelTarget(false);
298                                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
299                                     }
300                                 }
301                             }
302 
303                         @Override
304                         public void failed(final Exception cause) {
305                             asyncExecCallback.failed(cause);
306                         }
307 
308                     });
309                 } catch (final HttpException | IOException ex) {
310                     asyncExecCallback.failed(ex);
311                 }
312                 break;
313 
314             case HttpRouteDirector.TUNNEL_PROXY:
315                 // The most simple example for this case is a proxy chain
316                 // of two proxies, where P1 must be tunnelled to P2.
317                 // route: Source -> P1 -> P2 -> Target (3 hops)
318                 // fact:  Source -> P1 -> Target       (2 hops)
319                 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
320                 break;
321 
322             case HttpRouteDirector.LAYER_PROTOCOL:
323                 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
324 
325                     @Override
326                     public void completed(final AsyncExecRuntime asyncExecRuntime) {
327                         if (LOG.isDebugEnabled()) {
328                             LOG.debug("{} upgraded to TLS", exchangeId);
329                         }
330                         tracker.layerProtocol(route.isSecure());
331                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
332                     }
333 
334                     @Override
335                     public void failed(final Exception ex) {
336                         asyncExecCallback.failed(ex);
337                     }
338 
339                     @Override
340                     public void cancelled() {
341                         asyncExecCallback.failed(new InterruptedIOException());
342                     }
343 
344                 });
345                 break;
346 
347             case HttpRouteDirector.UNREACHABLE:
348                 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
349                         "planned = " + route + "; current = " + fact));
350                 break;
351 
352             case HttpRouteDirector.COMPLETE:
353                 if (LOG.isDebugEnabled()) {
354                     LOG.debug("{} route fully established", exchangeId);
355                 }
356                 try {
357                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
358                 } catch (final HttpException | IOException ex) {
359                     asyncExecCallback.failed(ex);
360                 }
361                 break;
362 
363             default:
364                 throw new IllegalStateException("Unknown step indicator "  + step + " from RouteDirector.");
365         }
366     }
367 
368     private void createTunnel(
369             final State state,
370             final HttpHost proxy,
371             final HttpHost nextHop,
372             final AsyncExecChain.Scope scope,
373             final AsyncExecChain chain,
374             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
375 
376         final HttpClientContext clientContext = scope.clientContext;
377 
378         final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
379 
380         if (authCacheKeeper != null) {
381             authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
382         }
383 
384         final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
385         connect.setVersion(HttpVersion.HTTP_1_1);
386 
387         proxyHttpProcessor.process(connect, null, clientContext);
388         authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
389 
390         chain.proceed(connect, null, scope, new AsyncExecCallback() {
391 
392             @Override
393             public AsyncDataConsumer handleResponse(
394                     final HttpResponse response,
395                     final EntityDetails entityDetails) throws HttpException, IOException {
396 
397                 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
398                 proxyHttpProcessor.process(response, entityDetails, clientContext);
399 
400                 final int status = response.getCode();
401                 if (status < HttpStatus.SC_SUCCESS) {
402                     throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
403                 }
404 
405                 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
406                     state.challenged = true;
407                     return null;
408                 }
409                 state.challenged = false;
410                 if (status >= HttpStatus.SC_REDIRECTION) {
411                     state.tunnelRefused = true;
412                     return asyncExecCallback.handleResponse(response, entityDetails);
413                 }
414                 return null;
415             }
416 
417             @Override
418             public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
419             }
420 
421             @Override
422             public void completed() {
423                 asyncExecCallback.completed();
424             }
425 
426             @Override
427             public void failed(final Exception cause) {
428                 asyncExecCallback.failed(cause);
429             }
430 
431         });
432 
433     }
434 
435     private boolean needAuthentication(
436             final AuthExchange proxyAuthExchange,
437             final HttpHost proxy,
438             final HttpResponse response,
439             final HttpClientContext context) {
440         final RequestConfig config = context.getRequestConfig();
441         if (config.isAuthenticationEnabled()) {
442             final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
443 
444             if (authCacheKeeper != null) {
445                 if (proxyAuthRequested) {
446                     authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
447                 } else {
448                     authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
449                 }
450             }
451 
452             if (proxyAuthRequested) {
453                 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
454                         proxyAuthStrategy, proxyAuthExchange, context);
455 
456                 if (authCacheKeeper != null) {
457                     authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
458                 }
459 
460                 return updated;
461             }
462         }
463         return false;
464     }
465 
466 }