1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.Configurable;
41 import org.apache.hc.client5.http.config.RequestConfig;
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44 import org.apache.hc.client5.http.impl.ExecSupport;
45 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48 import org.apache.hc.client5.http.protocol.HttpClientContext;
49 import org.apache.hc.client5.http.routing.RoutingSupport;
50 import org.apache.hc.core5.annotation.Contract;
51 import org.apache.hc.core5.annotation.ThreadingBehavior;
52 import org.apache.hc.core5.concurrent.BasicFuture;
53 import org.apache.hc.core5.concurrent.Cancellable;
54 import org.apache.hc.core5.concurrent.ComplexCancellable;
55 import org.apache.hc.core5.concurrent.ComplexFuture;
56 import org.apache.hc.core5.concurrent.FutureCallback;
57 import org.apache.hc.core5.function.Callback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.CapacityChannel;
69 import org.apache.hc.core5.http.nio.DataStreamChannel;
70 import org.apache.hc.core5.http.nio.HandlerFactory;
71 import org.apache.hc.core5.http.nio.RequestChannel;
72 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http2.HttpVersionPolicy;
75 import org.apache.hc.core5.io.CloseMode;
76 import org.apache.hc.core5.io.Closer;
77 import org.apache.hc.core5.reactor.Command;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80 import org.apache.hc.core5.reactor.IOReactorConfig;
81 import org.apache.hc.core5.reactor.IOSession;
82 import org.apache.hc.core5.util.Args;
83 import org.apache.hc.core5.util.Asserts;
84 import org.apache.hc.core5.util.TimeValue;
85 import org.apache.hc.core5.util.Timeout;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103
104 private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105 private final AsyncClientConnectionManager manager;
106 private final SchemePortResolver schemePortResolver;
107 private final HttpVersionPolicy versionPolicy;
108
/**
 * Creates a minimal asynchronous client driven by a {@link DefaultConnectingIOReactor}.
 *
 * @param eventHandlerFactory factory handed to the I/O reactor.
 * @param pushConsumerRegistry push consumer registry handed to the base class.
 * @param versionPolicy HTTP version policy used when connecting endpoints;
 *                      {@link HttpVersionPolicy#NEGOTIATE} is used when {@code null}.
 * @param reactorConfig I/O reactor configuration.
 * @param threadFactory thread factory handed to the base class.
 * @param workerThreadFactory thread factory handed to the I/O reactor.
 * @param manager connection manager used to lease, connect and release endpoints.
 * @param schemePortResolver resolver used to normalize target hosts;
 *                           {@link DefaultSchemePortResolver#INSTANCE} is used when {@code null}.
 */
