1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.Configurable;
41 import org.apache.hc.client5.http.config.RequestConfig;
42 import org.apache.hc.client5.http.config.TlsConfig;
43 import org.apache.hc.client5.http.impl.ConnPoolSupport;
44 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
45 import org.apache.hc.client5.http.impl.ExecSupport;
46 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
47 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
49 import org.apache.hc.client5.http.protocol.HttpClientContext;
50 import org.apache.hc.client5.http.routing.RoutingSupport;
51 import org.apache.hc.core5.annotation.Contract;
52 import org.apache.hc.core5.annotation.ThreadingBehavior;
53 import org.apache.hc.core5.concurrent.BasicFuture;
54 import org.apache.hc.core5.concurrent.Cancellable;
55 import org.apache.hc.core5.concurrent.ComplexCancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.FutureCallback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpResponse;
63 import org.apache.hc.core5.http.HttpStatus;
64 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
65 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
66 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
67 import org.apache.hc.core5.http.nio.CapacityChannel;
68 import org.apache.hc.core5.http.nio.DataStreamChannel;
69 import org.apache.hc.core5.http.nio.HandlerFactory;
70 import org.apache.hc.core5.http.nio.RequestChannel;
71 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72 import org.apache.hc.core5.http.protocol.HttpContext;
73 import org.apache.hc.core5.io.CloseMode;
74 import org.apache.hc.core5.io.Closer;
75 import org.apache.hc.core5.reactor.Command;
76 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
77 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78 import org.apache.hc.core5.reactor.IOReactorConfig;
79 import org.apache.hc.core5.util.Args;
80 import org.apache.hc.core5.util.Asserts;
81 import org.apache.hc.core5.util.TimeValue;
82 import org.apache.hc.core5.util.Timeout;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
99 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
100
/** Class logger; also handed to the logging wrapper used for individual exchanges. */
private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);

/** Connection manager that endpoints are leased from, connected through and released to. */
private final AsyncClientConnectionManager manager;

/** Resolver used when normalizing a target host into a route; never {@code null}. */
private final SchemePortResolver schemePortResolver;

/** TLS settings handed to the connection manager whenever an endpoint has to be connected. */
private final TlsConfig tlsConfig;

/**
 * Builds the client on top of a freshly created {@link DefaultConnectingIOReactor}.
 *
 * @param eventHandlerFactory factory passed to the I/O reactor.
 * @param pushConsumerRegistry push consumer registry passed to the base class.
 * @param reactorConfig I/O reactor configuration.
 * @param threadFactory thread factory passed to the base class.
 * @param workerThreadFactory thread factory passed to the I/O reactor.
 * @param manager connection manager used to lease, connect and release endpoints.
 * @param schemePortResolver resolver for default ports; when {@code null},
 *                           {@link DefaultSchemePortResolver#INSTANCE} is used.
 * @param tlsConfig TLS settings used when connecting endpoints.
 */
