1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.http.ConnectionClosedException;
35 import org.apache.http.ConnectionReuseStrategy;
36 import org.apache.http.HttpException;
37 import org.apache.http.HttpRequest;
38 import org.apache.http.HttpResponse;
39 import org.apache.http.concurrent.BasicFuture;
40 import org.apache.http.concurrent.FutureCallback;
41 import org.apache.http.impl.DefaultConnectionReuseStrategy;
42 import org.apache.http.nio.ContentDecoder;
43 import org.apache.http.nio.ContentEncoder;
44 import org.apache.http.nio.IOControl;
45 import org.apache.http.nio.NHttpClientConnection;
46 import org.apache.http.protocol.HttpContext;
47 import org.apache.http.protocol.HttpCoreContext;
48 import org.apache.http.protocol.HttpProcessor;
49 import org.apache.http.util.Args;
50
51
52
53
54
55
56
57
58 public class BasicAsyncClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
59
60 private final HttpAsyncRequestProducer requestProducer;
61 private final HttpAsyncResponseConsumer<T> responseConsumer;
62 private final BasicFuture<T> future;
63 private final HttpContext localContext;
64 private final NHttpClientConnection conn;
65 private final HttpProcessor httpPocessor;
66 private final ConnectionReuseStrategy connReuseStrategy;
67 private final AtomicBoolean requestSent;
68 private final AtomicBoolean keepAlive;
69 private final AtomicBoolean closed;
70
71
72
73
74
75
76
77
78
79
80
81
82 public BasicAsyncClientExchangeHandler(
83 final HttpAsyncRequestProducer requestProducer,
84 final HttpAsyncResponseConsumer<T> responseConsumer,
85 final FutureCallback<T> callback,
86 final HttpContext localContext,
87 final NHttpClientConnection conn,
88 final HttpProcessor httpPocessor,
89 final ConnectionReuseStrategy connReuseStrategy) {
90 super();
91 this.requestProducer = Args.notNull(requestProducer, "Request producer");
92 this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
93 this.future = new BasicFuture<T>(callback);
94 this.localContext = Args.notNull(localContext, "HTTP context");
95 this.conn = Args.notNull(conn, "HTTP connection");
96 this.httpPocessor = Args.notNull(httpPocessor, "HTTP processor");
97 this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
98 DefaultConnectionReuseStrategy.INSTANCE;
99 this.requestSent = new AtomicBoolean(false);
100 this.keepAlive = new AtomicBoolean(false);
101 this.closed = new AtomicBoolean(false);
102 }
103
104
105
106
107
108
109
110
111
112
113 public BasicAsyncClientExchangeHandler(
114 final HttpAsyncRequestProducer requestProducer,
115 final HttpAsyncResponseConsumer<T> responseConsumer,
116 final HttpContext localContext,
117 final NHttpClientConnection conn,
118 final HttpProcessor httpPocessor) {
119 this(requestProducer, responseConsumer, null, localContext, conn, httpPocessor, null);
120 }
121
122 public Future<T> getFuture() {
123 return this.future;
124 }
125
126 private void releaseResources() {
127 try {
128 this.responseConsumer.close();
129 } catch (final IOException ex) {
130 }
131 try {
132 this.requestProducer.close();
133 } catch (final IOException ex) {
134 }
135 }
136
137 @Override
138 public void close() throws IOException {
139 if (this.closed.compareAndSet(false, true)) {
140 releaseResources();
141 if (!this.future.isDone()) {
142 this.future.cancel();
143 }
144 }
145 }
146
147 @Override
148 public HttpRequest generateRequest() throws IOException, HttpException {
149 if (isDone()) {
150 return null;
151 }
152 final HttpRequest request = this.requestProducer.generateRequest();
153 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154 this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
155 this.httpPocessor.process(request, this.localContext);
156 return request;
157 }
158
159 @Override
160 public void produceContent(
161 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
162 this.requestProducer.produceContent(encoder, ioControl);
163 }
164
165 @Override
166 public void requestCompleted() {
167 this.requestProducer.requestCompleted(this.localContext);
168 this.requestSent.set(true);
169 }
170
171 @Override
172 public void responseReceived(final HttpResponse response) throws IOException, HttpException {
173 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
174 this.httpPocessor.process(response, this.localContext);
175 this.responseConsumer.responseReceived(response);
176 this.keepAlive.set(this.connReuseStrategy.keepAlive(response, this.localContext));
177 }
178
179 @Override
180 public void consumeContent(
181 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
182 this.responseConsumer.consumeContent(decoder, ioControl);
183 }
184
185 @Override
186 public void responseCompleted() throws IOException {
187 try {
188 if (!this.keepAlive.get()) {
189 this.conn.close();
190 }
191 this.responseConsumer.responseCompleted(this.localContext);
192 final T result = this.responseConsumer.getResult();
193 final Exception ex = this.responseConsumer.getException();
194 if (result != null) {
195 this.future.completed(result);
196 } else {
197 this.future.failed(ex);
198 }
199 if (this.closed.compareAndSet(false, true)) {
200 releaseResources();
201 }
202 } catch (final RuntimeException ex) {
203 failed(ex);
204 throw ex;
205 }
206 }
207
208 @Override
209 public void inputTerminated() {
210 failed(new ConnectionClosedException());
211 }
212
213 @Override
214 public void failed(final Exception ex) {
215 if (this.closed.compareAndSet(false, true)) {
216 try {
217 if (!this.requestSent.get()) {
218 this.requestProducer.failed(ex);
219 }
220 this.responseConsumer.failed(ex);
221 } finally {
222 try {
223 this.future.failed(ex);
224 } finally {
225 releaseResources();
226 }
227 }
228 }
229 }
230
231 @Override
232 public boolean cancel() {
233 if (this.closed.compareAndSet(false, true)) {
234 try {
235 try {
236 return this.responseConsumer.cancel();
237 } finally {
238 this.future.cancel();
239 }
240 } finally {
241 releaseResources();
242 }
243 }
244 return false;
245 }
246
247 @Override
248 public boolean isDone() {
249 return this.responseConsumer.isDone();
250 }
251
252 }