1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.bootstrap;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.core5.annotation.Internal;
38 import org.apache.hc.core5.concurrent.BasicFuture;
39 import org.apache.hc.core5.concurrent.CallbackContribution;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.concurrent.FutureContribution;
43 import org.apache.hc.core5.function.Callback;
44 import org.apache.hc.core5.function.Decorator;
45 import org.apache.hc.core5.http.ConnectionClosedException;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.Header;
48 import org.apache.hc.core5.http.HttpConnection;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.HttpHost;
51 import org.apache.hc.core5.http.HttpResponse;
52 import org.apache.hc.core5.http.ProtocolException;
53 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
54 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
55 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
57 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
58 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
59 import org.apache.hc.core5.http.nio.CapacityChannel;
60 import org.apache.hc.core5.http.nio.DataStreamChannel;
61 import org.apache.hc.core5.http.nio.HandlerFactory;
62 import org.apache.hc.core5.http.nio.RequestChannel;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
66 import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
67 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
68 import org.apache.hc.core5.http.protocol.HttpContext;
69 import org.apache.hc.core5.http.protocol.HttpCoreContext;
70 import org.apache.hc.core5.io.CloseMode;
71 import org.apache.hc.core5.net.NamedEndpoint;
72 import org.apache.hc.core5.net.URIAuthority;
73 import org.apache.hc.core5.pool.ConnPoolControl;
74 import org.apache.hc.core5.pool.ManagedConnPool;
75 import org.apache.hc.core5.pool.PoolEntry;
76 import org.apache.hc.core5.pool.PoolStats;
77 import org.apache.hc.core5.reactor.Command;
78 import org.apache.hc.core5.reactor.EndpointParameters;
79 import org.apache.hc.core5.reactor.IOEventHandler;
80 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
81 import org.apache.hc.core5.reactor.IOReactorConfig;
82 import org.apache.hc.core5.reactor.IOSession;
83 import org.apache.hc.core5.reactor.IOSessionListener;
84 import org.apache.hc.core5.reactor.ProtocolIOSession;
85 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
86 import org.apache.hc.core5.util.Args;
87 import org.apache.hc.core5.util.TimeValue;
88 import org.apache.hc.core5.util.Timeout;
89
90
91
92
93
94
95 public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
96
97 private final ManagedConnPool<HttpHost, IOSession> connPool;
98 private final TlsStrategy tlsStrategy;
99 private final Timeout handshakeTimeout;
100
101
102
103
104
105
106 @Internal
107 public HttpAsyncRequester(
108 final IOReactorConfig ioReactorConfig,
109 final IOEventHandlerFactory eventHandlerFactory,
110 final Decorator<IOSession> ioSessionDecorator,
111 final Callback<Exception> exceptionCallback,
112 final IOSessionListener sessionListener,
113 final ManagedConnPool<HttpHost, IOSession> connPool,
114 final TlsStrategy tlsStrategy,
115 final Timeout handshakeTimeout) {
116 super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
117 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
118 this.connPool = Args.notNull(connPool, "Connection pool");
119 this.tlsStrategy = tlsStrategy;
120 this.handshakeTimeout = handshakeTimeout;
121 }
122
123
124
125
126 @Internal
127 public HttpAsyncRequester(
128 final IOReactorConfig ioReactorConfig,
129 final IOEventHandlerFactory eventHandlerFactory,
130 final Decorator<IOSession> ioSessionDecorator,
131 final Callback<Exception> exceptionCallback,
132 final IOSessionListener sessionListener,
133 final ManagedConnPool<HttpHost, IOSession> connPool) {
134 this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
135 null, null);
136 }
137
138 @Override
139 public PoolStats getTotalStats() {
140 return connPool.getTotalStats();
141 }
142
143 @Override
144 public PoolStats getStats(final HttpHost route) {
145 return connPool.getStats(route);
146 }
147
148 @Override
149 public void setMaxTotal(final int max) {
150 connPool.setMaxTotal(max);
151 }
152
153 @Override
154 public int getMaxTotal() {
155 return connPool.getMaxTotal();
156 }
157
158 @Override
159 public void setDefaultMaxPerRoute(final int max) {
160 connPool.setDefaultMaxPerRoute(max);
161 }
162
163 @Override
164 public int getDefaultMaxPerRoute() {
165 return connPool.getDefaultMaxPerRoute();
166 }
167
168 @Override
169 public void setMaxPerRoute(final HttpHost route, final int max) {
170 connPool.setMaxPerRoute(route, max);
171 }
172
173 @Override
174 public int getMaxPerRoute(final HttpHost route) {
175 return connPool.getMaxPerRoute(route);
176 }
177
178 @Override
179 public void closeIdle(final TimeValue idleTime) {
180 connPool.closeIdle(idleTime);
181 }
182
183 @Override
184 public void closeExpired() {
185 connPool.closeExpired();
186 }
187
188 @Override
189 public Set<HttpHost> getRoutes() {
190 return connPool.getRoutes();
191 }
192
193 public Future<AsyncClientEndpoint> connect(
194 final HttpHost host,
195 final Timeout timeout,
196 final Object attachment,
197 final FutureCallback<AsyncClientEndpoint> callback) {
198 return doConnect(host, timeout, attachment, callback);
199 }
200
201 protected Future<AsyncClientEndpoint> doConnect(
202 final HttpHost host,
203 final Timeout timeout,
204 final Object attachment,
205 final FutureCallback<AsyncClientEndpoint> callback) {
206 Args.notNull(host, "Host");
207 Args.notNull(timeout, "Timeout");
208 final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
209 final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
210 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
211
212 @Override
213 public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
214 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
215 final IOSession ioSession = poolEntry.getConnection();
216 if (ioSession != null && !ioSession.isOpen()) {
217 poolEntry.discardConnection(CloseMode.IMMEDIATE);
218 }
219 if (poolEntry.hasConnection()) {
220 resultFuture.completed(endpoint);
221 } else {
222 final Future<IOSession> future = requestSession(
223 host,
224 timeout,
225 new EndpointParameters(host, attachment),
226 new FutureCallback<IOSession>() {
227
228 @Override
229 public void completed(final IOSession session) {
230 session.setSocketTimeout(timeout);
231 poolEntry.assignConnection(session);
232 resultFuture.completed(endpoint);
233 }
234
235 @Override
236 public void failed(final Exception cause) {
237 try {
238 resultFuture.failed(cause);
239 } finally {
240 endpoint.releaseAndDiscard();
241 }
242 }
243
244 @Override
245 public void cancelled() {
246 try {
247 resultFuture.cancel();
248 } finally {
249 endpoint.releaseAndDiscard();
250 }
251 }
252
253 });
254 resultFuture.setDependency(future);
255 }
256 }
257
258 @Override
259 public void failed(final Exception ex) {
260 resultFuture.failed(ex);
261 }
262
263 @Override
264 public void cancelled() {
265 resultFuture.cancel();
266 }
267
268 });
269 resultFuture.setDependency(leaseFuture);
270 return resultFuture;
271 }
272
273 public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
274 return connect(host, timeout, null, null);
275 }
276
277 public void execute(
278 final AsyncClientExchangeHandler exchangeHandler,
279 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
280 final Timeout timeout,
281 final HttpContext executeContext) {
282 Args.notNull(exchangeHandler, "Exchange handler");
283 Args.notNull(timeout, "Timeout");
284 Args.notNull(executeContext, "Context");
285 try {
286 exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
287 final String scheme = request.getScheme();
288 final URIAuthority authority = request.getAuthority();
289 if (authority == null) {
290 throw new ProtocolException("Request authority not specified");
291 }
292 final HttpHost target = new HttpHost(scheme, authority);
293 connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
294
295 @Override
296 public void completed(final AsyncClientEndpoint endpoint) {
297 endpoint.execute(new AsyncClientExchangeHandler() {
298
299 @Override
300 public void releaseResources() {
301 endpoint.releaseAndDiscard();
302 exchangeHandler.releaseResources();
303 }
304
305 @Override
306 public void failed(final Exception cause) {
307 endpoint.releaseAndDiscard();
308 exchangeHandler.failed(cause);
309 }
310
311 @Override
312 public void cancel() {
313 endpoint.releaseAndDiscard();
314 exchangeHandler.cancel();
315 }
316
317 @Override
318 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
319 channel.sendRequest(request, entityDetails, httpContext);
320 }
321
322 @Override
323 public int available() {
324 return exchangeHandler.available();
325 }
326
327 @Override
328 public void produce(final DataStreamChannel channel) throws IOException {
329 exchangeHandler.produce(channel);
330 }
331
332 @Override
333 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
334 exchangeHandler.consumeInformation(response, httpContext);
335 }
336
337 @Override
338 public void consumeResponse(
339 final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
340 if (entityDetails == null) {
341 endpoint.releaseAndReuse();
342 }
343 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
344 }
345
346 @Override
347 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
348 exchangeHandler.updateCapacity(capacityChannel);
349 }
350
351 @Override
352 public void consume(final ByteBuffer src) throws IOException {
353 exchangeHandler.consume(src);
354 }
355
356 @Override
357 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
358 endpoint.releaseAndReuse();
359 exchangeHandler.streamEnd(trailers);
360 }
361
362 }, pushHandlerFactory, executeContext);
363
364 }
365
366 @Override
367 public void failed(final Exception ex) {
368 exchangeHandler.failed(ex);
369 }
370
371 @Override
372 public void cancelled() {
373 exchangeHandler.cancel();
374 }
375
376 });
377
378 }, executeContext);
379
380 } catch (final IOException | HttpException ex) {
381 exchangeHandler.failed(ex);
382 }
383 }
384
385 public void execute(
386 final AsyncClientExchangeHandler exchangeHandler,
387 final Timeout timeout,
388 final HttpContext executeContext) {
389 execute(exchangeHandler, null, timeout, executeContext);
390 }
391
392 public final <T> Future<T> execute(
393 final AsyncRequestProducer requestProducer,
394 final AsyncResponseConsumer<T> responseConsumer,
395 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
396 final Timeout timeout,
397 final HttpContext context,
398 final FutureCallback<T> callback) {
399 Args.notNull(requestProducer, "Request producer");
400 Args.notNull(responseConsumer, "Response consumer");
401 Args.notNull(timeout, "Timeout");
402 final BasicFuture<T> future = new BasicFuture<>(callback);
403 final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
404 requestProducer,
405 responseConsumer,
406 new FutureContribution<T>(future) {
407
408 @Override
409 public void completed(final T result) {
410 future.completed(result);
411 }
412
413 });
414 execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
415 return future;
416 }
417
418 public final <T> Future<T> execute(
419 final AsyncRequestProducer requestProducer,
420 final AsyncResponseConsumer<T> responseConsumer,
421 final Timeout timeout,
422 final HttpContext context,
423 final FutureCallback<T> callback) {
424 return execute(requestProducer, responseConsumer, null, timeout, context, callback);
425 }
426
427 public final <T> Future<T> execute(
428 final AsyncRequestProducer requestProducer,
429 final AsyncResponseConsumer<T> responseConsumer,
430 final Timeout timeout,
431 final FutureCallback<T> callback) {
432 return execute(requestProducer, responseConsumer, null, timeout, null, callback);
433 }
434
435 protected void doTlsUpgrade(
436 final ProtocolIOSession ioSession,
437 final NamedEndpoint endpoint,
438 final FutureCallback<ProtocolIOSession> callback) {
439 if (tlsStrategy != null) {
440 tlsStrategy.upgrade(ioSession,
441 endpoint,
442 null,
443 handshakeTimeout,
444 new CallbackContribution<TransportSecurityLayer>(callback) {
445
446 @Override
447 public void completed(final TransportSecurityLayer transportSecurityLayer) {
448 if (callback != null) {
449 callback.completed(ioSession);
450 }
451 }
452
453 });
454 } else {
455 throw new IllegalStateException("TLS upgrade not supported");
456 }
457 }
458
459 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
460
461 final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
462
463 InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
464 this.poolEntryRef = new AtomicReference<>(poolEntry);
465 }
466
467 private IOSession getIOSession() {
468 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
469 if (poolEntry == null) {
470 throw new IllegalStateException("Endpoint has already been released");
471 }
472 final IOSession ioSession = poolEntry.getConnection();
473 if (ioSession == null) {
474 throw new IllegalStateException("I/O session is invalid");
475 }
476 return ioSession;
477 }
478
479 @Override
480 public void execute(
481 final AsyncClientExchangeHandler exchangeHandler,
482 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
483 final HttpContext context) {
484 final IOSession ioSession = getIOSession();
485 ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
486 if (!ioSession.isOpen()) {
487 try {
488 exchangeHandler.failed(new ConnectionClosedException());
489 } finally {
490 exchangeHandler.releaseResources();
491 }
492 }
493 }
494
495 @Override
496 public boolean isConnected() {
497 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
498 if (poolEntry != null) {
499 final IOSession ioSession = poolEntry.getConnection();
500 if (ioSession == null || !ioSession.isOpen()) {
501 return false;
502 }
503 final IOEventHandler handler = ioSession.getHandler();
504 return (handler instanceof HttpConnection) && ((HttpConnection) handler).isOpen();
505 }
506 return false;
507 }
508
509 @Override
510 public void releaseAndReuse() {
511 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
512 if (poolEntry != null) {
513 final IOSession ioSession = poolEntry.getConnection();
514 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
515 }
516 }
517
518 @Override
519 public void releaseAndDiscard() {
520 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
521 if (poolEntry != null) {
522 poolEntry.discardConnection(CloseMode.GRACEFUL);
523 connPool.release(poolEntry, false);
524 }
525 }
526
527 @Override
528 public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
529 final IOSession ioSession = getIOSession();
530 if (ioSession instanceof ProtocolIOSession) {
531 doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
532 } else {
533 throw new IllegalStateException("TLS upgrade not supported");
534 }
535 }
536 }
537
538 }