View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.concurrent.Future;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpHost;
38  import org.apache.http.annotation.ThreadingBehavior;
39  import org.apache.http.annotation.Contract;
40  import org.apache.http.concurrent.BasicFuture;
41  import org.apache.http.concurrent.FutureCallback;
42  import org.apache.http.impl.DefaultConnectionReuseStrategy;
43  import org.apache.http.nio.NHttpClientConnection;
44  import org.apache.http.params.HttpParams;
45  import org.apache.http.pool.ConnPool;
46  import org.apache.http.pool.PoolEntry;
47  import org.apache.http.protocol.BasicHttpContext;
48  import org.apache.http.protocol.HttpContext;
49  import org.apache.http.protocol.HttpProcessor;
50  import org.apache.http.util.Args;
51  
52  /**
53   * {@code HttpAsyncRequester} is a utility class that can be used
54   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
55   * of asynchronous HTTP requests.
56   *
57   * @see HttpAsyncRequestExecutor
58   *
59   * @since 4.2
60   */
61  @SuppressWarnings("deprecation")
62  @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
63  public class HttpAsyncRequester {
64  
65      private final HttpProcessor httpprocessor;
66      private final ConnectionReuseStrategy connReuseStrategy;
67      private final ExceptionLogger exceptionLogger;
68  
69      /**
70       * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
71       *   ConnectionReuseStrategy)}
72       */
73      @Deprecated
74      public HttpAsyncRequester(
75              final HttpProcessor httpprocessor,
76              final ConnectionReuseStrategy reuseStrategy,
77              final HttpParams params) {
78          this(httpprocessor, reuseStrategy);
79      }
80  
81      /**
82       * Creates new instance of {@code HttpAsyncRequester}.
83       * @param httpprocessor HTTP protocol processor.
84       * @param connReuseStrategy Connection re-use strategy. If {@code null}
85       *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
86       * @param exceptionLogger Exception logger. If {@code null}
87       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
88       *   logger will be only used to log I/O exception thrown while closing
89       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
90       *
91       * @since 4.4
92       */
93      public HttpAsyncRequester(
94              final HttpProcessor httpprocessor,
95              final ConnectionReuseStrategy connReuseStrategy,
96              final ExceptionLogger exceptionLogger) {
97          super();
98          this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");
99          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
100                 DefaultConnectionReuseStrategy.INSTANCE;
101         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
102     }
103 
104     /**
105      * Creates new instance of HttpAsyncRequester.
106      *
107      * @since 4.3
108      */
109     public HttpAsyncRequester(
110             final HttpProcessor httpprocessor,
111             final ConnectionReuseStrategy connReuseStrategy) {
112         this(httpprocessor, connReuseStrategy, (ExceptionLogger) null);
113     }
114 
115     /**
116      * Creates new instance of HttpAsyncRequester.
117      *
118      * @since 4.3
119      */
120     public HttpAsyncRequester(final HttpProcessor httpprocessor) {
121         this(httpprocessor, null);
122     }
123 
124     /**
125      * Initiates asynchronous HTTP request execution.
126      *
127      * @param <T> the result type of request execution.
128      * @param requestProducer request producer.
129      * @param responseConsumer response consumer.
130      * @param conn underlying HTTP connection.
131      * @param context HTTP context
132      * @param callback future callback.
133      * @return future representing pending completion of the operation.
134      */
135     public <T> Future<T> execute(
136             final HttpAsyncRequestProducer requestProducer,
137             final HttpAsyncResponseConsumer<T> responseConsumer,
138             final NHttpClientConnection conn,
139             final HttpContext context,
140             final FutureCallback<T> callback) {
141         Args.notNull(requestProducer, "HTTP request producer");
142         Args.notNull(responseConsumer, "HTTP response consumer");
143         Args.notNull(conn, "HTTP connection");
144         Args.notNull(context, "HTTP context");
145         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
146                 requestProducer, responseConsumer, callback, context, conn,
147                 this.httpprocessor, this.connReuseStrategy);
148         initExecution(handler, conn);
149         return handler.getFuture();
150     }
151 
152     private void initExecution(
153             final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
154 
155         final HttpContext context = conn.getContext();
156         synchronized (context) {
157             context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
158             if (!conn.isOpen()) {
159                 handler.failed(new ConnectionClosedException());
160             } else {
161                 conn.requestOutput();
162             }
163         }
164         if (handler.isDone()) {
165             try {
166                 handler.close();
167             } catch (final IOException ex) {
168                 log(ex);
169             }
170         }
171     }
172 
173     /**
174      * Initiates asynchronous HTTP request execution.
175      *
176      * @param <T> the result type of request execution.
177      * @param requestProducer request producer.
178      * @param responseConsumer response consumer.
179      * @param conn underlying HTTP connection.
180      * @param context HTTP context
181      * @return future representing pending completion of the operation.
182      */
183     public <T> Future<T> execute(
184             final HttpAsyncRequestProducer requestProducer,
185             final HttpAsyncResponseConsumer<T> responseConsumer,
186             final NHttpClientConnection conn,
187             final HttpContext context) {
188         return execute(requestProducer, responseConsumer, conn, context, null);
189     }
190 
191     /**
192      * Initiates asynchronous HTTP request execution.
193      *
194      * @param <T> the result type of request execution.
195      * @param requestProducer request producer.
196      * @param responseConsumer response consumer.
197      * @param conn underlying HTTP connection.
198      * @return future representing pending completion of the operation.
199      */
200     public <T> Future<T> execute(
201             final HttpAsyncRequestProducer requestProducer,
202             final HttpAsyncResponseConsumer<T> responseConsumer,
203             final NHttpClientConnection conn) {
204         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
205     }
206 
207     /**
208      * Initiates asynchronous HTTP request execution.
209      *
210      * @param <T> the result type of request execution.
211      * @param <E> the connection pool entry type.
212      * @param requestProducer request producer.
213      * @param responseConsumer response consumer.
214      * @param connPool pool of persistent reusable connections.
215      * @param context HTTP context
216      * @param callback future callback.
217      * @return future representing pending completion of the operation.
218      */
219     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
220             final HttpAsyncRequestProducer requestProducer,
221             final HttpAsyncResponseConsumer<T> responseConsumer,
222             final ConnPool<HttpHost, E> connPool,
223             final HttpContext context,
224             final FutureCallback<T> callback) {
225         Args.notNull(requestProducer, "HTTP request producer");
226         Args.notNull(responseConsumer, "HTTP response consumer");
227         Args.notNull(connPool, "HTTP connection pool");
228         Args.notNull(context, "HTTP context");
229         final BasicFuture<T> future = new BasicFuture<T>(callback);
230         final HttpHost target = requestProducer.getTarget();
231         connPool.lease(target, null, new ConnRequestCallback<T, E>(
232                 future, requestProducer, responseConsumer, connPool, context));
233         return future;
234     }
235 
236     /**
237      * Initiates asynchronous pipelined HTTP request execution.
238      *
239      * @param <T> the result type of request execution.
240      * @param <E> the connection pool entry type.
241      * @param target target host.
242      * @param requestProducers list of request producers.
243      * @param responseConsumers list of response consumers.
244      * @param connPool pool of persistent reusable connections.
245      * @param context HTTP context
246      * @param callback future callback.
247      * @return future representing pending completion of the operation.
248      *
249      * @since 4.4
250      */
251     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
252             final HttpHost target,
253             final List<? extends HttpAsyncRequestProducer> requestProducers,
254             final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
255             final ConnPool<HttpHost, E> connPool,
256             final HttpContext context,
257             final FutureCallback<List<T>> callback) {
258         Args.notNull(target, "HTTP target");
259         Args.notEmpty(requestProducers, "Request producer list");
260         Args.notEmpty(responseConsumers, "Response consumer list");
261         Args.notNull(connPool, "HTTP connection pool");
262         Args.notNull(context, "HTTP context");
263         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
264         connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>(
265                 future, requestProducers, responseConsumers, connPool, context));
266         return future;
267     }
268 
269     /**
270      * Initiates asynchronous HTTP request execution. This method automatically releases
271      * the given pool entry once request execution is completed (successfully or unsuccessfully).
272      *
273      * @param <T> the result type of request execution.
274      * @param <E> the connection pool entry type.
275      * @param requestProducer request producer.
276      * @param responseConsumer response consumer.
277      * @param poolEntry leased pool entry. It will be automatically released
278      *   back to the pool when execution is completed.
279      * @param connPool pool of persistent reusable connections.
280      * @param context HTTP context
281      * @param callback future callback.
282      * @return future representing pending completion of the operation.
283      *
284      * @since 4.3
285      */
286     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
287             final HttpAsyncRequestProducer requestProducer,
288             final HttpAsyncResponseConsumer<T> responseConsumer,
289             final E poolEntry,
290             final ConnPool<HttpHost, E> connPool,
291             final HttpContext context,
292             final FutureCallback<T> callback) {
293         Args.notNull(requestProducer, "HTTP request producer");
294         Args.notNull(responseConsumer, "HTTP response consumer");
295         Args.notNull(connPool, "HTTP connection pool");
296         Args.notNull(poolEntry, "Pool entry");
297         Args.notNull(context, "HTTP context");
298         final BasicFuture<T> future = new BasicFuture<T>(callback);
299         final NHttpClientConnection conn = poolEntry.getConnection();
300         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
301                 requestProducer, responseConsumer,
302                 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
303                 context, conn,
304                 this.httpprocessor, this.connReuseStrategy);
305         initExecution(handler, conn);
306         return future;
307     }
308 
309     /**
310      * Initiates asynchronous pipelined HTTP request execution. This method automatically releases
311      * the given pool entry once request execution is completed (successfully or unsuccessfully).
312      *
313      * @param <T> the result type of request execution.
314      * @param <E> the connection pool entry type.
315      * @param requestProducers list of request producers.
316      * @param responseConsumers list of response consumers.
317      * @param poolEntry leased pool entry. It will be automatically released
318      *   back to the pool when execution is completed.
319      * @param connPool pool of persistent reusable connections.
320      * @param context HTTP context
321      * @param callback future callback.
322      * @return future representing pending completion of the operation.
323      *
324      * @since 4.4
325      */
326     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
327             final List<HttpAsyncRequestProducer> requestProducers,
328             final List<HttpAsyncResponseConsumer<T>> responseConsumers,
329             final E poolEntry,
330             final ConnPool<HttpHost, E> connPool,
331             final HttpContext context,
332             final FutureCallback<List<T>> callback) {
333         Args.notEmpty(requestProducers, "Request producer list");
334         Args.notEmpty(responseConsumers, "Response consumer list");
335         Args.notNull(connPool, "HTTP connection pool");
336         Args.notNull(poolEntry, "Pool entry");
337         Args.notNull(context, "HTTP context");
338         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
339         final NHttpClientConnection conn = poolEntry.getConnection();
340         final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
341                 requestProducers, responseConsumers,
342                 new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool),
343                 context, conn,
344                 this.httpprocessor, this.connReuseStrategy);
345         initExecution(handler, conn);
346         return future;
347     }
348 
349     /**
350      * Initiates asynchronous HTTP request execution.
351      *
352      * @param <T> the result type of request execution.
353      * @param <E> the connection pool entry type.
354      * @param requestProducer request producer.
355      * @param responseConsumer response consumer.
356      * @param connPool pool of persistent reusable connections.
357      * @param context HTTP context
358      * @return future representing pending completion of the operation.
359      */
360     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
361             final HttpAsyncRequestProducer requestProducer,
362             final HttpAsyncResponseConsumer<T> responseConsumer,
363             final ConnPool<HttpHost, E> connPool,
364             final HttpContext context) {
365         return execute(requestProducer, responseConsumer, connPool, context, null);
366     }
367 
368     /**
369      * Initiates asynchronous HTTP request execution.
370      *
371      * @param <T> the result type of request execution.
372      * @param <E> the connection pool entry type.
373      * @param requestProducer request producer.
374      * @param responseConsumer response consumer.
375      * @param connPool pool of persistent reusable connections.
376      * @return future representing pending completion of the operation.
377      */
378     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
379             final HttpAsyncRequestProducer requestProducer,
380             final HttpAsyncResponseConsumer<T> responseConsumer,
381             final ConnPool<HttpHost, E> connPool) {
382         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
383     }
384 
385     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
386 
387         private final BasicFuture<T> requestFuture;
388         private final HttpAsyncRequestProducer requestProducer;
389         private final HttpAsyncResponseConsumer<T> responseConsumer;
390         private final ConnPool<HttpHost, E> connPool;
391         private final HttpContext context;
392 
393         ConnRequestCallback(
394                 final BasicFuture<T> requestFuture,
395                 final HttpAsyncRequestProducer requestProducer,
396                 final HttpAsyncResponseConsumer<T> responseConsumer,
397                 final ConnPool<HttpHost, E> connPool,
398                 final HttpContext context) {
399             super();
400             this.requestFuture = requestFuture;
401             this.requestProducer = requestProducer;
402             this.responseConsumer = responseConsumer;
403             this.connPool = connPool;
404             this.context = context;
405         }
406 
407         @Override
408         public void completed(final E result) {
409             if (this.requestFuture.isDone()) {
410                 this.connPool.release(result, true);
411                 return;
412             }
413             final NHttpClientConnection conn = result.getConnection();
414             final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
415                     this.requestProducer, this.responseConsumer,
416                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
417                     this.context, conn, httpprocessor, connReuseStrategy);
418             initExecution(handler, conn);
419         }
420 
421         @Override
422         public void failed(final Exception ex) {
423             try {
424                 try {
425                     this.responseConsumer.failed(ex);
426                 } finally {
427                     releaseResources();
428                 }
429             } finally {
430                 this.requestFuture.failed(ex);
431             }
432         }
433 
434         @Override
435         public void cancelled() {
436             try {
437                 try {
438                     this.responseConsumer.cancel();
439                 } finally {
440                     releaseResources();
441                 }
442             } finally {
443                 this.requestFuture.cancel(true);
444             }
445         }
446 
447         public void releaseResources() {
448             close(requestProducer);
449             close(responseConsumer);
450         }
451 
452     }
453 
454     class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
455 
456         private final BasicFuture<List<T>> requestFuture;
457         private final List<? extends HttpAsyncRequestProducer> requestProducers;
458         private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers;
459         private final ConnPool<HttpHost, E> connPool;
460         private final HttpContext context;
461 
462         ConnPipelinedRequestCallback(
463                 final BasicFuture<List<T>> requestFuture,
464                 final List<? extends HttpAsyncRequestProducer> requestProducers,
465                 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
466                 final ConnPool<HttpHost, E> connPool,
467                 final HttpContext context) {
468             super();
469             this.requestFuture = requestFuture;
470             this.requestProducers = requestProducers;
471             this.responseConsumers = responseConsumers;
472             this.connPool = connPool;
473             this.context = context;
474         }
475 
476         @Override
477         public void completed(final E result) {
478             if (this.requestFuture.isDone()) {
479                 this.connPool.release(result, true);
480                 return;
481             }
482             final NHttpClientConnection conn = result.getConnection();
483             final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
484                     this.requestProducers, this.responseConsumers,
485                     new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool),
486                     this.context, conn, httpprocessor, connReuseStrategy);
487             initExecution(handler, conn);
488         }
489 
490         @Override
491         public void failed(final Exception ex) {
492             try {
493                 try {
494                     for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
495                         responseConsumer.failed(ex);
496                     }
497                 } finally {
498                     releaseResources();
499                 }
500             } finally {
501                 this.requestFuture.failed(ex);
502             }
503         }
504 
505         @Override
506         public void cancelled() {
507             try {
508                 try {
509                     for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
510                         responseConsumer.cancel();
511                     }
512                 } finally {
513                     releaseResources();
514                 }
515             } finally {
516                 this.requestFuture.cancel(true);
517             }
518         }
519 
520         public void releaseResources() {
521             for (final HttpAsyncRequestProducer requestProducer: this.requestProducers) {
522                 close(requestProducer);
523             }
524             for (final HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
525                 close(responseConsumer);
526             }
527         }
528 
529     }
530 
531     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
532                                                implements FutureCallback<T> {
533 
534         private final BasicFuture<T> future;
535         private final E poolEntry;
536         private final ConnPool<HttpHost, E> connPool;
537 
538         RequestExecutionCallback(
539                 final BasicFuture<T> future,
540                 final E poolEntry,
541                 final ConnPool<HttpHost, E> connPool) {
542             super();
543             this.future = future;
544             this.poolEntry = poolEntry;
545             this.connPool = connPool;
546         }
547 
548         @Override
549         public void completed(final T result) {
550             try {
551                 this.connPool.release(this.poolEntry, true);
552             } finally {
553                 this.future.completed(result);
554             }
555         }
556 
557         @Override
558         public void failed(final Exception ex) {
559             try {
560                 this.connPool.release(this.poolEntry, false);
561             } finally {
562                 this.future.failed(ex);
563             }
564         }
565 
566         @Override
567         public void cancelled() {
568             try {
569                 this.connPool.release(this.poolEntry, false);
570             } finally {
571                 this.future.cancel(true);
572             }
573         }
574 
575     }
576 
577     /**
578      * This method can be used to log I/O exception thrown while closing
579      * {@link java.io.Closeable} objects (such as
580      * {@link org.apache.http.HttpConnection}}).
581      *
582      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
583      */
584     protected void log(final Exception ex) {
585         this.exceptionLogger.log(ex);
586     }
587 
588     private void close(final Closeable closeable) {
589         try {
590             closeable.close();
591         } catch (final IOException ex) {
592             log(ex);
593         }
594     }
595 
596 }