MinimalHttpAsyncClient(
        final IOEventHandlerFactory eventHandlerFactory,
        final AsyncPushConsumerRegistry pushConsumerRegistry,
        final IOReactorConfig reactorConfig,
        final ThreadFactory threadFactory,
        final ThreadFactory workerThreadFactory,
        final AsyncClientConnectionManager manager,
        final SchemePortResolver schemePortResolver,
        final TlsConfig tlsConfig) {
    super(
            new DefaultConnectingIOReactor(
                    eventHandlerFactory,
                    reactorConfig,
                    workerThreadFactory,
                    LoggingIOSessionDecorator.INSTANCE,
                    LoggingExceptionCallback.INSTANCE,
                    null,
                    // Enqueues a graceful shutdown command on the session handed to this callback.
                    session -> session.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL)),
            pushConsumerRegistry,
            threadFactory);
    this.tlsConfig = tlsConfig;
    this.manager = manager;
    if (schemePortResolver != null) {
        this.schemePortResolver = schemePortResolver;
    } else {
        this.schemePortResolver = DefaultSchemePortResolver.INSTANCE;
    }
}
129
130 private Future<AsyncConnectionEndpoint> leaseEndpoint(
131 final HttpHost host,
132 final Timeout connectionRequestTimeout,
133 final Timeout connectTimeout,
134 final HttpClientContext clientContext,
135 final FutureCallback<AsyncConnectionEndpoint> callback) {
136 final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
137 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
138 final String exchangeId = ExecSupport.getNextExchangeId();
139 clientContext.setExchangeId(exchangeId);
140 final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
141 exchangeId,
142 route,
143 null,
144 connectionRequestTimeout,
145 new FutureCallback<AsyncConnectionEndpoint>() {
146
147 @Override
148 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
149 if (connectionEndpoint.isConnected()) {
150 resultFuture.completed(connectionEndpoint);
151 } else {
152 final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
153 connectionEndpoint,
154 getConnectionInitiator(),
155 connectTimeout,
156 tlsConfig,
157 clientContext,
158 new FutureCallback<AsyncConnectionEndpoint>() {
159
160 @Override
161 public void completed(final AsyncConnectionEndpoint result) {
162 resultFuture.completed(result);
163 }
164
165 @Override
166 public void failed(final Exception ex) {
167 try {
168 Closer.closeQuietly(connectionEndpoint);
169 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
170 } finally {
171 resultFuture.failed(ex);
172 }
173 }
174
175 @Override
176 public void cancelled() {
177 try {
178 Closer.closeQuietly(connectionEndpoint);
179 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180 } finally {
181 resultFuture.cancel(true);
182 }
183 }
184
185 });
186 resultFuture.setDependency(connectFuture);
187 }
188 }
189
190 @Override
191 public void failed(final Exception ex) {
192 callback.failed(ex);
193 }
194
195 @Override
196 public void cancelled() {
197 callback.cancelled();
198 }
199
200 });
201 resultFuture.setDependency(leaseFuture);
202 return resultFuture;
203 }
204
205 public Future<AsyncClientEndpoint> lease(
206 final HttpHost host,
207 final FutureCallback<AsyncClientEndpoint> callback) {
208 return lease(host, null, callback);
209 }
210
211 public Future<AsyncClientEndpoint> lease(
212 final HttpHost host,
213 final HttpContext context,
214 final FutureCallback<AsyncClientEndpoint> callback) {
215 Args.notNull(host, "Host");
216 final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
217 if (!isRunning()) {
218 future.failed(new CancellationException("Connection lease cancelled"));
219 return future;
220 }
221 final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
222 final RequestConfig requestConfig = clientContext.getRequestConfigOrDefault();
223 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
224 @SuppressWarnings("deprecation")
225 final Timeout connectTimeout = requestConfig.getConnectTimeout();
226 leaseEndpoint(
227 host,
228 connectionRequestTimeout,
229 connectTimeout,
230 clientContext,
231 new FutureCallback<AsyncConnectionEndpoint>() {
232
233 @Override
234 public void completed(final AsyncConnectionEndpoint result) {
235 future.completed(new InternalAsyncClientEndpoint(result));
236 }
237
238 @Override
239 public void failed(final Exception ex) {
240 future.failed(ex);
241 }
242
243 @Override
244 public void cancelled() {
245 future.cancel(true);
246 }
247
248 });
249 return future;
250 }
251
252 @Override
253 public Cancellable execute(
254 final AsyncClientExchangeHandler exchangeHandler,
255 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
256 final HttpContext context) {
257 final ComplexCancellable cancellable = new ComplexCancellable();
258 try {
259 if (!isRunning()) {
260 throw new CancellationException("Request execution cancelled");
261 }
262 final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
263 exchangeHandler.produceRequest((request, entityDetails, context1) -> {
264 RequestConfig requestConfig = null;
265 if (request instanceof Configurable) {
266 requestConfig = ((Configurable) request).getConfig();
267 }
268 if (requestConfig != null) {
269 clientContext.setRequestConfig(requestConfig);
270 } else {
271 requestConfig = clientContext.getRequestConfigOrDefault();
272 }
273 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
274 @SuppressWarnings("deprecation")
275 final Timeout connectTimeout = requestConfig.getConnectTimeout();
276 final Timeout responseTimeout = requestConfig.getResponseTimeout();
277 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
278
279 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
280 target,
281 connectionRequestTimeout,
282 connectTimeout,
283 clientContext,
284 new FutureCallback<AsyncConnectionEndpoint>() {
285
286 @Override
287 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
288 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
289 final AtomicInteger messageCountDown = new AtomicInteger(2);
290 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
291
292 @Override
293 public void releaseResources() {
294 try {
295 exchangeHandler.releaseResources();
296 } finally {
297 endpoint.releaseAndDiscard();
298 }
299 }
300
301 @Override
302 public void failed(final Exception cause) {
303 try {
304 exchangeHandler.failed(cause);
305 } finally {
306 endpoint.releaseAndDiscard();
307 }
308 }
309
310 @Override
311 public void cancel() {
312 failed(new RequestFailedException("Request aborted"));
313 }
314
315 @Override
316 public void produceRequest(
317 final RequestChannel channel,
318 final HttpContext context1) throws HttpException, IOException {
319 channel.sendRequest(request, entityDetails, context1);
320 if (entityDetails == null) {
321 messageCountDown.decrementAndGet();
322 }
323 }
324
325 @Override
326 public int available() {
327 return exchangeHandler.available();
328 }
329
330 @Override
331 public void produce(final DataStreamChannel channel) throws IOException {
332 exchangeHandler.produce(new DataStreamChannel() {
333
334 @Override
335 public void requestOutput() {
336 channel.requestOutput();
337 }
338
339 @Override
340 public int write(final ByteBuffer src) throws IOException {
341 return channel.write(src);
342 }
343
344 @Override
345 public void endStream(final List<? extends Header> trailers) throws IOException {
346 channel.endStream(trailers);
347 if (messageCountDown.decrementAndGet() <= 0) {
348 endpoint.releaseAndReuse();
349 }
350 }
351
352 @Override
353 public void endStream() throws IOException {
354 channel.endStream();
355 if (messageCountDown.decrementAndGet() <= 0) {
356 endpoint.releaseAndReuse();
357 }
358 }
359
360 });
361 }
362
363 @Override
364 public void consumeInformation(
365 final HttpResponse response,
366 final HttpContext context1) throws HttpException, IOException {
367 exchangeHandler.consumeInformation(response, context1);
368 }
369
370 @Override
371 public void consumeResponse(
372 final HttpResponse response,
373 final EntityDetails entityDetails,
374 final HttpContext context1) throws HttpException, IOException {
375 exchangeHandler.consumeResponse(response, entityDetails, context1);
376 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
377 messageCountDown.decrementAndGet();
378 }
379 if (entityDetails == null) {
380 if (messageCountDown.decrementAndGet() <= 0) {
381 endpoint.releaseAndReuse();
382 }
383 }
384 }
385
386 @Override
387 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
388 exchangeHandler.updateCapacity(capacityChannel);
389 }
390
391 @Override
392 public void consume(final ByteBuffer src) throws IOException {
393 exchangeHandler.consume(src);
394 }
395
396 @Override
397 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
398 if (messageCountDown.decrementAndGet() <= 0) {
399 endpoint.releaseAndReuse();
400 }
401 exchangeHandler.streamEnd(trailers);
402 }
403
404 };
405 if (responseTimeout != null) {
406 endpoint.setSocketTimeout(responseTimeout);
407 }
408 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
409 }
410
411 @Override
412 public void failed(final Exception ex) {
413 exchangeHandler.failed(ex);
414 }
415
416 @Override
417 public void cancelled() {
418 exchangeHandler.cancel();
419 }
420
421 });
422
423 cancellable.setDependency(() -> leaseFuture.cancel(true));
424 }, context);
425
426 } catch (final HttpException | IOException | IllegalStateException ex) {
427 exchangeHandler.failed(ex);
428 }
429 return cancellable;
430 }
431
432 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
433
434 private final AsyncConnectionEndpoint connectionEndpoint;
435 private final AtomicBoolean released;
436
437 InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
438 this.connectionEndpoint = connectionEndpoint;
439 this.released = new AtomicBoolean(false);
440 }
441
442 boolean isReleased() {
443 return released.get();
444 }
445
446 @Override
447 public boolean isConnected() {
448 return !isReleased() && connectionEndpoint.isConnected();
449 }
450
451 @Override
452 public void execute(
453 final AsyncClientExchangeHandler exchangeHandler,
454 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
455 final HttpContext context) {
456 Asserts.check(!released.get(), "Endpoint has already been released");
457
458 final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
459 final String exchangeId = ExecSupport.getNextExchangeId();
460 clientContext.setExchangeId(exchangeId);
461 if (LOG.isDebugEnabled()) {
462 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
463 connectionEndpoint.execute(
464 exchangeId,
465 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
466 pushHandlerFactory,
467 clientContext);
468 } else {
469 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
470 }
471 }
472
473 public void setSocketTimeout(final Timeout timeout) {
474 connectionEndpoint.setSocketTimeout(timeout);
475 }
476
477 @Override
478 public void releaseAndReuse() {
479 if (released.compareAndSet(false, true)) {
480 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
481 }
482 }
483
484 @Override
485 public void releaseAndDiscard() {
486 if (released.compareAndSet(false, true)) {
487 Closer.closeQuietly(connectionEndpoint);
488 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
489 }
490 }
491
492 }
493
494 }