1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.bootstrap;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.core5.annotation.Internal;
38 import org.apache.hc.core5.concurrent.BasicFuture;
39 import org.apache.hc.core5.concurrent.ComplexFuture;
40 import org.apache.hc.core5.concurrent.FutureCallback;
41 import org.apache.hc.core5.concurrent.FutureContribution;
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.function.Decorator;
44 import org.apache.hc.core5.http.ConnectionClosedException;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpConnection;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.HttpHost;
50 import org.apache.hc.core5.http.HttpRequest;
51 import org.apache.hc.core5.http.HttpResponse;
52 import org.apache.hc.core5.http.ProtocolException;
53 import org.apache.hc.core5.http.impl.DefaultAddressResolver;
54 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
55 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
57 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
58 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
59 import org.apache.hc.core5.http.nio.CapacityChannel;
60 import org.apache.hc.core5.http.nio.DataStreamChannel;
61 import org.apache.hc.core5.http.nio.HandlerFactory;
62 import org.apache.hc.core5.http.nio.RequestChannel;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
66 import org.apache.hc.core5.http.protocol.HttpContext;
67 import org.apache.hc.core5.http.protocol.HttpCoreContext;
68 import org.apache.hc.core5.io.CloseMode;
69 import org.apache.hc.core5.net.URIAuthority;
70 import org.apache.hc.core5.pool.ConnPoolControl;
71 import org.apache.hc.core5.pool.ManagedConnPool;
72 import org.apache.hc.core5.pool.PoolEntry;
73 import org.apache.hc.core5.pool.PoolStats;
74 import org.apache.hc.core5.reactor.Command;
75 import org.apache.hc.core5.reactor.EndpointParameters;
76 import org.apache.hc.core5.reactor.IOEventHandler;
77 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78 import org.apache.hc.core5.reactor.IOReactorConfig;
79 import org.apache.hc.core5.reactor.IOSession;
80 import org.apache.hc.core5.reactor.IOSessionListener;
81 import org.apache.hc.core5.util.Args;
82 import org.apache.hc.core5.util.TimeValue;
83 import org.apache.hc.core5.util.Timeout;
84
85
86
87
88
89
90 public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
91
92 private final ManagedConnPool<HttpHost, IOSession> connPool;
93
94
95
96
97 @Internal
98 public HttpAsyncRequester(
99 final IOReactorConfig ioReactorConfig,
100 final IOEventHandlerFactory eventHandlerFactory,
101 final Decorator<IOSession> ioSessionDecorator,
102 final Callback<Exception> exceptionCallback,
103 final IOSessionListener sessionListener,
104 final ManagedConnPool<HttpHost, IOSession> connPool) {
105 super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
106 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
107 this.connPool = Args.notNull(connPool, "Connection pool");
108 }
109
110 @Override
111 public PoolStats getTotalStats() {
112 return connPool.getTotalStats();
113 }
114
115 @Override
116 public PoolStats getStats(final HttpHost route) {
117 return connPool.getStats(route);
118 }
119
120 @Override
121 public void setMaxTotal(final int max) {
122 connPool.setMaxTotal(max);
123 }
124
125 @Override
126 public int getMaxTotal() {
127 return connPool.getMaxTotal();
128 }
129
130 @Override
131 public void setDefaultMaxPerRoute(final int max) {
132 connPool.setDefaultMaxPerRoute(max);
133 }
134
135 @Override
136 public int getDefaultMaxPerRoute() {
137 return connPool.getDefaultMaxPerRoute();
138 }
139
140 @Override
141 public void setMaxPerRoute(final HttpHost route, final int max) {
142 connPool.setMaxPerRoute(route, max);
143 }
144
145 @Override
146 public int getMaxPerRoute(final HttpHost route) {
147 return connPool.getMaxPerRoute(route);
148 }
149
150 @Override
151 public void closeIdle(final TimeValue idleTime) {
152 connPool.closeIdle(idleTime);
153 }
154
155 @Override
156 public void closeExpired() {
157 connPool.closeExpired();
158 }
159
160 @Override
161 public Set<HttpHost> getRoutes() {
162 return connPool.getRoutes();
163 }
164
165 public Future<AsyncClientEndpoint> connect(
166 final HttpHost host,
167 final Timeout timeout,
168 final Object attachment,
169 final FutureCallback<AsyncClientEndpoint> callback) {
170 return doConnect(host, timeout, attachment, callback);
171 }
172
173 protected Future<AsyncClientEndpoint> doConnect(
174 final HttpHost host,
175 final Timeout timeout,
176 final Object attachment,
177 final FutureCallback<AsyncClientEndpoint> callback) {
178 Args.notNull(host, "Host");
179 Args.notNull(timeout, "Timeout");
180 final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
181 final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
182 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
183
184 @Override
185 public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
186 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
187 final IOSession ioSession = poolEntry.getConnection();
188 if (ioSession != null && !ioSession.isOpen()) {
189 poolEntry.discardConnection(CloseMode.IMMEDIATE);
190 }
191 if (poolEntry.hasConnection()) {
192 resultFuture.completed(endpoint);
193 } else {
194 final Future<IOSession> future = requestSession(
195 host,
196 timeout,
197 new EndpointParameters(host, attachment),
198 new FutureCallback<IOSession>() {
199
200 @Override
201 public void completed(final IOSession session) {
202 session.setSocketTimeout(timeout);
203 poolEntry.assignConnection(session);
204 resultFuture.completed(endpoint);
205 }
206
207 @Override
208 public void failed(final Exception cause) {
209 try {
210 resultFuture.failed(cause);
211 } finally {
212 endpoint.releaseAndDiscard();
213 }
214 }
215
216 @Override
217 public void cancelled() {
218 try {
219 resultFuture.cancel();
220 } finally {
221 endpoint.releaseAndDiscard();
222 }
223 }
224
225 });
226 resultFuture.setDependency(future);
227 }
228 }
229
230 @Override
231 public void failed(final Exception ex) {
232 resultFuture.failed(ex);
233 }
234
235 @Override
236 public void cancelled() {
237 resultFuture.cancel();
238 }
239
240 });
241 resultFuture.setDependency(leaseFuture);
242 return resultFuture;
243 }
244
245 public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
246 return connect(host, timeout, null, null);
247 }
248
249 public void execute(
250 final AsyncClientExchangeHandler exchangeHandler,
251 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
252 final Timeout timeout,
253 final HttpContext executeContext) {
254 Args.notNull(exchangeHandler, "Exchange handler");
255 Args.notNull(timeout, "Timeout");
256 Args.notNull(executeContext, "Context");
257 try {
258 exchangeHandler.produceRequest(new RequestChannel() {
259
260 @Override
261 public void sendRequest(
262 final HttpRequest request,
263 final EntityDetails entityDetails, final HttpContext requestContext) throws HttpException, IOException {
264 final String scheme = request.getScheme();
265 final URIAuthority authority = request.getAuthority();
266 if (authority == null) {
267 throw new ProtocolException("Request authority not specified");
268 }
269 final HttpHostttp/HttpHost.html#HttpHost">HttpHost target = new HttpHost(scheme, authority);
270 connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
271
272 @Override
273 public void completed(final AsyncClientEndpoint endpoint) {
274 endpoint.execute(new AsyncClientExchangeHandler() {
275
276 @Override
277 public void releaseResources() {
278 endpoint.releaseAndDiscard();
279 exchangeHandler.releaseResources();
280 }
281
282 @Override
283 public void failed(final Exception cause) {
284 endpoint.releaseAndDiscard();
285 exchangeHandler.failed(cause);
286 }
287
288 @Override
289 public void cancel() {
290 endpoint.releaseAndDiscard();
291 exchangeHandler.cancel();
292 }
293
294 @Override
295 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
296 channel.sendRequest(request, entityDetails, httpContext);
297 }
298
299 @Override
300 public int available() {
301 return exchangeHandler.available();
302 }
303
304 @Override
305 public void produce(final DataStreamChannel channel) throws IOException {
306 exchangeHandler.produce(channel);
307 }
308
309 @Override
310 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
311 exchangeHandler.consumeInformation(response, httpContext);
312 }
313
314 @Override
315 public void consumeResponse(
316 final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
317 if (entityDetails == null) {
318 endpoint.releaseAndReuse();
319 }
320 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
321 }
322
323 @Override
324 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
325 exchangeHandler.updateCapacity(capacityChannel);
326 }
327
328 @Override
329 public void consume(final ByteBuffer src) throws IOException {
330 exchangeHandler.consume(src);
331 }
332
333 @Override
334 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
335 endpoint.releaseAndReuse();
336 exchangeHandler.streamEnd(trailers);
337 }
338
339 }, pushHandlerFactory, executeContext);
340
341 }
342
343 @Override
344 public void failed(final Exception ex) {
345 exchangeHandler.failed(ex);
346 }
347
348 @Override
349 public void cancelled() {
350 exchangeHandler.cancel();
351 }
352
353 });
354
355 }
356
357 }, executeContext);
358
359 } catch (final IOException | HttpException ex) {
360 exchangeHandler.failed(ex);
361 }
362 }
363
364 public void execute(
365 final AsyncClientExchangeHandler exchangeHandler,
366 final Timeout timeout,
367 final HttpContext executeContext) {
368 execute(exchangeHandler, null, timeout, executeContext);
369 }
370
371 public final <T> Future<T> execute(
372 final AsyncRequestProducer requestProducer,
373 final AsyncResponseConsumer<T> responseConsumer,
374 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
375 final Timeout timeout,
376 final HttpContext context,
377 final FutureCallback<T> callback) {
378 Args.notNull(requestProducer, "Request producer");
379 Args.notNull(responseConsumer, "Response consumer");
380 Args.notNull(timeout, "Timeout");
381 final BasicFuture<T> future = new BasicFuture<>(callback);
382 final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
383 requestProducer,
384 responseConsumer,
385 new FutureContribution<T>(future) {
386
387 @Override
388 public void completed(final T result) {
389 future.completed(result);
390 }
391
392 });
393 execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
394 return future;
395 }
396
397 public final <T> Future<T> execute(
398 final AsyncRequestProducer requestProducer,
399 final AsyncResponseConsumer<T> responseConsumer,
400 final Timeout timeout,
401 final HttpContext context,
402 final FutureCallback<T> callback) {
403 return execute(requestProducer, responseConsumer, null, timeout, context, callback);
404 }
405
406 public final <T> Future<T> execute(
407 final AsyncRequestProducer requestProducer,
408 final AsyncResponseConsumer<T> responseConsumer,
409 final Timeout timeout,
410 final FutureCallback<T> callback) {
411 return execute(requestProducer, responseConsumer, null, timeout, null, callback);
412 }
413
414 private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
415
416 final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
417
418 InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
419 this.poolEntryRef = new AtomicReference<>(poolEntry);
420 }
421
422 private IOSession getIOSession() {
423 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
424 if (poolEntry == null) {
425 throw new IllegalStateException("Endpoint has already been released");
426 }
427 final IOSession ioSession = poolEntry.getConnection();
428 if (ioSession == null) {
429 throw new IllegalStateException("I/O session is invalid");
430 }
431 return ioSession;
432 }
433
434 @Override
435 public void execute(
436 final AsyncClientExchangeHandler exchangeHandler,
437 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
438 final HttpContext context) {
439 final IOSession ioSession = getIOSession();
440 ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
441 if (!ioSession.isOpen()) {
442 try {
443 exchangeHandler.failed(new ConnectionClosedException());
444 } finally {
445 exchangeHandler.releaseResources();
446 }
447 }
448 }
449
450 @Override
451 public boolean isConnected() {
452 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
453 if (poolEntry != null) {
454 final IOSession ioSession = poolEntry.getConnection();
455 if (ioSession == null || !ioSession.isOpen()) {
456 return false;
457 }
458 final IOEventHandler handler = ioSession.getHandler();
459 return (handler instanceof HttpConnection../../../org/apache/hc/core5/http/HttpConnection.html#HttpConnection">HttpConnection) && ((HttpConnection) handler).isOpen();
460 }
461 return false;
462 }
463
464 @Override
465 public void releaseAndReuse() {
466 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
467 if (poolEntry != null) {
468 final IOSession ioSession = poolEntry.getConnection();
469 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
470 }
471 }
472
473 @Override
474 public void releaseAndDiscard() {
475 final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
476 if (poolEntry != null) {
477 poolEntry.discardConnection(CloseMode.GRACEFUL);
478 connPool.release(poolEntry, false);
479 }
480 }
481
482 }
483
484 }