1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32
33 import org.apache.hc.client5.http.AuthenticationStrategy;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.RouteTracker;
36 import org.apache.hc.client5.http.SchemePortResolver;
37 import org.apache.hc.client5.http.async.AsyncExecCallback;
38 import org.apache.hc.client5.http.async.AsyncExecChain;
39 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
40 import org.apache.hc.client5.http.async.AsyncExecRuntime;
41 import org.apache.hc.client5.http.auth.AuthExchange;
42 import org.apache.hc.client5.http.auth.ChallengeType;
43 import org.apache.hc.client5.http.config.RequestConfig;
44 import org.apache.hc.client5.http.impl.TunnelRefusedException;
45 import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
46 import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
47 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
48 import org.apache.hc.client5.http.protocol.HttpClientContext;
49 import org.apache.hc.client5.http.routing.HttpRouteDirector;
50 import org.apache.hc.core5.annotation.Contract;
51 import org.apache.hc.core5.annotation.Internal;
52 import org.apache.hc.core5.annotation.ThreadingBehavior;
53 import org.apache.hc.core5.concurrent.CancellableDependency;
54 import org.apache.hc.core5.concurrent.FutureCallback;
55 import org.apache.hc.core5.http.EntityDetails;
56 import org.apache.hc.core5.http.HttpException;
57 import org.apache.hc.core5.http.HttpHost;
58 import org.apache.hc.core5.http.HttpRequest;
59 import org.apache.hc.core5.http.HttpResponse;
60 import org.apache.hc.core5.http.HttpStatus;
61 import org.apache.hc.core5.http.HttpVersion;
62 import org.apache.hc.core5.http.Method;
63 import org.apache.hc.core5.http.message.BasicHttpRequest;
64 import org.apache.hc.core5.http.message.StatusLine;
65 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
66 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
67 import org.apache.hc.core5.http.protocol.HttpCoreContext;
68 import org.apache.hc.core5.http.protocol.HttpProcessor;
69 import org.apache.hc.core5.util.Args;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73
74
75
76
77
78
79
80 @Contract(threading = ThreadingBehavior.STATELESS)
81 @Internal
82 public final class AsyncConnectExec implements AsyncExecChainHandler {
83
84 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
85
86 private final HttpProcessor proxyHttpProcessor;
87 private final AuthenticationStrategy proxyAuthStrategy;
88 private final HttpAuthenticator authenticator;
89 private final AuthCacheKeeper authCacheKeeper;
90 private final HttpRouteDirector routeDirector;
91
/**
 * Creates the connect handler.
 *
 * @param proxyHttpProcessor protocol processor applied to the CONNECT request sent to the
 *                           proxy and to the proxy's response; must not be {@code null}.
 * @param proxyAuthStrategy strategy passed to the authenticator when a proxy challenge
 *                          is being processed; must not be {@code null}.
 * @param schemePortResolver resolver handed to the {@link AuthCacheKeeper}; only used
 *                           when auth caching is enabled.
 * @param authCachingDisabled when {@code true} no {@link AuthCacheKeeper} is created and
 *                            all auth cache maintenance is skipped.
 */
public AsyncConnectExec(
        final HttpProcessor proxyHttpProcessor,
        final AuthenticationStrategy proxyAuthStrategy,
        final SchemePortResolver schemePortResolver,
        final boolean authCachingDisabled) {
    this.proxyHttpProcessor = Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
    this.proxyAuthStrategy = Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
    this.authenticator = new HttpAuthenticator();
    if (authCachingDisabled) {
        // A null keeper is the "caching disabled" marker checked by the other methods.
        this.authCacheKeeper = null;
    } else {
        this.authCacheKeeper = new AuthCacheKeeper(schemePortResolver);
    }
    this.routeDirector = BasicRouteDirector.INSTANCE;
}
105
106 static class State {
107
108 State(final HttpRoute route) {
109 tracker = new RouteTracker(route);
110 }
111
112 final RouteTracker tracker;
113
114 volatile boolean challenged;
115 volatile boolean tunnelRefused;
116
117 }
118
119 @Override
120 public void execute(
121 final HttpRequest request,
122 final AsyncEntityProducer entityProducer,
123 final AsyncExecChain.Scope scope,
124 final AsyncExecChain chain,
125 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
126 Args.notNull(request, "HTTP request");
127 Args.notNull(scope, "Scope");
128
129 final String exchangeId = scope.exchangeId;
130 final HttpRoute route = scope.route;
131 final CancellableDependency cancellableDependency = scope.cancellableDependency;
132 final HttpClientContext clientContext = scope.clientContext;
133 final AsyncExecRuntime execRuntime = scope.execRuntime;
134 final State state = new State(route);
135
136 if (!execRuntime.isEndpointAcquired()) {
137 final Object userToken = clientContext.getUserToken();
138 if (LOG.isDebugEnabled()) {
139 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
140 }
141 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
142 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
143
144 @Override
145 public void completed(final AsyncExecRuntime execRuntime) {
146 if (execRuntime.isEndpointConnected()) {
147 try {
148 chain.proceed(request, entityProducer, scope, asyncExecCallback);
149 } catch (final HttpException | IOException ex) {
150 asyncExecCallback.failed(ex);
151 }
152 } else {
153 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
154 }
155 }
156
157 @Override
158 public void failed(final Exception ex) {
159 asyncExecCallback.failed(ex);
160 }
161
162 @Override
163 public void cancelled() {
164 asyncExecCallback.failed(new InterruptedIOException());
165 }
166
167 }));
168 } else {
169 if (execRuntime.isEndpointConnected()) {
170 try {
171 chain.proceed(request, entityProducer, scope, asyncExecCallback);
172 } catch (final HttpException | IOException ex) {
173 asyncExecCallback.failed(ex);
174 }
175 } else {
176 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
177 }
178 }
179
180 }
181
182 private void proceedToNextHop(
183 final State state,
184 final HttpRequest request,
185 final AsyncEntityProducer entityProducer,
186 final AsyncExecChain.Scope scope,
187 final AsyncExecChain chain,
188 final AsyncExecCallback asyncExecCallback) {
189 final RouteTracker tracker = state.tracker;
190 final String exchangeId = scope.exchangeId;
191 final HttpRoute route = scope.route;
192 final AsyncExecRuntime execRuntime = scope.execRuntime;
193 final CancellableDependency operation = scope.cancellableDependency;
194 final HttpClientContext clientContext = scope.clientContext;
195
196 final HttpRoute fact = tracker.toRoute();
197 final int step = routeDirector.nextStep(route, fact);
198
199 switch (step) {
200 case HttpRouteDirector.CONNECT_TARGET:
201 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
202
203 @Override
204 public void completed(final AsyncExecRuntime execRuntime) {
205 tracker.connectTarget(route.isSecure());
206 if (LOG.isDebugEnabled()) {
207 LOG.debug("{} connected to target", exchangeId);
208 }
209 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
210 }
211
212 @Override
213 public void failed(final Exception ex) {
214 asyncExecCallback.failed(ex);
215 }
216
217 @Override
218 public void cancelled() {
219 asyncExecCallback.failed(new InterruptedIOException());
220 }
221
222 }));
223 break;
224
225 case HttpRouteDirector.CONNECT_PROXY:
226 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
227
228 @Override
229 public void completed(final AsyncExecRuntime execRuntime) {
230 final HttpHost proxy = route.getProxyHost();
231 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
232 if (LOG.isDebugEnabled()) {
233 LOG.debug("{} connected to proxy", exchangeId);
234 }
235 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
236 }
237
238 @Override
239 public void failed(final Exception ex) {
240 asyncExecCallback.failed(ex);
241 }
242
243 @Override
244 public void cancelled() {
245 asyncExecCallback.failed(new InterruptedIOException());
246 }
247
248 }));
249 break;
250
251 case HttpRouteDirector.TUNNEL_TARGET:
252 try {
253 final HttpHost proxy = route.getProxyHost();
254 final HttpHost target = route.getTargetHost();
255 if (LOG.isDebugEnabled()) {
256 LOG.debug("{} create tunnel", exchangeId);
257 }
258 createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() {
259
260 @Override
261 public AsyncDataConsumer handleResponse(
262 final HttpResponse response,
263 final EntityDetails entityDetails) throws HttpException, IOException {
264 return asyncExecCallback.handleResponse(response, entityDetails);
265 }
266
267 @Override
268 public void handleInformationResponse(
269 final HttpResponse response) throws HttpException, IOException {
270 asyncExecCallback.handleInformationResponse(response);
271 }
272
273 @Override
274 public void completed() {
275 if (!execRuntime.isEndpointConnected()) {
276
277 if (LOG.isDebugEnabled()) {
278 LOG.debug("{} proxy disconnected", exchangeId);
279 }
280 state.tracker.reset();
281 }
282 if (state.challenged) {
283 if (LOG.isDebugEnabled()) {
284 LOG.debug("{} proxy authentication required", exchangeId);
285 }
286 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
287 } else {
288 if (state.tunnelRefused) {
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("{} tunnel refused", exchangeId);
291 }
292 asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
293 } else {
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("{} tunnel to target created", exchangeId);
296 }
297 tracker.tunnelTarget(false);
298 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
299 }
300 }
301 }
302
303 @Override
304 public void failed(final Exception cause) {
305 asyncExecCallback.failed(cause);
306 }
307
308 });
309 } catch (final HttpException | IOException ex) {
310 asyncExecCallback.failed(ex);
311 }
312 break;
313
314 case HttpRouteDirector.TUNNEL_PROXY:
315
316
317
318
319 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
320 break;
321
322 case HttpRouteDirector.LAYER_PROTOCOL:
323 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
324
325 @Override
326 public void completed(final AsyncExecRuntime asyncExecRuntime) {
327 if (LOG.isDebugEnabled()) {
328 LOG.debug("{} upgraded to TLS", exchangeId);
329 }
330 tracker.layerProtocol(route.isSecure());
331 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
332 }
333
334 @Override
335 public void failed(final Exception ex) {
336 asyncExecCallback.failed(ex);
337 }
338
339 @Override
340 public void cancelled() {
341 asyncExecCallback.failed(new InterruptedIOException());
342 }
343
344 });
345 break;
346
347 case HttpRouteDirector.UNREACHABLE:
348 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
349 "planned = " + route + "; current = " + fact));
350 break;
351
352 case HttpRouteDirector.COMPLETE:
353 if (LOG.isDebugEnabled()) {
354 LOG.debug("{} route fully established", exchangeId);
355 }
356 try {
357 chain.proceed(request, entityProducer, scope, asyncExecCallback);
358 } catch (final HttpException | IOException ex) {
359 asyncExecCallback.failed(ex);
360 }
361 break;
362
363 default:
364 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
365 }
366 }
367
368 private void createTunnel(
369 final State state,
370 final HttpHost proxy,
371 final HttpHost nextHop,
372 final AsyncExecChain.Scope scope,
373 final AsyncExecChain chain,
374 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
375
376 final HttpClientContext clientContext = scope.clientContext;
377
378 final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
379
380 if (authCacheKeeper != null) {
381 authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
382 }
383
384 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
385 connect.setVersion(HttpVersion.HTTP_1_1);
386
387 proxyHttpProcessor.process(connect, null, clientContext);
388 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
389
390 chain.proceed(connect, null, scope, new AsyncExecCallback() {
391
392 @Override
393 public AsyncDataConsumer handleResponse(
394 final HttpResponse response,
395 final EntityDetails entityDetails) throws HttpException, IOException {
396
397 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
398 proxyHttpProcessor.process(response, entityDetails, clientContext);
399
400 final int status = response.getCode();
401 if (status < HttpStatus.SC_SUCCESS) {
402 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
403 }
404
405 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
406 state.challenged = true;
407 return null;
408 }
409 state.challenged = false;
410 if (status >= HttpStatus.SC_REDIRECTION) {
411 state.tunnelRefused = true;
412 return asyncExecCallback.handleResponse(response, entityDetails);
413 }
414 return null;
415 }
416
417 @Override
418 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
419 }
420
421 @Override
422 public void completed() {
423 asyncExecCallback.completed();
424 }
425
426 @Override
427 public void failed(final Exception cause) {
428 asyncExecCallback.failed(cause);
429 }
430
431 });
432
433 }
434
435 private boolean needAuthentication(
436 final AuthExchange proxyAuthExchange,
437 final HttpHost proxy,
438 final HttpResponse response,
439 final HttpClientContext context) {
440 final RequestConfig config = context.getRequestConfig();
441 if (config.isAuthenticationEnabled()) {
442 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
443
444 if (authCacheKeeper != null) {
445 if (proxyAuthRequested) {
446 authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
447 } else {
448 authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
449 }
450 }
451
452 if (proxyAuthRequested) {
453 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
454 proxyAuthStrategy, proxyAuthExchange, context);
455
456 if (authCacheKeeper != null) {
457 authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
458 }
459
460 return updated;
461 }
462 }
463 return false;
464 }
465
466 }