1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.atomic.AtomicBoolean;
38
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.async.AsyncExecCallback;
41 import org.apache.hc.client5.http.async.AsyncExecChain;
42 import org.apache.hc.client5.http.async.AsyncExecRuntime;
43 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
44 import org.apache.hc.client5.http.auth.CredentialsProvider;
45 import org.apache.hc.client5.http.config.Configurable;
46 import org.apache.hc.client5.http.config.RequestConfig;
47 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
48 import org.apache.hc.client5.http.cookie.CookieStore;
49 import org.apache.hc.client5.http.impl.ExecSupport;
50 import org.apache.hc.client5.http.impl.RequestCopier;
51 import org.apache.hc.client5.http.protocol.HttpClientContext;
52 import org.apache.hc.client5.http.routing.RoutingSupport;
53 import org.apache.hc.core5.concurrent.ComplexFuture;
54 import org.apache.hc.core5.concurrent.FutureCallback;
55 import org.apache.hc.core5.http.EntityDetails;
56 import org.apache.hc.core5.http.HttpException;
57 import org.apache.hc.core5.http.HttpHost;
58 import org.apache.hc.core5.http.HttpRequest;
59 import org.apache.hc.core5.http.HttpResponse;
60 import org.apache.hc.core5.http.HttpStatus;
61 import org.apache.hc.core5.http.config.Lookup;
62 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
63 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
64 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
65 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
66 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
67 import org.apache.hc.core5.http.nio.DataStreamChannel;
68 import org.apache.hc.core5.http.nio.HandlerFactory;
69 import org.apache.hc.core5.http.nio.RequestChannel;
70 import org.apache.hc.core5.http.protocol.HttpContext;
71 import org.apache.hc.core5.io.CloseMode;
72 import org.apache.hc.core5.io.ModalCloseable;
73 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
78
79 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
80 private final AsyncExecChainElement execChain;
81 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
82 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
83 private final CookieStore cookieStore;
84 private final CredentialsProvider credentialsProvider;
85 private final RequestConfig defaultConfig;
86 private final ConcurrentLinkedQueue<Closeable> closeables;
87
88 InternalAbstractHttpAsyncClient(
89 final DefaultConnectingIOReactor ioReactor,
90 final AsyncPushConsumerRegistry pushConsumerRegistry,
91 final ThreadFactory threadFactory,
92 final AsyncExecChainElement execChain,
93 final Lookup<CookieSpecFactory> cookieSpecRegistry,
94 final Lookup<AuthSchemeFactory> authSchemeRegistry,
95 final CookieStore cookieStore,
96 final CredentialsProvider credentialsProvider,
97 final RequestConfig defaultConfig,
98 final List<Closeable> closeables) {
99 super(ioReactor, pushConsumerRegistry, threadFactory);
100 this.execChain = execChain;
101 this.cookieSpecRegistry = cookieSpecRegistry;
102 this.authSchemeRegistry = authSchemeRegistry;
103 this.cookieStore = cookieStore;
104 this.credentialsProvider = credentialsProvider;
105 this.defaultConfig = defaultConfig;
106 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
107 }
108
109 @Override
110 void internalClose(final CloseMode closeMode) {
111 if (this.closeables != null) {
112 Closeable closeable;
113 while ((closeable = this.closeables.poll()) != null) {
114 try {
115 if (closeable instanceof ModalCloseable) {
116 ((ModalCloseable) closeable).close(closeMode);
117 } else {
118 closeable.close();
119 }
120 } catch (final IOException ex) {
121 LOG.error(ex.getMessage(), ex);
122 }
123 }
124 }
125 }
126
127 private void setupContext(final HttpClientContext context) {
128 if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
129 context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
130 }
131 if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
132 context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
133 }
134 if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
135 context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
136 }
137 if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
138 context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
139 }
140 if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
141 context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
142 }
143 }
144
145 abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
146
147 abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
148
149 @Override
150 protected <T> Future<T> doExecute(
151 final HttpHost httpHost,
152 final AsyncRequestProducer requestProducer,
153 final AsyncResponseConsumer<T> responseConsumer,
154 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
155 final HttpContext context,
156 final FutureCallback<T> callback) {
157 final ComplexFuture<T> future = new ComplexFuture<>(callback);
158 try {
159 if (!isRunning()) {
160 throw new CancellationException("Request execution cancelled");
161 }
162 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
163 requestProducer.sendRequest(new RequestChannel() {
164
165 @Override
166 public void sendRequest(
167 final HttpRequest request,
168 final EntityDetails entityDetails,
169 final HttpContext context) throws HttpException, IOException {
170
171 RequestConfig requestConfig = null;
172 if (request instanceof Configurable) {
173 requestConfig = ((Configurable) request).getConfig();
174 }
175 if (requestConfig != null) {
176 clientContext.setRequestConfig(requestConfig);
177 }
178 final HttpRoute route = determineRoute(
179 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
180 clientContext);
181 final String exchangeId = ExecSupport.getNextExchangeId();
182 if (LOG.isDebugEnabled()) {
183 LOG.debug("{}: preparing request execution", exchangeId);
184 }
185 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
186
187 setupContext(clientContext);
188
189 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
190 clientContext, execRuntime);
191 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
192 execChain.execute(
193 RequestCopier.INSTANCE.copy(request),
194 entityDetails != null ? new AsyncEntityProducer() {
195
196 @Override
197 public void releaseResources() {
198 requestProducer.releaseResources();
199 }
200
201 @Override
202 public void failed(final Exception cause) {
203 requestProducer.failed(cause);
204 }
205
206 @Override
207 public boolean isRepeatable() {
208 return requestProducer.isRepeatable();
209 }
210
211 @Override
212 public long getContentLength() {
213 return entityDetails.getContentLength();
214 }
215
216 @Override
217 public String getContentType() {
218 return entityDetails.getContentType();
219 }
220
221 @Override
222 public String getContentEncoding() {
223 return entityDetails.getContentEncoding();
224 }
225
226 @Override
227 public boolean isChunked() {
228 return entityDetails.isChunked();
229 }
230
231 @Override
232 public Set<String> getTrailerNames() {
233 return entityDetails.getTrailerNames();
234 }
235
236 @Override
237 public int available() {
238 return requestProducer.available();
239 }
240
241 @Override
242 public void produce(final DataStreamChannel channel) throws IOException {
243 if (outputTerminated.get()) {
244 channel.endStream();
245 return;
246 }
247 requestProducer.produce(channel);
248 }
249
250 } : null,
251 scope,
252 new AsyncExecCallback() {
253
254 @Override
255 public AsyncDataConsumer handleResponse(
256 final HttpResponse response,
257 final EntityDetails entityDetails) throws HttpException, IOException {
258 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
259 outputTerminated.set(true);
260 requestProducer.releaseResources();
261 }
262 responseConsumer.consumeResponse(response, entityDetails, context,
263 new FutureCallback<T>() {
264
265 @Override
266 public void completed(final T result) {
267 future.completed(result);
268 }
269
270 @Override
271 public void failed(final Exception ex) {
272 future.failed(ex);
273 }
274
275 @Override
276 public void cancelled() {
277 future.cancel();
278 }
279
280 });
281 return responseConsumer;
282 }
283
284 @Override
285 public void handleInformationResponse(
286 final HttpResponse response) throws HttpException, IOException {
287 responseConsumer.informationResponse(response, context);
288 }
289
290 @Override
291 public void completed() {
292 if (LOG.isDebugEnabled()) {
293 LOG.debug("{}: message exchange successfully completed", exchangeId);
294 }
295 try {
296 execRuntime.releaseEndpoint();
297 } finally {
298 responseConsumer.releaseResources();
299 requestProducer.releaseResources();
300 }
301 }
302
303 @Override
304 public void failed(final Exception cause) {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("{}: request failed: {}", exchangeId, cause.getMessage());
307 }
308 try {
309 execRuntime.discardEndpoint();
310 responseConsumer.failed(cause);
311 } finally {
312 try {
313 future.failed(cause);
314 } finally {
315 responseConsumer.releaseResources();
316 requestProducer.releaseResources();
317 }
318 }
319 }
320
321 });
322 }
323
324 }, context);
325 } catch (final HttpException | IOException | IllegalStateException ex) {
326 future.failed(ex);
327 }
328 return future;
329 }
330
331 }