1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.Configurable;
41 import org.apache.hc.client5.http.config.RequestConfig;
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44 import org.apache.hc.client5.http.impl.ExecSupport;
45 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48 import org.apache.hc.client5.http.protocol.HttpClientContext;
49 import org.apache.hc.client5.http.routing.RoutingSupport;
50 import org.apache.hc.core5.annotation.Contract;
51 import org.apache.hc.core5.annotation.ThreadingBehavior;
52 import org.apache.hc.core5.concurrent.BasicFuture;
53 import org.apache.hc.core5.concurrent.Cancellable;
54 import org.apache.hc.core5.concurrent.ComplexCancellable;
55 import org.apache.hc.core5.concurrent.ComplexFuture;
56 import org.apache.hc.core5.concurrent.FutureCallback;
57 import org.apache.hc.core5.function.Callback;
58 import org.apache.hc.core5.http.EntityDetails;
59 import org.apache.hc.core5.http.Header;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.CapacityChannel;
69 import org.apache.hc.core5.http.nio.DataStreamChannel;
70 import org.apache.hc.core5.http.nio.HandlerFactory;
71 import org.apache.hc.core5.http.nio.RequestChannel;
72 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http2.HttpVersionPolicy;
75 import org.apache.hc.core5.io.CloseMode;
76 import org.apache.hc.core5.io.Closer;
77 import org.apache.hc.core5.reactor.Command;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80 import org.apache.hc.core5.reactor.IOReactorConfig;
81 import org.apache.hc.core5.reactor.IOSession;
82 import org.apache.hc.core5.util.Args;
83 import org.apache.hc.core5.util.Asserts;
84 import org.apache.hc.core5.util.TimeValue;
85 import org.apache.hc.core5.util.Timeout;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103
104 private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105 private final AsyncClientConnectionManager manager;
106 private final SchemePortResolver schemePortResolver;
107 private final HttpVersionPolicy versionPolicy;
108
109 MinimalHttpAsyncClient(
110 final IOEventHandlerFactory eventHandlerFactory,
111 final AsyncPushConsumerRegistry pushConsumerRegistry,
112 final HttpVersionPolicy versionPolicy,
113 final IOReactorConfig reactorConfig,
114 final ThreadFactory threadFactory,
115 final ThreadFactory workerThreadFactory,
116 final AsyncClientConnectionManager manager,
117 final SchemePortResolver schemePortResolver) {
118 super(new DefaultConnectingIOReactor(
119 eventHandlerFactory,
120 reactorConfig,
121 workerThreadFactory,
122 LoggingIOSessionDecorator.INSTANCE,
123 LoggingExceptionCallback.INSTANCE,
124 null,
125 new Callback<IOSession>() {
126
127 @Override
128 public void execute(final IOSession ioSession) {
129 ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
130 }
131
132 }),
133 pushConsumerRegistry,
134 threadFactory);
135 this.manager = manager;
136 this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
137 this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
138 }
139
140 private Future<AsyncConnectionEndpoint> leaseEndpoint(
141 final HttpHost host,
142 final Timeout connectionRequestTimeout,
143 final Timeout connectTimeout,
144 final HttpClientContext clientContext,
145 final FutureCallback<AsyncConnectionEndpoint> callback) {
146 final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148 final String exchangeId = ExecSupport.getNextExchangeId();
149 final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
150 exchangeId,
151 route,
152 null,
153 connectionRequestTimeout,
154 new FutureCallback<AsyncConnectionEndpoint>() {
155
156 @Override
157 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
158 if (connectionEndpoint.isConnected()) {
159 resultFuture.completed(connectionEndpoint);
160 } else {
161 final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
162 connectionEndpoint,
163 getConnectionInitiator(),
164 connectTimeout,
165 versionPolicy,
166 clientContext,
167 new FutureCallback<AsyncConnectionEndpoint>() {
168
169 @Override
170 public void completed(final AsyncConnectionEndpoint result) {
171 resultFuture.completed(result);
172 }
173
174 @Override
175 public void failed(final Exception ex) {
176 resultFuture.failed(ex);
177 }
178
179 @Override
180 public void cancelled() {
181 resultFuture.cancel(true);
182 }
183
184 });
185 resultFuture.setDependency(connectFuture);
186 }
187 }
188
189 @Override
190 public void failed(final Exception ex) {
191 callback.failed(ex);
192 }
193
194 @Override
195 public void cancelled() {
196 callback.cancelled();
197 }
198
199 });
200 resultFuture.setDependency(leaseFuture);
201 return resultFuture;
202 }
203
204 public final Future<AsyncClientEndpoint> lease(
205 final HttpHost host,
206 final FutureCallback<AsyncClientEndpoint> callback) {
207 return lease(host, HttpClientContext.create(), callback);
208 }
209
210 public Future<AsyncClientEndpoint> lease(
211 final HttpHost host,
212 final HttpContext context,
213 final FutureCallback<AsyncClientEndpoint> callback) {
214 Args.notNull(host, "Host");
215 Args.notNull(context, "HTTP context");
216 final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
217 if (!isRunning()) {
218 future.failed(new CancellationException("Connection lease cancelled"));
219 return future;
220 }
221 final HttpClientContext clientContext = HttpClientContext.adapt(context);
222 final RequestConfig requestConfig = clientContext.getRequestConfig();
223 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
224 final Timeout connectTimeout = requestConfig.getConnectTimeout();
225 leaseEndpoint(
226 host,
227 connectionRequestTimeout,
228 connectTimeout,
229 clientContext,
230 new FutureCallback<AsyncConnectionEndpoint>() {
231
232 @Override
233 public void completed(final AsyncConnectionEndpoint result) {
234 future.completed(new InternalAsyncClientEndpoint(result));
235 }
236
237 @Override
238 public void failed(final Exception ex) {
239 future.failed(ex);
240 }
241
242 @Override
243 public void cancelled() {
244 future.cancel(true);
245 }
246
247 });
248 return future;
249 }
250
251 @Override
252 public Cancellable execute(
253 final AsyncClientExchangeHandler exchangeHandler,
254 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
255 final HttpContext context) {
256 final ComplexCancellable cancellable = new ComplexCancellable();
257 try {
258 if (!isRunning()) {
259 throw new CancellationException("Request execution cancelled");
260 }
261 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
262 exchangeHandler.produceRequest(new RequestChannel() {
263
264 @Override
265 public void sendRequest(
266 final HttpRequest request,
267 final EntityDetails entityDetails,
268 final HttpContext context) throws HttpException, IOException {
269 RequestConfig requestConfig = null;
270 if (request instanceof Configurable) {
271 requestConfig = ((Configurable) request).getConfig();
272 }
273 if (requestConfig != null) {
274 clientContext.setRequestConfig(requestConfig);
275 } else {
276 requestConfig = clientContext.getRequestConfig();
277 }
278 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
279 final Timeout connectTimeout = requestConfig.getConnectTimeout();
280 final Timeout responseTimeout = requestConfig.getResponseTimeout();
281 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
282
283 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
284 target,
285 connectionRequestTimeout,
286 connectTimeout,
287 clientContext,
288 new FutureCallback<AsyncConnectionEndpoint>() {
289
290 @Override
291 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
292 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
293 final AtomicInteger messageCountDown = new AtomicInteger(2);
294 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
295
296 @Override
297 public void releaseResources() {
298 try {
299 exchangeHandler.releaseResources();
300 } finally {
301 endpoint.releaseAndDiscard();
302 }
303 }
304
305 @Override
306 public void failed(final Exception cause) {
307 try {
308 exchangeHandler.failed(cause);
309 } finally {
310 endpoint.releaseAndDiscard();
311 }
312 }
313
314 @Override
315 public void cancel() {
316 failed(new RequestFailedException("Request aborted"));
317 }
318
319 @Override
320 public void produceRequest(
321 final RequestChannel channel,
322 final HttpContext context) throws HttpException, IOException {
323 channel.sendRequest(request, entityDetails, context);
324 if (entityDetails == null) {
325 messageCountDown.decrementAndGet();
326 }
327 }
328
329 @Override
330 public int available() {
331 return exchangeHandler.available();
332 }
333
334 @Override
335 public void produce(final DataStreamChannel channel) throws IOException {
336 exchangeHandler.produce(new DataStreamChannel() {
337
338 @Override
339 public void requestOutput() {
340 channel.requestOutput();
341 }
342
343 @Override
344 public int write(final ByteBuffer src) throws IOException {
345 return channel.write(src);
346 }
347
348 @Override
349 public void endStream(final List<? extends Header> trailers) throws IOException {
350 channel.endStream(trailers);
351 if (messageCountDown.decrementAndGet() <= 0) {
352 endpoint.releaseAndReuse();
353 }
354 }
355
356 @Override
357 public void endStream() throws IOException {
358 channel.endStream();
359 if (messageCountDown.decrementAndGet() <= 0) {
360 endpoint.releaseAndReuse();
361 }
362 }
363
364 });
365 }
366
367 @Override
368 public void consumeInformation(
369 final HttpResponse response,
370 final HttpContext context) throws HttpException, IOException {
371 exchangeHandler.consumeInformation(response, context);
372 }
373
374 @Override
375 public void consumeResponse(
376 final HttpResponse response,
377 final EntityDetails entityDetails,
378 final HttpContext context) throws HttpException, IOException {
379 exchangeHandler.consumeResponse(response, entityDetails, context);
380 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
381 messageCountDown.decrementAndGet();
382 }
383 if (entityDetails == null) {
384 if (messageCountDown.decrementAndGet() <= 0) {
385 endpoint.releaseAndReuse();
386 }
387 }
388 }
389
390 @Override
391 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
392 exchangeHandler.updateCapacity(capacityChannel);
393 }
394
395 @Override
396 public void consume(final ByteBuffer src) throws IOException {
397 exchangeHandler.consume(src);
398 }
399
400 @Override
401 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
402 if (messageCountDown.decrementAndGet() <= 0) {
403 endpoint.releaseAndReuse();
404 }
405 exchangeHandler.streamEnd(trailers);
406 }
407
408 };
409 if (responseTimeout != null) {
410 endpoint.setSocketTimeout(responseTimeout);
411 }
412 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
413 }
414
415 @Override
416 public void failed(final Exception ex) {
417 exchangeHandler.failed(ex);
418 }
419
420 @Override
421 public void cancelled() {
422 exchangeHandler.cancel();
423 }
424
425 });
426
427 cancellable.setDependency(new Cancellable() {
428
429 @Override
430 public boolean cancel() {
431 return leaseFuture.cancel(true);
432 }
433
434 });
435 }
436 }, context);
437
438 } catch (final HttpException | IOException | IllegalStateException ex) {
439 exchangeHandler.failed(ex);
440 }
441 return cancellable;
442 }
443
444 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
445
446 private final AsyncConnectionEndpoint connectionEndpoint;
447 private final AtomicBoolean released;
448
449 InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
450 this.connectionEndpoint = connectionEndpoint;
451 this.released = new AtomicBoolean(false);
452 }
453
454 boolean isReleased() {
455 return released.get();
456 }
457
458 @Override
459 public boolean isConnected() {
460 return !isReleased() && connectionEndpoint.isConnected();
461 }
462
463 @Override
464 public void execute(
465 final AsyncClientExchangeHandler exchangeHandler,
466 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
467 final HttpContext context) {
468 Asserts.check(!released.get(), "Endpoint has already been released");
469
470 final String exchangeId = ExecSupport.getNextExchangeId();
471 if (LOG.isDebugEnabled()) {
472 LOG.debug("{}: executing message exchange {}", ConnPoolSupport.getId(connectionEndpoint), exchangeId);
473 connectionEndpoint.execute(
474 exchangeId,
475 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
476 pushHandlerFactory,
477 context);
478 } else {
479 connectionEndpoint.execute(exchangeId, exchangeHandler, context);
480 }
481 }
482
483 public void setSocketTimeout(final Timeout timeout) {
484 connectionEndpoint.setSocketTimeout(timeout);
485 }
486
487 @Override
488 public void releaseAndReuse() {
489 if (released.compareAndSet(false, true)) {
490 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
491 }
492 }
493
494 @Override
495 public void releaseAndDiscard() {
496 if (released.compareAndSet(false, true)) {
497 Closer.closeQuietly(connectionEndpoint);
498 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
499 }
500 }
501
502 }
503
504 }