View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.HttpException;
37  import org.apache.http.HttpRequest;
38  import org.apache.http.HttpResponse;
39  import org.apache.http.concurrent.BasicFuture;
40  import org.apache.http.concurrent.FutureCallback;
41  import org.apache.http.impl.DefaultConnectionReuseStrategy;
42  import org.apache.http.nio.ContentDecoder;
43  import org.apache.http.nio.ContentEncoder;
44  import org.apache.http.nio.IOControl;
45  import org.apache.http.nio.NHttpClientConnection;
46  import org.apache.http.protocol.HttpContext;
47  import org.apache.http.protocol.HttpCoreContext;
48  import org.apache.http.protocol.HttpProcessor;
49  import org.apache.http.util.Args;
50  
51  /**
52   * Basic implementation of {@link HttpAsyncClientExchangeHandler} that executes
53   * a single HTTP request / response exchange.
54   *
55   * @param <T> the result type of request execution.
56   * @since 4.3
57   */
58  public class BasicAsyncClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
59  
60      private final HttpAsyncRequestProducer requestProducer;
61      private final HttpAsyncResponseConsumer<T> responseConsumer;
62      private final BasicFuture<T> future;
63      private final HttpContext localContext;
64      private final NHttpClientConnection conn;
65      private final HttpProcessor httpPocessor;
66      private final ConnectionReuseStrategy connReuseStrategy;
67      private final AtomicBoolean requestSent;
68      private final AtomicBoolean keepAlive;
69      private final AtomicBoolean closed;
70  
71      /**
72       * Creates new instance of BasicAsyncRequestExecutionHandler.
73       *
74       * @param requestProducer the request producer.
75       * @param responseConsumer the response consumer.
76       * @param callback the future callback invoked when the operation is completed.
77       * @param localContext the local execution context.
78       * @param conn the actual connection.
79       * @param httpPocessor the HTTP protocol processor.
80       * @param connReuseStrategy the connection re-use strategy.
81       */
82      public BasicAsyncClientExchangeHandler(
83              final HttpAsyncRequestProducer requestProducer,
84              final HttpAsyncResponseConsumer<T> responseConsumer,
85              final FutureCallback<T> callback,
86              final HttpContext localContext,
87              final NHttpClientConnection conn,
88              final HttpProcessor httpPocessor,
89              final ConnectionReuseStrategy connReuseStrategy) {
90          super();
91          this.requestProducer = Args.notNull(requestProducer, "Request producer");
92          this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
93          this.future = new BasicFuture<T>(callback);
94          this.localContext = Args.notNull(localContext, "HTTP context");
95          this.conn = Args.notNull(conn, "HTTP connection");
96          this.httpPocessor = Args.notNull(httpPocessor, "HTTP processor");
97          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
98              DefaultConnectionReuseStrategy.INSTANCE;
99          this.requestSent = new AtomicBoolean(false);
100         this.keepAlive = new AtomicBoolean(false);
101         this.closed = new AtomicBoolean(false);
102     }
103 
104     /**
105      * Creates new instance of BasicAsyncRequestExecutionHandler.
106      *
107      * @param requestProducer the request producer.
108      * @param responseConsumer the response consumer.
109      * @param localContext the local execution context.
110      * @param conn the actual connection.
111      * @param httpPocessor the HTTP protocol processor.
112      */
113     public BasicAsyncClientExchangeHandler(
114             final HttpAsyncRequestProducer requestProducer,
115             final HttpAsyncResponseConsumer<T> responseConsumer,
116             final HttpContext localContext,
117             final NHttpClientConnection conn,
118             final HttpProcessor httpPocessor) {
119         this(requestProducer, responseConsumer, null, localContext, conn, httpPocessor, null);
120     }
121 
122     public Future<T> getFuture() {
123         return this.future;
124     }
125 
126     private void releaseResources() {
127         try {
128             this.responseConsumer.close();
129         } catch (final IOException ex) {
130         }
131         try {
132             this.requestProducer.close();
133         } catch (final IOException ex) {
134         }
135     }
136 
137     @Override
138     public void close() throws IOException {
139         if (this.closed.compareAndSet(false, true)) {
140             releaseResources();
141             if (!this.future.isDone()) {
142                 this.future.cancel();
143             }
144         }
145     }
146 
147     @Override
148     public HttpRequest generateRequest() throws IOException, HttpException {
149         if (isDone()) {
150             return null;
151         }
152         final HttpRequest request = this.requestProducer.generateRequest();
153         this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
155         this.httpPocessor.process(request, this.localContext);
156         return request;
157     }
158 
159     @Override
160     public void produceContent(
161             final ContentEncoder encoder, final IOControl ioControl) throws IOException {
162         this.requestProducer.produceContent(encoder, ioControl);
163     }
164 
165     @Override
166     public void requestCompleted() {
167         this.requestProducer.requestCompleted(this.localContext);
168         this.requestSent.set(true);
169     }
170 
171     @Override
172     public void responseReceived(final HttpResponse response) throws IOException, HttpException {
173         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
174         this.httpPocessor.process(response, this.localContext);
175         this.responseConsumer.responseReceived(response);
176         this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
177     }
178 
179     @Override
180     public void consumeContent(
181             final ContentDecoder decoder, final IOControl ioControl) throws IOException {
182         this.responseConsumer.consumeContent(decoder, ioControl);
183     }
184 
185     @Override
186     public void responseCompleted() throws IOException {
187         try {
188             if (!this.keepAlive.get()) {
189                 this.conn.close();
190             }
191             this.responseConsumer.responseCompleted(this.localContext);
192             final T result = this.responseConsumer.getResult();
193             final Exception ex = this.responseConsumer.getException();
194             if (result != null) {
195                 this.future.completed(result);
196             } else {
197                 this.future.failed(ex);
198             }
199             if (this.closed.compareAndSet(false, true)) {
200                 releaseResources();
201             }
202         } catch (final RuntimeException ex) {
203             failed(ex);
204             throw ex;
205         }
206     }
207 
208     @Override
209     public void inputTerminated() {
210         failed(new ConnectionClosedException());
211     }
212 
213     @Override
214     public void failed(final Exception ex) {
215         if (this.closed.compareAndSet(false, true)) {
216             try {
217                 if (!this.requestSent.get()) {
218                     this.requestProducer.failed(ex);
219                 }
220                 this.responseConsumer.failed(ex);
221             } finally {
222                 try {
223                     this.future.failed(ex);
224                 } finally {
225                     releaseResources();
226                 }
227             }
228         }
229     }
230 
231     @Override
232     public boolean cancel() {
233         if (this.closed.compareAndSet(false, true)) {
234             try {
235                 try {
236                     return this.responseConsumer.cancel();
237                 } finally {
238                     this.future.cancel();
239                 }
240             } finally {
241                 releaseResources();
242             }
243         }
244         return false;
245     }
246 
247     @Override
248     public boolean isDone() {
249         return this.responseConsumer.isDone();
250     }
251 
252 }