View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Queue;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.http.ConnectionClosedException;
41  import org.apache.http.ConnectionReuseStrategy;
42  import org.apache.http.HttpException;
43  import org.apache.http.HttpRequest;
44  import org.apache.http.HttpResponse;
45  import org.apache.http.concurrent.BasicFuture;
46  import org.apache.http.concurrent.FutureCallback;
47  import org.apache.http.impl.DefaultConnectionReuseStrategy;
48  import org.apache.http.nio.ContentDecoder;
49  import org.apache.http.nio.ContentEncoder;
50  import org.apache.http.nio.IOControl;
51  import org.apache.http.nio.NHttpClientConnection;
52  import org.apache.http.protocol.HttpContext;
53  import org.apache.http.protocol.HttpCoreContext;
54  import org.apache.http.protocol.HttpProcessor;
55  import org.apache.http.util.Args;
56  import org.apache.http.util.Asserts;
57  
58  /**
59   * Pipelining implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}
60   * that executes a series of pipelined HTTP requests.
61   *
62   * @param <T> the result type of request execution.
63   * @since 4.4
64   */
65  @Pipelined()
66  public class PipeliningClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
67  
68      private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
69      private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
70      private final Queue<HttpRequest> requestQueue;
71      private final Queue<T> resultQueue;
72      private final BasicFuture<List<T>> future;
73      private final HttpContext localContext;
74      private final NHttpClientConnection conn;
75      private final HttpProcessor httpPocessor;
76      private final ConnectionReuseStrategy connReuseStrategy;
77  
78      private final AtomicReference<HttpAsyncRequestProducer> requestProducerRef;
79      private final AtomicReference<HttpAsyncResponseConsumer<T>> responseConsumerRef;
80      private final AtomicBoolean keepAlive;
81      private final AtomicBoolean closed;
82  
83      /**
84       * Creates new instance of {@code PipeliningClientExchangeHandler}.
85       *
86       * @param requestProducers the request producers.
87       * @param responseConsumers the response consumers.
88       * @param callback the future callback invoked when the operation is completed.
89       * @param localContext the local execution context.
90       * @param conn the actual connection.
91       * @param httpPocessor the HTTP protocol processor.
92       * @param connReuseStrategy the connection re-use strategy.
93       */
94      public PipeliningClientExchangeHandler(
95              final List<? extends HttpAsyncRequestProducer> requestProducers,
96              final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
97              final FutureCallback<List<T>> callback,
98              final HttpContext localContext,
99              final NHttpClientConnection conn,
100             final HttpProcessor httpPocessor,
101             final ConnectionReuseStrategy connReuseStrategy) {
102         super();
103         Args.notEmpty(requestProducers, "Request producer list");
104         Args.notEmpty(responseConsumers, "Response consumer list");
105         Args.check(requestProducers.size() == responseConsumers.size(),
106                 "Number of request producers does not match that of response consumers");
107         this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
108         this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
109         this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
110         this.resultQueue = new ConcurrentLinkedQueue<T>();
111         this.future = new BasicFuture<List<T>>(callback);
112         this.localContext = Args.notNull(localContext, "HTTP context");
113         this.conn = Args.notNull(conn, "HTTP connection");
114         this.httpPocessor = Args.notNull(httpPocessor, "HTTP processor");
115         this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
116             DefaultConnectionReuseStrategy.INSTANCE;
117         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
118         this.requestProducerRef = new AtomicReference<HttpAsyncRequestProducer>(null);
119         this.responseConsumerRef = new AtomicReference<HttpAsyncResponseConsumer<T>>(null);
120         this.keepAlive = new AtomicBoolean(false);
121         this.closed = new AtomicBoolean(false);
122     }
123 
124     /**
125      * Creates new instance of {@code PipeliningClientExchangeHandler}.
126      *
127      * @param requestProducers the request producers.
128      * @param responseConsumers the response consumers.
129      * @param localContext the local execution context.
130      * @param conn the actual connection.
131      * @param httpPocessor the HTTP protocol processor.
132      */
133     public PipeliningClientExchangeHandler(
134             final List<? extends HttpAsyncRequestProducer> requestProducers,
135             final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
136             final HttpContext localContext,
137             final NHttpClientConnection conn,
138             final HttpProcessor httpPocessor) {
139         this(requestProducers, responseConsumers, null, localContext, conn, httpPocessor, null);
140     }
141 
142     public Future<List<T>> getFuture() {
143         return this.future;
144     }
145 
146     private static void closeQuietly(final Closeable closeable) {
147         if (closeable != null) {
148             try {
149                 closeable.close();
150             } catch (final IOException ex) {
151             }
152         }
153     }
154 
155     private void releaseResources() {
156         closeQuietly(this.requestProducerRef.getAndSet(null));
157         closeQuietly(this.responseConsumerRef.getAndSet(null));
158         while (!this.requestProducerQueue.isEmpty()) {
159             closeQuietly(this.requestProducerQueue.remove());
160         }
161         while (!this.responseConsumerQueue.isEmpty()) {
162             closeQuietly(this.responseConsumerQueue.remove());
163         }
164         this.requestQueue.clear();
165         this.resultQueue.clear();
166     }
167 
168     @Override
169     public void close() throws IOException {
170         if (this.closed.compareAndSet(false, true)) {
171             releaseResources();
172             if (!this.future.isDone()) {
173                 this.future.cancel();
174             }
175         }
176     }
177 
178     @Override
179     public HttpRequest generateRequest() throws IOException, HttpException {
180         Asserts.check(this.requestProducerRef.get() == null, "Inconsistent state: request producer is not null");
181         final HttpAsyncRequestProducer requestProducer = this.requestProducerQueue.poll();
182         if (requestProducer == null) {
183             return null;
184         }
185         this.requestProducerRef.set(requestProducer);
186         final HttpRequest request = requestProducer.generateRequest();
187         this.httpPocessor.process(request, this.localContext);
188         this.requestQueue.add(request);
189         return request;
190     }
191 
192     @Override
193     public void produceContent(
194             final ContentEncoder encoder, final IOControl ioControl) throws IOException {
195         final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
196         Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
197         requestProducer.produceContent(encoder, ioControl);
198     }
199 
200     @Override
201     public void requestCompleted() {
202         final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.getAndSet(null);
203         Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
204         requestProducer.requestCompleted(this.localContext);
205     }
206 
207     @Override
208     public void responseReceived(final HttpResponse response) throws IOException, HttpException {
209         Asserts.check(this.responseConsumerRef.get() == null, "Inconsistent state: response consumer is not null");
210 
211         final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerQueue.poll();
212         Asserts.check(responseConsumer != null, "Inconsistent state: response consumer queue is empty");
213         this.responseConsumerRef.set(responseConsumer);
214 
215         final HttpRequest request = this.requestQueue.poll();
216         Asserts.check(request != null, "Inconsistent state: request queue is empty");
217 
218         this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
219         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
220         this.httpPocessor.process(response, this.localContext);
221 
222         responseConsumer.responseReceived(response);
223         this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
224     }
225 
226     @Override
227     public void consumeContent(
228             final ContentDecoder decoder, final IOControl ioControl) throws IOException {
229         final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
230         Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
231         responseConsumer.consumeContent(decoder, ioControl);
232     }
233 
234     @Override
235     public void responseCompleted() throws IOException {
236         final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.getAndSet(null);
237         Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
238         try {
239             if (!this.keepAlive.get()) {
240                 this.conn.close();
241             }
242             responseConsumer.responseCompleted(this.localContext);
243             final T result = responseConsumer.getResult();
244             final Exception ex = responseConsumer.getException();
245             if (result != null) {
246                 this.resultQueue.add(result);
247             } else {
248                 this.future.failed(ex);
249                 this.conn.shutdown();
250             }
251             if (!conn.isOpen()) {
252                 if (this.closed.compareAndSet(false, true)) {
253                     releaseResources();
254                 }
255             }
256             if (!this.future.isDone() && this.responseConsumerQueue.isEmpty()) {
257                 this.future.completed(new ArrayList<T>(this.resultQueue));
258                 this.resultQueue.clear();
259             }
260         } catch (final RuntimeException ex) {
261             failed(ex);
262             throw ex;
263         }
264     }
265 
266     @Override
267     public void inputTerminated() {
268         failed(new ConnectionClosedException());
269     }
270 
271     @Override
272     public void failed(final Exception ex) {
273         if (this.closed.compareAndSet(false, true)) {
274             try {
275                 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
276                 if (requestProducer != null) {
277                     requestProducer.failed(ex);
278                 }
279                 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
280                 if (responseConsumer != null) {
281                     responseConsumer.failed(ex);
282                 }
283             } finally {
284                 try {
285                     this.future.failed(ex);
286                 } finally {
287                     releaseResources();
288                 }
289             }
290         }
291     }
292 
293     @Override
294     public boolean cancel() {
295         if (this.closed.compareAndSet(false, true)) {
296             try {
297                 try {
298                     final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
299                     return responseConsumer != null && responseConsumer.cancel();
300                 } finally {
301                     this.future.cancel();
302                 }
303             } finally {
304                 releaseResources();
305             }
306         }
307         return false;
308     }
309 
310     @Override
311     public boolean isDone() {
312         return this.future.isDone();
313     }
314 
315 }