1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.AuthenticationStrategy;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.RouteTracker;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
43 import org.apache.hc.client5.http.async.AsyncExecRuntime;
44 import org.apache.hc.client5.http.auth.AuthExchange;
45 import org.apache.hc.client5.http.auth.ChallengeType;
46 import org.apache.hc.client5.http.config.RequestConfig;
47 import org.apache.hc.client5.http.impl.TunnelRefusedException;
48 import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
49 import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
50 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
51 import org.apache.hc.client5.http.protocol.HttpClientContext;
52 import org.apache.hc.client5.http.routing.HttpRouteDirector;
53 import org.apache.hc.core5.annotation.Contract;
54 import org.apache.hc.core5.annotation.Internal;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.concurrent.CancellableDependency;
57 import org.apache.hc.core5.concurrent.FutureCallback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.HttpVersion;
66 import org.apache.hc.core5.http.Method;
67 import org.apache.hc.core5.http.message.BasicHttpRequest;
68 import org.apache.hc.core5.http.message.StatusLine;
69 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
71 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
72 import org.apache.hc.core5.http.nio.CapacityChannel;
73 import org.apache.hc.core5.http.nio.DataStreamChannel;
74 import org.apache.hc.core5.http.nio.RequestChannel;
75 import org.apache.hc.core5.http.protocol.HttpContext;
76 import org.apache.hc.core5.http.protocol.HttpCoreContext;
77 import org.apache.hc.core5.http.protocol.HttpProcessor;
78 import org.apache.hc.core5.util.Args;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82
83
84
85
86
87
88
89 @Contract(threading = ThreadingBehavior.STATELESS)
90 @Internal
91 public final class AsyncConnectExec implements AsyncExecChainHandler {
92
93 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
94
95 private final HttpProcessor proxyHttpProcessor;
96 private final AuthenticationStrategy proxyAuthStrategy;
97 private final HttpAuthenticator authenticator;
98 private final AuthCacheKeeper authCacheKeeper;
99 private final HttpRouteDirector routeDirector;
100
/**
 * Creates the connect handler.
 *
 * @param proxyHttpProcessor processor applied to CONNECT messages exchanged with the proxy;
 *                           must not be {@code null}.
 * @param proxyAuthStrategy strategy used when updating the proxy authentication state;
 *                          must not be {@code null}.
 * @param schemePortResolver passed to the {@link AuthCacheKeeper} when auth caching is enabled.
 * @param authCachingDisabled when {@code true}, no {@link AuthCacheKeeper} is created and
 *                            all auth cache maintenance is skipped.
 */
public AsyncConnectExec(
        final HttpProcessor proxyHttpProcessor,
        final AuthenticationStrategy proxyAuthStrategy,
        final SchemePortResolver schemePortResolver,
        final boolean authCachingDisabled) {
    this.proxyHttpProcessor = Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
    this.proxyAuthStrategy = Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
    this.authenticator = new HttpAuthenticator();
    if (authCachingDisabled) {
        this.authCacheKeeper = null;
    } else {
        this.authCacheKeeper = new AuthCacheKeeper(schemePortResolver);
    }
    this.routeDirector = BasicRouteDirector.INSTANCE;
}
114
115 static class State {
116
117 State(final HttpRoute route) {
118 tracker = new RouteTracker(route);
119 }
120
121 final RouteTracker tracker;
122
123 volatile boolean challenged;
124 volatile boolean tunnelRefused;
125
126 }
127
128 @Override
129 public void execute(
130 final HttpRequest request,
131 final AsyncEntityProducer entityProducer,
132 final AsyncExecChain.Scope scope,
133 final AsyncExecChain chain,
134 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
135 Args.notNull(request, "HTTP request");
136 Args.notNull(scope, "Scope");
137
138 final String exchangeId = scope.exchangeId;
139 final HttpRoute route = scope.route;
140 final CancellableDependency cancellableDependency = scope.cancellableDependency;
141 final HttpClientContext clientContext = scope.clientContext;
142 final AsyncExecRuntime execRuntime = scope.execRuntime;
143 final State state = new State(route);
144
145 if (!execRuntime.isEndpointAcquired()) {
146 final Object userToken = clientContext.getUserToken();
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
149 }
150 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
151 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
152
153 @Override
154 public void completed(final AsyncExecRuntime execRuntime) {
155 if (execRuntime.isEndpointConnected()) {
156 try {
157 chain.proceed(request, entityProducer, scope, asyncExecCallback);
158 } catch (final HttpException | IOException ex) {
159 asyncExecCallback.failed(ex);
160 }
161 } else {
162 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
163 }
164 }
165
166 @Override
167 public void failed(final Exception ex) {
168 asyncExecCallback.failed(ex);
169 }
170
171 @Override
172 public void cancelled() {
173 asyncExecCallback.failed(new InterruptedIOException());
174 }
175
176 }));
177 } else {
178 if (execRuntime.isEndpointConnected()) {
179 try {
180 chain.proceed(request, entityProducer, scope, asyncExecCallback);
181 } catch (final HttpException | IOException ex) {
182 asyncExecCallback.failed(ex);
183 }
184 } else {
185 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
186 }
187 }
188
189 }
190
191 private void proceedToNextHop(
192 final State state,
193 final HttpRequest request,
194 final AsyncEntityProducer entityProducer,
195 final AsyncExecChain.Scope scope,
196 final AsyncExecChain chain,
197 final AsyncExecCallback asyncExecCallback) {
198 final RouteTracker tracker = state.tracker;
199 final String exchangeId = scope.exchangeId;
200 final HttpRoute route = scope.route;
201 final AsyncExecRuntime execRuntime = scope.execRuntime;
202 final CancellableDependency operation = scope.cancellableDependency;
203 final HttpClientContext clientContext = scope.clientContext;
204
205 final HttpRoute fact = tracker.toRoute();
206 final int step = routeDirector.nextStep(route, fact);
207
208 switch (step) {
209 case HttpRouteDirector.CONNECT_TARGET:
210 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
211
212 @Override
213 public void completed(final AsyncExecRuntime execRuntime) {
214 tracker.connectTarget(route.isSecure());
215 if (LOG.isDebugEnabled()) {
216 LOG.debug("{} connected to target", exchangeId);
217 }
218 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
219 }
220
221 @Override
222 public void failed(final Exception ex) {
223 asyncExecCallback.failed(ex);
224 }
225
226 @Override
227 public void cancelled() {
228 asyncExecCallback.failed(new InterruptedIOException());
229 }
230
231 }));
232 break;
233
234 case HttpRouteDirector.CONNECT_PROXY:
235 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
236
237 @Override
238 public void completed(final AsyncExecRuntime execRuntime) {
239 final HttpHost proxy = route.getProxyHost();
240 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
241 if (LOG.isDebugEnabled()) {
242 LOG.debug("{} connected to proxy", exchangeId);
243 }
244 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
245 }
246
247 @Override
248 public void failed(final Exception ex) {
249 asyncExecCallback.failed(ex);
250 }
251
252 @Override
253 public void cancelled() {
254 asyncExecCallback.failed(new InterruptedIOException());
255 }
256
257 }));
258 break;
259
260 case HttpRouteDirector.TUNNEL_TARGET:
261 try {
262 final HttpHost proxy = route.getProxyHost();
263 final HttpHost target = route.getTargetHost();
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("{} create tunnel", exchangeId);
266 }
267 createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
268
269 @Override
270 public AsyncDataConsumer handleResponse(
271 final HttpResponse response,
272 final EntityDetails entityDetails) throws HttpException, IOException {
273 return asyncExecCallback.handleResponse(response, entityDetails);
274 }
275
276 @Override
277 public void handleInformationResponse(
278 final HttpResponse response) throws HttpException, IOException {
279 asyncExecCallback.handleInformationResponse(response);
280 }
281
282 @Override
283 public void completed() {
284 if (!execRuntime.isEndpointConnected()) {
285
286 if (LOG.isDebugEnabled()) {
287 LOG.debug("{} proxy disconnected", exchangeId);
288 }
289 state.tracker.reset();
290 }
291 if (state.challenged) {
292 if (LOG.isDebugEnabled()) {
293 LOG.debug("{} proxy authentication required", exchangeId);
294 }
295 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
296 } else {
297 if (state.tunnelRefused) {
298 if (LOG.isDebugEnabled()) {
299 LOG.debug("{} tunnel refused", exchangeId);
300 }
301 asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
302 } else {
303 if (LOG.isDebugEnabled()) {
304 LOG.debug("{} tunnel to target created", exchangeId);
305 }
306 tracker.tunnelTarget(false);
307 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
308 }
309 }
310 }
311
312 @Override
313 public void failed(final Exception cause) {
314 execRuntime.markConnectionNonReusable();
315 asyncExecCallback.failed(cause);
316 }
317
318 });
319 } catch (final HttpException | IOException ex) {
320 asyncExecCallback.failed(ex);
321 }
322 break;
323
324 case HttpRouteDirector.TUNNEL_PROXY:
325
326
327
328
329 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
330 break;
331
332 case HttpRouteDirector.LAYER_PROTOCOL:
333 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
334
335 @Override
336 public void completed(final AsyncExecRuntime asyncExecRuntime) {
337 if (LOG.isDebugEnabled()) {
338 LOG.debug("{} upgraded to TLS", exchangeId);
339 }
340 tracker.layerProtocol(route.isSecure());
341 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
342 }
343
344 @Override
345 public void failed(final Exception ex) {
346 asyncExecCallback.failed(ex);
347 }
348
349 @Override
350 public void cancelled() {
351 asyncExecCallback.failed(new InterruptedIOException());
352 }
353
354 });
355 break;
356
357 case HttpRouteDirector.UNREACHABLE:
358 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
359 "planned = " + route + "; current = " + fact));
360 break;
361
362 case HttpRouteDirector.COMPLETE:
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("{} route fully established", exchangeId);
365 }
366 try {
367 chain.proceed(request, entityProducer, scope, asyncExecCallback);
368 } catch (final HttpException | IOException ex) {
369 asyncExecCallback.failed(ex);
370 }
371 break;
372
373 default:
374 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
375 }
376 }
377
378 private void createTunnel(
379 final State state,
380 final HttpHost proxy,
381 final HttpHost nextHop,
382 final AsyncExecChain.Scope scope,
383 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
384
385 final CancellableDependency operation = scope.cancellableDependency;
386 final HttpClientContext clientContext = scope.clientContext;
387 final AsyncExecRuntime execRuntime = scope.execRuntime;
388 final String exchangeId = scope.exchangeId;
389
390 final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
391
392 if (authCacheKeeper != null) {
393 authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
394 }
395
396 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
397
398 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
399
400 @Override
401 public void releaseResources() {
402 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
403 if (entityConsumer != null) {
404 entityConsumer.releaseResources();
405 }
406 }
407
408 @Override
409 public void failed(final Exception cause) {
410 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
411 if (entityConsumer != null) {
412 entityConsumer.releaseResources();
413 }
414 asyncExecCallback.failed(cause);
415 }
416
417 @Override
418 public void cancel() {
419 failed(new InterruptedIOException());
420 }
421
422 @Override
423 public void produceRequest(final RequestChannel requestChannel,
424 final HttpContext httpContext) throws HttpException, IOException {
425 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
426 connect.setVersion(HttpVersion.HTTP_1_1);
427
428 proxyHttpProcessor.process(connect, null, clientContext);
429 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
430
431 requestChannel.sendRequest(connect, null, clientContext);
432 }
433
434 @Override
435 public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
436 }
437
438 @Override
439 public int available() {
440 return 0;
441 }
442
443 @Override
444 public void consumeInformation(final HttpResponse httpResponse,
445 final HttpContext httpContext) throws HttpException, IOException {
446 }
447
448 @Override
449 public void consumeResponse(final HttpResponse response,
450 final EntityDetails entityDetails,
451 final HttpContext httpContext) throws HttpException, IOException {
452 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
453 proxyHttpProcessor.process(response, entityDetails, clientContext);
454
455 final int status = response.getCode();
456 if (status < HttpStatus.SC_SUCCESS) {
457 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
458 }
459
460 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
461 state.challenged = true;
462 } else {
463 state.challenged = false;
464 if (status >= HttpStatus.SC_REDIRECTION) {
465 state.tunnelRefused = true;
466 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
467 } else if (status == HttpStatus.SC_OK) {
468 asyncExecCallback.completed();
469 } else {
470 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
471 }
472 }
473 }
474
475 @Override
476 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
477 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
478 if (entityConsumer != null) {
479 entityConsumer.updateCapacity(capacityChannel);
480 } else {
481 capacityChannel.update(Integer.MAX_VALUE);
482 }
483 }
484
485 @Override
486 public void consume(final ByteBuffer src) throws IOException {
487 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
488 if (entityConsumer != null) {
489 entityConsumer.consume(src);
490 }
491 }
492
493 @Override
494 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
495 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
496 if (entityConsumer != null) {
497 entityConsumer.streamEnd(trailers);
498 }
499 asyncExecCallback.completed();
500 }
501
502 };
503
504 if (LOG.isDebugEnabled()) {
505 operation.setDependency(execRuntime.execute(
506 exchangeId,
507 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
508 clientContext));
509 } else {
510 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
511 }
512
513 }
514
515 private boolean needAuthentication(
516 final AuthExchange proxyAuthExchange,
517 final HttpHost proxy,
518 final HttpResponse response,
519 final HttpClientContext context) {
520 final RequestConfig config = context.getRequestConfig();
521 if (config.isAuthenticationEnabled()) {
522 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
523
524 if (authCacheKeeper != null) {
525 if (proxyAuthRequested) {
526 authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
527 } else {
528 authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
529 }
530 }
531
532 if (proxyAuthRequested) {
533 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
534 proxyAuthStrategy, proxyAuthExchange, context);
535
536 if (authCacheKeeper != null) {
537 authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
538 }
539
540 return updated;
541 }
542 }
543 return false;
544 }
545
546 }