1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.Configurable;
41 import org.apache.hc.client5.http.config.RequestConfig;
42 import org.apache.hc.client5.http.config.TlsConfig;
43 import org.apache.hc.client5.http.impl.ConnPoolSupport;
44 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
45 import org.apache.hc.client5.http.impl.ExecSupport;
46 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
47 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
49 import org.apache.hc.client5.http.protocol.HttpClientContext;
50 import org.apache.hc.client5.http.routing.RoutingSupport;
51 import org.apache.hc.core5.annotation.Contract;
52 import org.apache.hc.core5.annotation.ThreadingBehavior;
53 import org.apache.hc.core5.concurrent.BasicFuture;
54 import org.apache.hc.core5.concurrent.Cancellable;
55 import org.apache.hc.core5.concurrent.ComplexCancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.FutureCallback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpResponse;
63 import org.apache.hc.core5.http.HttpStatus;
64 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
65 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
66 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
67 import org.apache.hc.core5.http.nio.CapacityChannel;
68 import org.apache.hc.core5.http.nio.DataStreamChannel;
69 import org.apache.hc.core5.http.nio.HandlerFactory;
70 import org.apache.hc.core5.http.nio.RequestChannel;
71 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72 import org.apache.hc.core5.http.protocol.HttpContext;
73 import org.apache.hc.core5.io.CloseMode;
74 import org.apache.hc.core5.io.Closer;
75 import org.apache.hc.core5.reactor.Command;
76 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
77 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78 import org.apache.hc.core5.reactor.IOReactorConfig;
79 import org.apache.hc.core5.util.Args;
80 import org.apache.hc.core5.util.Asserts;
81 import org.apache.hc.core5.util.TimeValue;
82 import org.apache.hc.core5.util.Timeout;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
99 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
100
101 private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
102 private final AsyncClientConnectionManager manager;
103 private final SchemePortResolver schemePortResolver;
104 private final TlsConfig tlsConfig;
105
/**
 * Creates a client that drives its own {@link DefaultConnectingIOReactor} and
 * obtains connections from the given connection manager.
 *
 * @param eventHandlerFactory factory handed to the I/O reactor.
 * @param pushConsumerRegistry push consumer registry handed to the base class.
 * @param reactorConfig configuration handed to the I/O reactor.
 * @param threadFactory thread factory handed to the base class.
 * @param workerThreadFactory thread factory handed to the I/O reactor.
 * @param manager connection manager used to lease, connect and release endpoints.
 * @param schemePortResolver resolver used to normalize target hosts; when {@code null}
 *                           {@link DefaultSchemePortResolver#INSTANCE} is used instead.
 * @param tlsConfig TLS configuration passed to the connection manager when connecting.
 */