MinimalHttpAsyncClient(
        final IOEventHandlerFactory eventHandlerFactory,
        final AsyncPushConsumerRegistry pushConsumerRegistry,
        final HttpVersionPolicy versionPolicy,
        final IOReactorConfig reactorConfig,
        final ThreadFactory threadFactory,
        final ThreadFactory workerThreadFactory,
        final AsyncClientConnectionManager manager,
        final SchemePortResolver schemePortResolver) {
    super(
            new DefaultConnectingIOReactor(
                    eventHandlerFactory,
                    reactorConfig,
                    workerThreadFactory,
                    LoggingIOSessionDecorator.INSTANCE,
                    LoggingExceptionCallback.INSTANCE,
                    null,
                    new Callback<IOSession>() {

                        // Asks the session it is given to shut down gracefully.
                        @Override
                        public void execute(final IOSession session) {
                            session.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
                        }

                    }),
            pushConsumerRegistry,
            threadFactory);
    this.manager = manager;
    if (schemePortResolver != null) {
        this.schemePortResolver = schemePortResolver;
    } else {
        this.schemePortResolver = DefaultSchemePortResolver.INSTANCE;
    }
    if (versionPolicy != null) {
        this.versionPolicy = versionPolicy;
    } else {
        this.versionPolicy = HttpVersionPolicy.NEGOTIATE;
    }
}
139
140 private Future<AsyncConnectionEndpoint> leaseEndpoint(
141 final HttpHost host,
142 final Timeout connectionRequestTimeout,
143 final Timeout connectTimeout,
144 final HttpClientContext clientContext,
145 final FutureCallback<AsyncConnectionEndpoint> callback) {
146 final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148 final String exchangeId = ExecSupport.getNextExchangeId();
149 clientContext.setExchangeId(exchangeId);
150 final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
151 exchangeId,
152 route,
153 null,
154 connectionRequestTimeout,
155 new FutureCallback<AsyncConnectionEndpoint>() {
156
157 @Override
158 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
159 if (connectionEndpoint.isConnected()) {
160 resultFuture.completed(connectionEndpoint);
161 } else {
162 final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
163 connectionEndpoint,
164 getConnectionInitiator(),
165 connectTimeout,
166 versionPolicy,
167 clientContext,
168 new FutureCallback<AsyncConnectionEndpoint>() {
169
170 @Override
171 public void completed(final AsyncConnectionEndpoint result) {
172 resultFuture.completed(result);
173 }
174
175 @Override
176 public void failed(final Exception ex) {
177 resultFuture.failed(ex);
178 }
179
180 @Override
181 public void cancelled() {
182 resultFuture.cancel(true);
183 }
184
185 });
186 resultFuture.setDependency(connectFuture);
187 }
188 }
189
190 @Override
191 public void failed(final Exception ex) {
192 callback.failed(ex);
193 }
194
195 @Override
196 public void cancelled() {
197 callback.cancelled();
198 }
199
200 });
201 resultFuture.setDependency(leaseFuture);
202 return resultFuture;
203 }
204
205 public Future<AsyncClientEndpoint> lease(
206 final HttpHost host,
207 final FutureCallback<AsyncClientEndpoint> callback) {
208 return lease(host, HttpClientContext.create(), callback);
209 }
210
211 public Future<AsyncClientEndpoint> lease(
212 final HttpHost host,
213 final HttpContext context,
214 final FutureCallback<AsyncClientEndpoint> callback) {
215 Args.notNull(host, "Host");
216 Args.notNull(context, "HTTP context");
217 final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
218 if (!isRunning()) {
219 future.failed(new CancellationException("Connection lease cancelled"));
220 return future;
221 }
222 final HttpClientContext clientContext = HttpClientContext.adapt(context);
223 final RequestConfig requestConfig = clientContext.getRequestConfig();
224 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
225 final Timeout connectTimeout = requestConfig.getConnectTimeout();
226 leaseEndpoint(
227 host,
228 connectionRequestTimeout,
229 connectTimeout,
230 clientContext,
231 new FutureCallback<AsyncConnectionEndpoint>() {
232
233 @Override
234 public void completed(final AsyncConnectionEndpoint result) {
235 future.completed(new InternalAsyncClientEndpoint(result));
236 }
237
238 @Override
239 public void failed(final Exception ex) {
240 future.failed(ex);
241 }
242
243 @Override
244 public void cancelled() {
245 future.cancel(true);
246 }
247
248 });
249 return future;
250 }
251
252 @Override
253 public Cancellable execute(
254 final AsyncClientExchangeHandler exchangeHandler,
255 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
256 final HttpContext context) {
257 final ComplexCancellable cancellable = new ComplexCancellable();
258 try {
259 if (!isRunning()) {
260 throw new CancellationException("Request execution cancelled");
261 }
262 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
263 exchangeHandler.produceRequest(new RequestChannel() {
264
265 @Override
266 public void sendRequest(
267 final HttpRequest request,
268 final EntityDetails entityDetails,
269 final HttpContext context) throws HttpException, IOException {
270 RequestConfig requestConfig = null;
271 if (request instanceof Configurable) {
272 requestConfig = ((Configurable) request).getConfig();
273 }
274 if (requestConfig != null) {
275 clientContext.setRequestConfig(requestConfig);
276 } else {
277 requestConfig = clientContext.getRequestConfig();
278 }
279 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
280 final Timeout connectTimeout = requestConfig.getConnectTimeout();
281 final Timeout responseTimeout = requestConfig.getResponseTimeout();
282 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
283
284 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
285 target,
286 connectionRequestTimeout,
287 connectTimeout,
288 clientContext,
289 new FutureCallback<AsyncConnectionEndpoint>() {
290
291 @Override
292 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
293 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
294 final AtomicInteger messageCountDown = new AtomicInteger(2);
295 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
296
297 @Override
298 public void releaseResources() {
299 try {
300 exchangeHandler.releaseResources();
301 } finally {
302 endpoint.releaseAndDiscard();
303 }
304 }
305
306 @Override
307 public void failed(final Exception cause) {
308 try {
309 exchangeHandler.failed(cause);
310 } finally {
311 endpoint.releaseAndDiscard();
312 }
313 }
314
315 @Override
316 public void cancel() {
317 failed(new RequestFailedException("Request aborted"));
318 }
319
320 @Override
321 public void produceRequest(
322 final RequestChannel channel,
323 final HttpContext context) throws HttpException, IOException {
324 channel.sendRequest(request, entityDetails, context);
325 if (entityDetails == null) {
326 messageCountDown.decrementAndGet();
327 }
328 }
329
330 @Override
331 public int available() {
332 return exchangeHandler.available();
333 }
334
335 @Override
336 public void produce(final DataStreamChannel channel) throws IOException {
337 exchangeHandler.produce(new DataStreamChannel() {
338
339 @Override
340 public void requestOutput() {
341 channel.requestOutput();
342 }
343
344 @Override
345 public int write(final ByteBuffer src) throws IOException {
346 return channel.write(src);
347 }
348
349 @Override
350 public void endStream(final List<? extends Header> trailers) throws IOException {
351 channel.endStream(trailers);
352 if (messageCountDown.decrementAndGet() <= 0) {
353 endpoint.releaseAndReuse();
354 }
355 }
356
357 @Override
358 public void endStream() throws IOException {
359 channel.endStream();
360 if (messageCountDown.decrementAndGet() <= 0) {
361 endpoint.releaseAndReuse();
362 }
363 }
364
365 });
366 }
367
368 @Override
369 public void consumeInformation(
370 final HttpResponse response,
371 final HttpContext context) throws HttpException, IOException {
372 exchangeHandler.consumeInformation(response, context);
373 }
374
375 @Override
376 public void consumeResponse(
377 final HttpResponse response,
378 final EntityDetails entityDetails,
379 final HttpContext context) throws HttpException, IOException {
380 exchangeHandler.consumeResponse(response, entityDetails, context);
381 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
382 messageCountDown.decrementAndGet();
383 }
384 if (entityDetails == null) {
385 if (messageCountDown.decrementAndGet() <= 0) {
386 endpoint.releaseAndReuse();
387 }
388 }
389 }
390
391 @Override
392 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
393 exchangeHandler.updateCapacity(capacityChannel);
394 }
395
396 @Override
397 public void consume(final ByteBuffer src) throws IOException {
398 exchangeHandler.consume(src);
399 }
400
401 @Override
402 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
403 if (messageCountDown.decrementAndGet() <= 0) {
404 endpoint.releaseAndReuse();
405 }
406 exchangeHandler.streamEnd(trailers);
407 }
408
409 };
410 if (responseTimeout != null) {
411 endpoint.setSocketTimeout(responseTimeout);
412 }
413 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
414 }
415
416 @Override
417 public void failed(final Exception ex) {
418 exchangeHandler.failed(ex);
419 }
420
421 @Override
422 public void cancelled() {
423 exchangeHandler.cancel();
424 }
425
426 });
427
428 cancellable.setDependency(new Cancellable() {
429
430 @Override
431 public boolean cancel() {
432 return leaseFuture.cancel(true);
433 }
434
435 });
436 }
437 }, context);
438
439 } catch (final HttpException | IOException | IllegalStateException ex) {
440 exchangeHandler.failed(ex);
441 }
442 return cancellable;
443 }
444
445 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
446
447 private final AsyncConnectionEndpoint connectionEndpoint;
448 private final AtomicBoolean released;
449
450 InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
451 this.connectionEndpoint = connectionEndpoint;
452 this.released = new AtomicBoolean(false);
453 }
454
455 boolean isReleased() {
456 return released.get();
457 }
458
459 @Override
460 public boolean isConnected() {
461 return !isReleased() && connectionEndpoint.isConnected();
462 }
463
464 @Override
465 public void execute(
466 final AsyncClientExchangeHandler exchangeHandler,
467 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
468 final HttpContext context) {
469 Asserts.check(!released.get(), "Endpoint has already been released");
470
471 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
472 final String exchangeId = ExecSupport.getNextExchangeId();
473 clientContext.setExchangeId(exchangeId);
474 if (LOG.isDebugEnabled()) {
475 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
476 connectionEndpoint.execute(
477 exchangeId,
478 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
479 pushHandlerFactory,
480 clientContext);
481 } else {
482 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
483 }
484 }
485
486 public void setSocketTimeout(final Timeout timeout) {
487 connectionEndpoint.setSocketTimeout(timeout);
488 }
489
490 @Override
491 public void releaseAndReuse() {
492 if (released.compareAndSet(false, true)) {
493 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
494 }
495 }
496
497 @Override
498 public void releaseAndDiscard() {
499 if (released.compareAndSet(false, true)) {
500 Closer.closeQuietly(connectionEndpoint);
501 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
502 }
503 }
504
505 }
506
507 }