1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.AuthenticationStrategy;
37 import org.apache.hc.client5.http.EndpointInfo;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.RouteTracker;
40 import org.apache.hc.client5.http.SchemePortResolver;
41 import org.apache.hc.client5.http.async.AsyncExecCallback;
42 import org.apache.hc.client5.http.async.AsyncExecChain;
43 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
44 import org.apache.hc.client5.http.async.AsyncExecRuntime;
45 import org.apache.hc.client5.http.auth.AuthExchange;
46 import org.apache.hc.client5.http.auth.ChallengeType;
47 import org.apache.hc.client5.http.config.RequestConfig;
48 import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
49 import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
50 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
51 import org.apache.hc.client5.http.protocol.HttpClientContext;
52 import org.apache.hc.client5.http.routing.HttpRouteDirector;
53 import org.apache.hc.core5.annotation.Contract;
54 import org.apache.hc.core5.annotation.Internal;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.concurrent.CancellableDependency;
57 import org.apache.hc.core5.concurrent.FutureCallback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.HttpVersion;
66 import org.apache.hc.core5.http.Method;
67 import org.apache.hc.core5.http.message.BasicHttpRequest;
68 import org.apache.hc.core5.http.message.StatusLine;
69 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
71 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
72 import org.apache.hc.core5.http.nio.CapacityChannel;
73 import org.apache.hc.core5.http.nio.DataStreamChannel;
74 import org.apache.hc.core5.http.nio.RequestChannel;
75 import org.apache.hc.core5.http.protocol.HttpContext;
76 import org.apache.hc.core5.http.protocol.HttpProcessor;
77 import org.apache.hc.core5.util.Args;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81
82
83
84
85
86
87
88 @Contract(threading = ThreadingBehavior.STATELESS)
89 @Internal
90 public final class AsyncConnectExec implements AsyncExecChainHandler {
91
92 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
93
94 private final HttpProcessor proxyHttpProcessor;
95 private final AuthenticationStrategy proxyAuthStrategy;
96 private final HttpAuthenticator authenticator;
97 private final AuthCacheKeeper authCacheKeeper;
98 private final HttpRouteDirector routeDirector;
99
100 public AsyncConnectExec(
101 final HttpProcessor proxyHttpProcessor,
102 final AuthenticationStrategy proxyAuthStrategy,
103 final SchemePortResolver schemePortResolver,
104 final boolean authCachingDisabled) {
105 Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
106 Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
107 this.proxyHttpProcessor = proxyHttpProcessor;
108 this.proxyAuthStrategy = proxyAuthStrategy;
109 this.authenticator = new HttpAuthenticator();
110 this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
111 this.routeDirector = BasicRouteDirector.INSTANCE;
112 }
113
114 static class State {
115
116 State(final HttpRoute route) {
117 tracker = new RouteTracker(route);
118 }
119
120 final RouteTracker tracker;
121
122 volatile boolean challenged;
123 volatile HttpResponse response;
124 volatile boolean tunnelRefused;
125
126 }
127
128 @Override
129 public void execute(
130 final HttpRequest request,
131 final AsyncEntityProducer entityProducer,
132 final AsyncExecChain.Scope scope,
133 final AsyncExecChain chain,
134 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
135 Args.notNull(request, "HTTP request");
136 Args.notNull(scope, "Scope");
137
138 final String exchangeId = scope.exchangeId;
139 final HttpRoute route = scope.route;
140 final CancellableDependency cancellableDependency = scope.cancellableDependency;
141 final HttpClientContext clientContext = scope.clientContext;
142 final AsyncExecRuntime execRuntime = scope.execRuntime;
143 final State state = new State(route);
144
145 if (!execRuntime.isEndpointAcquired()) {
146 final Object userToken = clientContext.getUserToken();
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
149 }
150 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
151 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
152
153 @Override
154 public void completed(final AsyncExecRuntime execRuntime) {
155 if (execRuntime.isEndpointConnected()) {
156 try {
157 chain.proceed(request, entityProducer, scope, asyncExecCallback);
158 } catch (final HttpException | IOException ex) {
159 asyncExecCallback.failed(ex);
160 }
161 } else {
162 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
163 }
164 }
165
166 @Override
167 public void failed(final Exception ex) {
168 asyncExecCallback.failed(ex);
169 }
170
171 @Override
172 public void cancelled() {
173 asyncExecCallback.failed(new InterruptedIOException());
174 }
175
176 }));
177 } else {
178 if (execRuntime.isEndpointConnected()) {
179 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
180 } else {
181 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
182 }
183 }
184
185 }
186
187 private void proceedToNextHop(
188 final State state,
189 final HttpRequest request,
190 final AsyncEntityProducer entityProducer,
191 final AsyncExecChain.Scope scope,
192 final AsyncExecChain chain,
193 final AsyncExecCallback asyncExecCallback) {
194 final RouteTracker tracker = state.tracker;
195 final String exchangeId = scope.exchangeId;
196 final HttpRoute route = scope.route;
197 final AsyncExecRuntime execRuntime = scope.execRuntime;
198 final CancellableDependency operation = scope.cancellableDependency;
199 final HttpClientContext clientContext = scope.clientContext;
200
201 final HttpRoute fact = tracker.toRoute();
202 final int step = routeDirector.nextStep(route, fact);
203
204 switch (step) {
205 case HttpRouteDirector.CONNECT_TARGET:
206 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
207
208 @Override
209 public void completed(final AsyncExecRuntime execRuntime) {
210 tracker.connectTarget(route.isSecure());
211 if (LOG.isDebugEnabled()) {
212 LOG.debug("{} connected to target", exchangeId);
213 }
214 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
215 }
216
217 @Override
218 public void failed(final Exception ex) {
219 asyncExecCallback.failed(ex);
220 }
221
222 @Override
223 public void cancelled() {
224 asyncExecCallback.failed(new InterruptedIOException());
225 }
226
227 }));
228 break;
229
230 case HttpRouteDirector.CONNECT_PROXY:
231 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
232
233 @Override
234 public void completed(final AsyncExecRuntime execRuntime) {
235 final HttpHost proxy = route.getProxyHost();
236 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("{} connected to proxy", exchangeId);
239 }
240 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
241 }
242
243 @Override
244 public void failed(final Exception ex) {
245 asyncExecCallback.failed(ex);
246 }
247
248 @Override
249 public void cancelled() {
250 asyncExecCallback.failed(new InterruptedIOException());
251 }
252
253 }));
254 break;
255
256 case HttpRouteDirector.TUNNEL_TARGET:
257 try {
258 final HttpHost proxy = route.getProxyHost();
259 final HttpHost target = route.getTargetHost();
260 if (LOG.isDebugEnabled()) {
261 LOG.debug("{} create tunnel", exchangeId);
262 }
263 createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
264
265 @Override
266 public AsyncDataConsumer handleResponse(
267 final HttpResponse response,
268 final EntityDetails entityDetails) throws HttpException, IOException {
269 return asyncExecCallback.handleResponse(response, entityDetails);
270 }
271
272 @Override
273 public void handleInformationResponse(
274 final HttpResponse response) throws HttpException, IOException {
275 asyncExecCallback.handleInformationResponse(response);
276 }
277
278 @Override
279 public void completed() {
280 if (!execRuntime.isEndpointConnected()) {
281
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("{} proxy disconnected", exchangeId);
284 }
285 state.tracker.reset();
286 }
287 if (state.challenged) {
288 if (LOG.isDebugEnabled()) {
289 LOG.debug("{} proxy authentication required", exchangeId);
290 }
291 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
292 } else {
293 if (state.tunnelRefused) {
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("{} tunnel refused", exchangeId);
296 }
297 asyncExecCallback.completed();
298 } else {
299 if (LOG.isDebugEnabled()) {
300 LOG.debug("{} tunnel to target created", exchangeId);
301 }
302 tracker.tunnelTarget(false);
303 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
304 }
305 }
306 }
307
308 @Override
309 public void failed(final Exception cause) {
310 execRuntime.markConnectionNonReusable();
311 asyncExecCallback.failed(cause);
312 }
313
314 });
315 } catch (final HttpException | IOException ex) {
316 asyncExecCallback.failed(ex);
317 }
318 break;
319
320 case HttpRouteDirector.TUNNEL_PROXY:
321
322
323
324
325 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
326 break;
327
328 case HttpRouteDirector.LAYER_PROTOCOL:
329 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
330
331 @Override
332 public void completed(final AsyncExecRuntime asyncExecRuntime) {
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("{} upgraded to TLS", exchangeId);
335 }
336 tracker.layerProtocol(route.isSecure());
337 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
338 }
339
340 @Override
341 public void failed(final Exception ex) {
342 asyncExecCallback.failed(ex);
343 }
344
345 @Override
346 public void cancelled() {
347 asyncExecCallback.failed(new InterruptedIOException());
348 }
349
350 });
351 break;
352
353 case HttpRouteDirector.UNREACHABLE:
354 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
355 "planned = " + route + "; current = " + fact));
356 break;
357
358 case HttpRouteDirector.COMPLETE:
359 if (LOG.isDebugEnabled()) {
360 LOG.debug("{} route fully established", exchangeId);
361 }
362 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
363 break;
364
365 default:
366 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
367 }
368 }
369
370 private void createTunnel(
371 final State state,
372 final HttpHost proxy,
373 final HttpHost nextHop,
374 final AsyncExecChain.Scope scope,
375 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
376
377 final CancellableDependency operation = scope.cancellableDependency;
378 final HttpClientContext clientContext = scope.clientContext;
379 final AsyncExecRuntime execRuntime = scope.execRuntime;
380 final String exchangeId = scope.exchangeId;
381
382 final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
383
384 if (authCacheKeeper != null) {
385 authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
386 }
387
388 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
389
390 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
391
392 @Override
393 public void releaseResources() {
394 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
395 if (entityConsumer != null) {
396 entityConsumer.releaseResources();
397 }
398 }
399
400 @Override
401 public void failed(final Exception cause) {
402 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
403 if (entityConsumer != null) {
404 entityConsumer.releaseResources();
405 }
406 asyncExecCallback.failed(cause);
407 }
408
409 @Override
410 public void cancel() {
411 failed(new InterruptedIOException());
412 }
413
414 @Override
415 public void produceRequest(final RequestChannel requestChannel,
416 final HttpContext httpContext) throws HttpException, IOException {
417 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
418 connect.setVersion(HttpVersion.HTTP_1_1);
419
420 proxyHttpProcessor.process(connect, null, clientContext);
421 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
422
423 requestChannel.sendRequest(connect, null, clientContext);
424 }
425
426 @Override
427 public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
428 }
429
430 @Override
431 public int available() {
432 return 0;
433 }
434
435 @Override
436 public void consumeInformation(final HttpResponse httpResponse,
437 final HttpContext httpContext) throws HttpException, IOException {
438 }
439
440 @Override
441 public void consumeResponse(final HttpResponse response,
442 final EntityDetails entityDetails,
443 final HttpContext httpContext) throws HttpException, IOException {
444 clientContext.setResponse(response);
445 proxyHttpProcessor.process(response, entityDetails, clientContext);
446
447 final int status = response.getCode();
448 if (status < HttpStatus.SC_SUCCESS) {
449 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
450 }
451
452 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
453 state.challenged = true;
454 } else {
455 state.challenged = false;
456 if (status >= HttpStatus.SC_REDIRECTION) {
457 state.tunnelRefused = true;
458 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
459 } else if (status == HttpStatus.SC_OK) {
460 asyncExecCallback.completed();
461 } else {
462 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
463 }
464 }
465 }
466
467 @Override
468 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
469 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
470 if (entityConsumer != null) {
471 entityConsumer.updateCapacity(capacityChannel);
472 } else {
473 capacityChannel.update(Integer.MAX_VALUE);
474 }
475 }
476
477 @Override
478 public void consume(final ByteBuffer src) throws IOException {
479 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
480 if (entityConsumer != null) {
481 entityConsumer.consume(src);
482 }
483 }
484
485 @Override
486 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
487 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
488 if (entityConsumer != null) {
489 entityConsumer.streamEnd(trailers);
490 }
491 asyncExecCallback.completed();
492 }
493
494 };
495
496 if (LOG.isDebugEnabled()) {
497 operation.setDependency(execRuntime.execute(
498 exchangeId,
499 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
500 clientContext));
501 } else {
502 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
503 }
504
505 }
506
507 private boolean needAuthentication(
508 final AuthExchange proxyAuthExchange,
509 final HttpHost proxy,
510 final HttpResponse response,
511 final HttpClientContext context) {
512 final RequestConfig config = context.getRequestConfigOrDefault();
513 if (config.isAuthenticationEnabled()) {
514 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
515
516 if (authCacheKeeper != null) {
517 if (proxyAuthRequested) {
518 authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
519 } else {
520 authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
521 }
522 }
523
524 if (proxyAuthRequested) {
525 final boolean updated = authenticator.updateAuthState(proxy, ChallengeType.PROXY, response,
526 proxyAuthStrategy, proxyAuthExchange, context);
527
528 if (authCacheKeeper != null) {
529 authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
530 }
531
532 return updated;
533 }
534 }
535 return false;
536 }
537
538 private void proceedConnected(
539 final HttpRequest request,
540 final AsyncEntityProducer entityProducer,
541 final AsyncExecChain.Scope scope,
542 final AsyncExecChain chain,
543 final AsyncExecCallback asyncExecCallback) {
544 final AsyncExecRuntime execRuntime = scope.execRuntime;
545 final HttpClientContext clientContext = scope.clientContext;
546 final EndpointInfo endpointInfo = execRuntime.getEndpointInfo();
547 if (endpointInfo != null) {
548 clientContext.setProtocolVersion(endpointInfo.getProtocol());
549 clientContext.setSSLSession(endpointInfo.getSslSession());
550 }
551 try {
552 chain.proceed(request, entityProducer, scope, asyncExecCallback);
553 } catch (final HttpException | IOException ex) {
554 asyncExecCallback.failed(ex);
555 }
556 }
557
558 }