MinimalHttpAsyncClient(
        final IOEventHandlerFactory eventHandlerFactory,
        final AsyncPushConsumerRegistry pushConsumerRegistry,
        final IOReactorConfig reactorConfig,
        final ThreadFactory threadFactory,
        final ThreadFactory workerThreadFactory,
        final AsyncClientConnectionManager manager,
        final SchemePortResolver schemePortResolver,
        final TlsConfig tlsConfig) {
    super(
            new DefaultConnectingIOReactor(
                    eventHandlerFactory,
                    reactorConfig,
                    workerThreadFactory,
                    LoggingIOSessionDecorator.INSTANCE,
                    LoggingExceptionCallback.INSTANCE,
                    null,
                    // Per-session callback: queue a graceful shutdown command at normal priority.
                    session -> session.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL)),
            pushConsumerRegistry,
            threadFactory);
    this.tlsConfig = tlsConfig;
    this.manager = manager;
    if (schemePortResolver == null) {
        this.schemePortResolver = DefaultSchemePortResolver.INSTANCE;
    } else {
        this.schemePortResolver = schemePortResolver;
    }
}
129
130 private Future<AsyncConnectionEndpoint> leaseEndpoint(
131 final HttpHost host,
132 final Timeout connectionRequestTimeout,
133 final Timeout connectTimeout,
134 final HttpClientContext clientContext,
135 final FutureCallback<AsyncConnectionEndpoint> callback) {
136 final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
137 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
138 final String exchangeId = ExecSupport.getNextExchangeId();
139 clientContext.setExchangeId(exchangeId);
140 final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
141 exchangeId,
142 route,
143 null,
144 connectionRequestTimeout,
145 new FutureCallback<AsyncConnectionEndpoint>() {
146
147 @Override
148 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
149 if (connectionEndpoint.isConnected()) {
150 resultFuture.completed(connectionEndpoint);
151 } else {
152 final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
153 connectionEndpoint,
154 getConnectionInitiator(),
155 connectTimeout,
156 tlsConfig,
157 clientContext,
158 new FutureCallback<AsyncConnectionEndpoint>() {
159
160 @Override
161 public void completed(final AsyncConnectionEndpoint result) {
162 resultFuture.completed(result);
163 }
164
165 @Override
166 public void failed(final Exception ex) {
167 try {
168 Closer.closeQuietly(connectionEndpoint);
169 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
170 } finally {
171 resultFuture.failed(ex);
172 }
173 }
174
175 @Override
176 public void cancelled() {
177 try {
178 Closer.closeQuietly(connectionEndpoint);
179 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180 } finally {
181 resultFuture.cancel(true);
182 }
183 }
184
185 });
186 resultFuture.setDependency(connectFuture);
187 }
188 }
189
190 @Override
191 public void failed(final Exception ex) {
192 callback.failed(ex);
193 }
194
195 @Override
196 public void cancelled() {
197 callback.cancelled();
198 }
199
200 });
201 resultFuture.setDependency(leaseFuture);
202 return resultFuture;
203 }
204
205 public Future<AsyncClientEndpoint> lease(
206 final HttpHost host,
207 final FutureCallback<AsyncClientEndpoint> callback) {
208 return lease(host, HttpClientContext.create(), callback);
209 }
210
211 public Future<AsyncClientEndpoint> lease(
212 final HttpHost host,
213 final HttpContext context,
214 final FutureCallback<AsyncClientEndpoint> callback) {
215 Args.notNull(host, "Host");
216 Args.notNull(context, "HTTP context");
217 final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
218 if (!isRunning()) {
219 future.failed(new CancellationException("Connection lease cancelled"));
220 return future;
221 }
222 final HttpClientContext clientContext = HttpClientContext.adapt(context);
223 final RequestConfig requestConfig = clientContext.getRequestConfig();
224 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
225 @SuppressWarnings("deprecation")
226 final Timeout connectTimeout = requestConfig.getConnectTimeout();
227 leaseEndpoint(
228 host,
229 connectionRequestTimeout,
230 connectTimeout,
231 clientContext,
232 new FutureCallback<AsyncConnectionEndpoint>() {
233
234 @Override
235 public void completed(final AsyncConnectionEndpoint result) {
236 future.completed(new InternalAsyncClientEndpoint(result));
237 }
238
239 @Override
240 public void failed(final Exception ex) {
241 future.failed(ex);
242 }
243
244 @Override
245 public void cancelled() {
246 future.cancel(true);
247 }
248
249 });
250 return future;
251 }
252
/**
 * Executes a single message exchange: asks the exchange handler for the request,
 * leases (and if needed connects) an endpoint for the request's target, runs the
 * exchange on that endpoint and releases the endpoint afterwards.
 * <p>
 * The endpoint is released for re-use once both the request and the response
 * message have been fully processed (tracked by a countdown starting at 2); it is
 * discarded when the exchange fails, is cancelled, or the handler's resources are
 * released before that point.
 * </p>
 *
 * @param exchangeHandler the caller's exchange handler; all events are delegated to it.
 * @param pushHandlerFactory push handler factory passed on to the endpoint.
 * @param context the execution context; when {@code null} a new
 *                {@link HttpClientContext} is created for internal use.
 * @return a cancellable whose dependency, once the request has been produced,
 *         cancels the endpoint lease.
 */
