1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.protocol;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.concurrent.Future;
33
34 import org.apache.http.ConnectionClosedException;
35 import org.apache.http.ConnectionReuseStrategy;
36 import org.apache.http.ExceptionLogger;
37 import org.apache.http.HttpHost;
38 import org.apache.http.annotation.ThreadingBehavior;
39 import org.apache.http.annotation.Contract;
40 import org.apache.http.concurrent.BasicFuture;
41 import org.apache.http.concurrent.FutureCallback;
42 import org.apache.http.impl.DefaultConnectionReuseStrategy;
43 import org.apache.http.nio.NHttpClientConnection;
44 import org.apache.http.params.HttpParams;
45 import org.apache.http.pool.ConnPool;
46 import org.apache.http.pool.PoolEntry;
47 import org.apache.http.protocol.BasicHttpContext;
48 import org.apache.http.protocol.HttpContext;
49 import org.apache.http.protocol.HttpProcessor;
50 import org.apache.http.util.Args;
51
52
53
54
55
56
57
58
59
60
61 @SuppressWarnings("deprecation")
62 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
63 public class HttpAsyncRequester {
64
65 private final HttpProcessor httpprocessor;
66 private final ConnectionReuseStrategy connReuseStrategy;
67 private final ExceptionLogger exceptionLogger;
68
69
70
71
72
/**
 * Creates a requester from a protocol processor and a connection re-use
 * strategy. The {@code params} argument exists only for source
 * compatibility; it is never read.
 *
 * @param httpprocessor HTTP protocol processor.
 * @param reuseStrategy connection re-use strategy; a {@code null} value is
 *   replaced by the default strategy by the constructor delegated to.
 * @param params ignored.
 * @deprecated use
 *   {@link #HttpAsyncRequester(HttpProcessor, ConnectionReuseStrategy)}.
 */
@Deprecated
public HttpAsyncRequester(
        final HttpProcessor httpprocessor,
        final ConnectionReuseStrategy reuseStrategy,
        final HttpParams params) {
    // Equivalent to chaining through the two-argument constructor, which
    // itself supplies no exception logger.
    this(httpprocessor, reuseStrategy, (ExceptionLogger) null);
}
80
81
82
83
84
85
86
87
88
89
90
91
92
/**
 * Creates a requester with an explicit connection re-use strategy and
 * exception logger.
 *
 * @param httpprocessor HTTP protocol processor; checked with
 *   {@code Args.notNull}.
 * @param connReuseStrategy connection re-use strategy; when {@code null},
 *   {@link DefaultConnectionReuseStrategy#INSTANCE} is used.
 * @param exceptionLogger receiver for exceptions passed to
 *   {@link #log(Exception)}; when {@code null},
 *   {@link ExceptionLogger#NO_OP} is used.
 */
public HttpAsyncRequester(
        final HttpProcessor httpprocessor,
        final ConnectionReuseStrategy connReuseStrategy,
        final ExceptionLogger exceptionLogger) {
    super();
    this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");

    if (connReuseStrategy == null) {
        this.connReuseStrategy = DefaultConnectionReuseStrategy.INSTANCE;
    } else {
        this.connReuseStrategy = connReuseStrategy;
    }

    if (exceptionLogger == null) {
        this.exceptionLogger = ExceptionLogger.NO_OP;
    } else {
        this.exceptionLogger = exceptionLogger;
    }
}
103
104
105
106
107
108
/**
 * Creates a requester with an explicit connection re-use strategy and no
 * caller-supplied exception logger.
 *
 * @param httpprocessor HTTP protocol processor.
 * @param connReuseStrategy connection re-use strategy; may be {@code null},
 *   in which case the default strategy is used.
 */
public HttpAsyncRequester(
        final HttpProcessor httpprocessor,
        final ConnectionReuseStrategy connReuseStrategy) {
    // The three-argument constructor substitutes NO_OP for a missing
    // logger, so naming it here yields the same instance state.
    this(httpprocessor, connReuseStrategy, ExceptionLogger.NO_OP);
}
114
115
116
117
118
119
/**
 * Creates a requester that relies on the default connection re-use
 * strategy.
 *
 * @param httpprocessor HTTP protocol processor.
 */
public HttpAsyncRequester(final HttpProcessor httpprocessor) {
    this(httpprocessor, (ConnectionReuseStrategy) null);
}
123
124
125
126
127
128
129
130
131
132
133
134
135 public <T> Future<T> execute(
136 final HttpAsyncRequestProducer requestProducer,
137 final HttpAsyncResponseConsumer<T> responseConsumer,
138 final NHttpClientConnection conn,
139 final HttpContext context,
140 final FutureCallback<T> callback) {
141 Args.notNull(requestProducer, "HTTP request producer");
142 Args.notNull(responseConsumer, "HTTP response consumer");
143 Args.notNull(conn, "HTTP connection");
144 Args.notNull(context, "HTTP context");
145 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
146 requestProducer, responseConsumer, callback, context, conn,
147 this.httpprocessor, this.connReuseStrategy);
148 initExecution(handler, conn);
149 return handler.getFuture();
150 }
151
152 private void initExecution(
153 final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
154
155 final HttpContext context = conn.getContext();
156 synchronized (context) {
157 context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
158 if (!conn.isOpen()) {
159 handler.failed(new ConnectionClosedException());
160 } else {
161 conn.requestOutput();
162 }
163 }
164 if (handler.isDone()) {
165 try {
166 handler.close();
167 } catch (final IOException ex) {
168 log(ex);
169 }
170 }
171 }
172
173
174
175
176
177
178
179
180
181
182
183 public <T> Future<T> execute(
184 final HttpAsyncRequestProducer requestProducer,
185 final HttpAsyncResponseConsumer<T> responseConsumer,
186 final NHttpClientConnection conn,
187 final HttpContext context) {
188 return execute(requestProducer, responseConsumer, conn, context, null);
189 }
190
191
192
193
194
195
196
197
198
199
200 public <T> Future<T> execute(
201 final HttpAsyncRequestProducer requestProducer,
202 final HttpAsyncResponseConsumer<T> responseConsumer,
203 final NHttpClientConnection conn) {
204 return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
220 final HttpAsyncRequestProducer requestProducer,
221 final HttpAsyncResponseConsumer<T> responseConsumer,
222 final ConnPool<HttpHost, E> connPool,
223 final HttpContext context,
224 final FutureCallback<T> callback) {
225 Args.notNull(requestProducer, "HTTP request producer");
226 Args.notNull(responseConsumer, "HTTP response consumer");
227 Args.notNull(connPool, "HTTP connection pool");
228 Args.notNull(context, "HTTP context");
229 final BasicFuture<T> future = new BasicFuture<T>(callback);
230 final HttpHost target = requestProducer.getTarget();
231 connPool.lease(target, null, new ConnRequestCallback<T, E>(
232 future, requestProducer, responseConsumer, connPool, context));
233 return future;
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
252 final HttpHost target,
253 final List<? extends HttpAsyncRequestProducer> requestProducers,
254 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
255 final ConnPool<HttpHost, E> connPool,
256 final HttpContext context,
257 final FutureCallback<List<T>> callback) {
258 Args.notNull(target, "HTTP target");
259 Args.notEmpty(requestProducers, "Request producer list");
260 Args.notEmpty(responseConsumers, "Response consumer list");
261 Args.notNull(connPool, "HTTP connection pool");
262 Args.notNull(context, "HTTP context");
263 final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
264 connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>(
265 future, requestProducers, responseConsumers, connPool, context));
266 return future;
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
287 final HttpAsyncRequestProducer requestProducer,
288 final HttpAsyncResponseConsumer<T> responseConsumer,
289 final E poolEntry,
290 final ConnPool<HttpHost, E> connPool,
291 final HttpContext context,
292 final FutureCallback<T> callback) {
293 Args.notNull(requestProducer, "HTTP request producer");
294 Args.notNull(responseConsumer, "HTTP response consumer");
295 Args.notNull(connPool, "HTTP connection pool");
296 Args.notNull(poolEntry, "Pool entry");
297 Args.notNull(context, "HTTP context");
298 final BasicFuture<T> future = new BasicFuture<T>(callback);
299 final NHttpClientConnection conn = poolEntry.getConnection();
300 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
301 requestProducer, responseConsumer,
302 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
303 context, conn,
304 this.httpprocessor, this.connReuseStrategy);
305 initExecution(handler, conn);
306 return future;
307 }
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
327 final List<HttpAsyncRequestProducer> requestProducers,
328 final List<HttpAsyncResponseConsumer<T>> responseConsumers,
329 final E poolEntry,
330 final ConnPool<HttpHost, E> connPool,
331 final HttpContext context,
332 final FutureCallback<List<T>> callback) {
333 Args.notEmpty(requestProducers, "Request producer list");
334 Args.notEmpty(responseConsumers, "Response consumer list");
335 Args.notNull(connPool, "HTTP connection pool");
336 Args.notNull(poolEntry, "Pool entry");
337 Args.notNull(context, "HTTP context");
338 final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
339 final NHttpClientConnection conn = poolEntry.getConnection();
340 final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
341 requestProducers, responseConsumers,
342 new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool),
343 context, conn,
344 this.httpprocessor, this.connReuseStrategy);
345 initExecution(handler, conn);
346 return future;
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
361 final HttpAsyncRequestProducer requestProducer,
362 final HttpAsyncResponseConsumer<T> responseConsumer,
363 final ConnPool<HttpHost, E> connPool,
364 final HttpContext context) {
365 return execute(requestProducer, responseConsumer, connPool, context, null);
366 }
367
368
369
370
371
372
373
374
375
376
377
378 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
379 final HttpAsyncRequestProducer requestProducer,
380 final HttpAsyncResponseConsumer<T> responseConsumer,
381 final ConnPool<HttpHost, E> connPool) {
382 return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
383 }
384
385 class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
386
387 private final BasicFuture<T> requestFuture;
388 private final HttpAsyncRequestProducer requestProducer;
389 private final HttpAsyncResponseConsumer<T> responseConsumer;
390 private final ConnPool<HttpHost, E> connPool;
391 private final HttpContext context;
392
393 ConnRequestCallback(
394 final BasicFuture<T> requestFuture,
395 final HttpAsyncRequestProducer requestProducer,
396 final HttpAsyncResponseConsumer<T> responseConsumer,
397 final ConnPool<HttpHost, E> connPool,
398 final HttpContext context) {
399 super();
400 this.requestFuture = requestFuture;
401 this.requestProducer = requestProducer;
402 this.responseConsumer = responseConsumer;
403 this.connPool = connPool;
404 this.context = context;
405 }
406
407 @Override
408 public void completed(final E result) {
409 if (this.requestFuture.isDone()) {
410 this.connPool.release(result, true);
411 return;
412 }
413 final NHttpClientConnection conn = result.getConnection();
414 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
415 this.requestProducer, this.responseConsumer,
416 new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
417 this.context, conn, httpprocessor, connReuseStrategy);
418 initExecution(handler, conn);
419 }
420
421 @Override
422 public void failed(final Exception ex) {
423 try {
424 try {
425 this.responseConsumer.failed(ex);
426 } finally {
427 releaseResources();
428 }
429 } finally {
430 this.requestFuture.failed(ex);
431 }
432 }
433
434 @Override
435 public void cancelled() {
436 try {
437 try {
438 this.responseConsumer.cancel();
439 } finally {
440 releaseResources();
441 }
442 } finally {
443 this.requestFuture.cancel(true);
444 }
445 }
446
447 public void releaseResources() {
448 close(requestProducer);
449 close(responseConsumer);
450 }
451
452 }
453
454 class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
455
456 private final BasicFuture<List<T>> requestFuture;
457 private final List<? extends HttpAsyncRequestProducer> requestProducers;
458 private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers;
459 private final ConnPool<HttpHost, E> connPool;
460 private final HttpContext context;
461
462 ConnPipelinedRequestCallback(
463 final BasicFuture<List<T>> requestFuture,
464 final List<? extends HttpAsyncRequestProducer> requestProducers,
465 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
466 final ConnPool<HttpHost, E> connPool,
467 final HttpContext context) {
468 super();
469 this.requestFuture = requestFuture;
470 this.requestProducers = requestProducers;
471 this.responseConsumers = responseConsumers;
472 this.connPool = connPool;
473 this.context = context;
474 }
475
476 @Override
477 public void completed(final E result) {
478 if (this.requestFuture.isDone()) {
479 this.connPool.release(result, true);
480 return;
481 }
482 final NHttpClientConnection conn = result.getConnection();
483 final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
484 this.requestProducers, this.responseConsumers,
485 new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool),
486 this.context, conn, httpprocessor, connReuseStrategy);
487 initExecution(handler, conn);
488 }
489
490 @Override
491 public void failed(final Exception ex) {
492 try {
493 try {
494 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
495 responseConsumer.failed(ex);
496 }
497 } finally {
498 releaseResources();
499 }
500 } finally {
501 this.requestFuture.failed(ex);
502 }
503 }
504
505 @Override
506 public void cancelled() {
507 try {
508 try {
509 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
510 responseConsumer.cancel();
511 }
512 } finally {
513 releaseResources();
514 }
515 } finally {
516 this.requestFuture.cancel(true);
517 }
518 }
519
520 public void releaseResources() {
521 for (final HttpAsyncRequestProducer requestProducer: this.requestProducers) {
522 close(requestProducer);
523 }
524 for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
525 close(responseConsumer);
526 }
527 }
528
529 }
530
531 class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
532 implements FutureCallback<T> {
533
534 private final BasicFuture<T> future;
535 private final E poolEntry;
536 private final ConnPool<HttpHost, E> connPool;
537
538 RequestExecutionCallback(
539 final BasicFuture<T> future,
540 final E poolEntry,
541 final ConnPool<HttpHost, E> connPool) {
542 super();
543 this.future = future;
544 this.poolEntry = poolEntry;
545 this.connPool = connPool;
546 }
547
548 @Override
549 public void completed(final T result) {
550 try {
551 this.connPool.release(this.poolEntry, true);
552 } finally {
553 this.future.completed(result);
554 }
555 }
556
557 @Override
558 public void failed(final Exception ex) {
559 try {
560 this.connPool.release(this.poolEntry, false);
561 } finally {
562 this.future.failed(ex);
563 }
564 }
565
566 @Override
567 public void cancelled() {
568 try {
569 this.connPool.release(this.poolEntry, false);
570 } finally {
571 this.future.cancel(true);
572 }
573 }
574
575 }
576
577
578
579
580
581
582
583
584 protected void log(final Exception ex) {
585 this.exceptionLogger.log(ex);
586 }
587
588 private void close(final Closeable closeable) {
589 try {
590 closeable.close();
591 } catch (final IOException ex) {
592 log(ex);
593 }
594 }
595
596 }