@Override
public Cancellable execute(
        final AsyncClientExchangeHandler exchangeHandler,
        final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
        final HttpContext context) {
    final ComplexCancellable cancellable = new ComplexCancellable();
    try {
        if (!isRunning()) {
            // CancellationException is an IllegalStateException, so it is handled by the
            // catch clause below and reported through exchangeHandler.failed().
            throw new CancellationException("Request execution cancelled");
        }
        final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
        exchangeHandler.produceRequest((request, entityDetails, context1) -> {
            // A request-level config takes precedence and is stored in the context;
            // otherwise the config already present in the context is used.
            RequestConfig requestConfig = null;
            if (request instanceof Configurable) {
                requestConfig = ((Configurable) request).getConfig();
            }
            if (requestConfig != null) {
                clientContext.setRequestConfig(requestConfig);
            } else {
                requestConfig = clientContext.getRequestConfig();
            }
            final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
            @SuppressWarnings("deprecation")
            final Timeout connectTimeout = requestConfig.getConnectTimeout();
            final Timeout responseTimeout = requestConfig.getResponseTimeout();
            final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());

            final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
                    target,
                    connectionRequestTimeout,
                    connectTimeout,
                    clientContext,
                    new FutureCallback<AsyncConnectionEndpoint>() {

                        @Override
                        public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
                            final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
                            // One count for the request message, one for the response message.
                            final AtomicInteger messageCountDown = new AtomicInteger(2);
                            // Wrapper around the caller's handler that adds endpoint release bookkeeping.
                            final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {

                                @Override
                                public void releaseResources() {
                                    try {
                                        exchangeHandler.releaseResources();
                                    } finally {
                                        // No-op if the endpoint has already been released for re-use.
                                        endpoint.releaseAndDiscard();
                                    }
                                }

                                @Override
                                public void failed(final Exception cause) {
                                    try {
                                        exchangeHandler.failed(cause);
                                    } finally {
                                        endpoint.releaseAndDiscard();
                                    }
                                }

                                @Override
                                public void cancel() {
                                    failed(new RequestFailedException("Request aborted"));
                                }

                                @Override
                                public void produceRequest(
                                        final RequestChannel channel,
                                        final HttpContext context1) throws HttpException, IOException {
                                    // Re-send the request head already obtained from the caller's handler.
                                    channel.sendRequest(request, entityDetails, context1);
                                    if (entityDetails == null) {
                                        // No request body: the request message is complete.
                                        messageCountDown.decrementAndGet();
                                    }
                                }

                                @Override
                                public int available() {
                                    return exchangeHandler.available();
                                }

                                @Override
                                public void produce(final DataStreamChannel channel) throws IOException {
                                    // Intercept endStream so the end of the request body can be counted.
                                    exchangeHandler.produce(new DataStreamChannel() {

                                        @Override
                                        public void requestOutput() {
                                            channel.requestOutput();
                                        }

                                        @Override
                                        public int write(final ByteBuffer src) throws IOException {
                                            return channel.write(src);
                                        }

                                        @Override
                                        public void endStream(final List<? extends Header> trailers) throws IOException {
                                            channel.endStream(trailers);
                                            if (messageCountDown.decrementAndGet() <= 0) {
                                                endpoint.releaseAndReuse();
                                            }
                                        }

                                        @Override
                                        public void endStream() throws IOException {
                                            channel.endStream();
                                            if (messageCountDown.decrementAndGet() <= 0) {
                                                endpoint.releaseAndReuse();
                                            }
                                        }

                                    });
                                }

                                @Override
                                public void consumeInformation(
                                        final HttpResponse response,
                                        final HttpContext context1) throws HttpException, IOException {
                                    exchangeHandler.consumeInformation(response, context1);
                                }

                                @Override
                                public void consumeResponse(
                                        final HttpResponse response,
                                        final EntityDetails entityDetails,
                                        final HttpContext context1) throws HttpException, IOException {
                                    exchangeHandler.consumeResponse(response, entityDetails, context1);
                                    // NOTE(review): a status >= 400 takes an extra count, presumably so the
                                    // endpoint can be released without waiting for the request body to
                                    // finish - confirm the intent.
                                    if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
                                        messageCountDown.decrementAndGet();
                                    }
                                    if (entityDetails == null) {
                                        // No response body: the response message is complete.
                                        if (messageCountDown.decrementAndGet() <= 0) {
                                            endpoint.releaseAndReuse();
                                        }
                                    }
                                }

                                @Override
                                public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
                                    exchangeHandler.updateCapacity(capacityChannel);
                                }

                                @Override
                                public void consume(final ByteBuffer src) throws IOException {
                                    exchangeHandler.consume(src);
                                }

                                @Override
                                public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
                                    // The endpoint is released before the caller's handler is told
                                    // that the response stream has ended.
                                    if (messageCountDown.decrementAndGet() <= 0) {
                                        endpoint.releaseAndReuse();
                                    }
                                    exchangeHandler.streamEnd(trailers);
                                }

                            };
                            if (responseTimeout != null) {
                                endpoint.setSocketTimeout(responseTimeout);
                            }
                            endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
                        }

                        @Override
                        public void failed(final Exception ex) {
                            exchangeHandler.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            exchangeHandler.cancel();
                        }

                    });

            // Cancelling the returned Cancellable cancels the pending lease.
            cancellable.setDependency(() -> leaseFuture.cancel(true));
        }, context);

    } catch (final HttpException | IOException | IllegalStateException ex) {
        exchangeHandler.failed(ex);
    }
    return cancellable;
}
432
433 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
434
435 private final AsyncConnectionEndpoint connectionEndpoint;
436 private final AtomicBoolean released;
437
438 InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
439 this.connectionEndpoint = connectionEndpoint;
440 this.released = new AtomicBoolean(false);
441 }
442
443 boolean isReleased() {
444 return released.get();
445 }
446
447 @Override
448 public boolean isConnected() {
449 return !isReleased() && connectionEndpoint.isConnected();
450 }
451
452 @Override
453 public void execute(
454 final AsyncClientExchangeHandler exchangeHandler,
455 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
456 final HttpContext context) {
457 Asserts.check(!released.get(), "Endpoint has already been released");
458
459 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
460 final String exchangeId = ExecSupport.getNextExchangeId();
461 clientContext.setExchangeId(exchangeId);
462 if (LOG.isDebugEnabled()) {
463 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
464 connectionEndpoint.execute(
465 exchangeId,
466 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
467 pushHandlerFactory,
468 clientContext);
469 } else {
470 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
471 }
472 }
473
474 public void setSocketTimeout(final Timeout timeout) {
475 connectionEndpoint.setSocketTimeout(timeout);
476 }
477
478 @Override
479 public void releaseAndReuse() {
480 if (released.compareAndSet(false, true)) {
481 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
482 }
483 }
484
485 @Override
486 public void releaseAndDiscard() {
487 if (released.compareAndSet(false, true)) {
488 Closer.closeQuietly(connectionEndpoint);
489 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
490 }
491 }
492
493 }
494
495 }