1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.async.AsyncExecCallback;
44 import org.apache.hc.client5.http.async.AsyncExecChain;
45 import org.apache.hc.client5.http.async.AsyncExecRuntime;
46 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47 import org.apache.hc.client5.http.auth.CredentialsProvider;
48 import org.apache.hc.client5.http.config.Configurable;
49 import org.apache.hc.client5.http.config.RequestConfig;
50 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51 import org.apache.hc.client5.http.cookie.CookieStore;
52 import org.apache.hc.client5.http.impl.ExecSupport;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.routing.RoutingSupport;
55 import org.apache.hc.core5.concurrent.Cancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.http.EntityDetails;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.config.Lookup;
66 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.HandlerFactory;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http.support.BasicRequestBuilder;
75 import org.apache.hc.core5.io.CloseMode;
76 import org.apache.hc.core5.io.ModalCloseable;
77 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
78 import org.apache.hc.core5.util.TimeValue;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
83
84 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
85
86 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
87 private final AsyncExecChainElement execChain;
88 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
89 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
90 private final CookieStore cookieStore;
91 private final CredentialsProvider credentialsProvider;
92 private final RequestConfig defaultConfig;
93 private final ConcurrentLinkedQueue<Closeable> closeables;
94 private final ScheduledExecutorService scheduledExecutorService;
95 private final AsyncExecChain.Scheduler scheduler;
96
97 InternalAbstractHttpAsyncClient(
98 final DefaultConnectingIOReactor ioReactor,
99 final AsyncPushConsumerRegistry pushConsumerRegistry,
100 final ThreadFactory threadFactory,
101 final AsyncExecChainElement execChain,
102 final Lookup<CookieSpecFactory> cookieSpecRegistry,
103 final Lookup<AuthSchemeFactory> authSchemeRegistry,
104 final CookieStore cookieStore,
105 final CredentialsProvider credentialsProvider,
106 final RequestConfig defaultConfig,
107 final List<Closeable> closeables) {
108 super(ioReactor, pushConsumerRegistry, threadFactory);
109 this.execChain = execChain;
110 this.cookieSpecRegistry = cookieSpecRegistry;
111 this.authSchemeRegistry = authSchemeRegistry;
112 this.cookieStore = cookieStore;
113 this.credentialsProvider = credentialsProvider;
114 this.defaultConfig = defaultConfig;
115 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117 this.scheduler = new AsyncExecChain.Scheduler() {
118
119 @Override
120 public void scheduleExecution(
121 final HttpRequest request,
122 final AsyncEntityProducer entityProducer,
123 final AsyncExecChain.Scope scope,
124 final AsyncExecCallback asyncExecCallback,
125 final TimeValue delay) {
126 executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
127 }
128
129 @Override
130 public void scheduleExecution(
131 final HttpRequest request,
132 final AsyncEntityProducer entityProducer,
133 final AsyncExecChain.Scope scope,
134 final AsyncExecChain chain,
135 final AsyncExecCallback asyncExecCallback,
136 final TimeValue delay) {
137 executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
138 }
139 };
140
141 }
142
143 @Override
144 void internalClose(final CloseMode closeMode) {
145 if (this.closeables != null) {
146 Closeable closeable;
147 while ((closeable = this.closeables.poll()) != null) {
148 try {
149 if (closeable instanceof ModalCloseable) {
150 ((ModalCloseable) closeable).close(closeMode);
151 } else {
152 closeable.close();
153 }
154 } catch (final IOException ex) {
155 LOG.error(ex.getMessage(), ex);
156 }
157 }
158 }
159 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
160 for (final Runnable runnable: runnables) {
161 if (runnable instanceof Cancellable) {
162 ((Cancellable) runnable).cancel();
163 }
164 }
165 }
166
167 private void setupContext(final HttpClientContext context) {
168 if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
169 context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
170 }
171 if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
172 context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
173 }
174 if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
175 context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
176 }
177 if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
178 context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
179 }
180 if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
181 context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
182 }
183 }
184
185 abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
186
187 abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
188
189 @Override
190 protected <T> Future<T> doExecute(
191 final HttpHost httpHost,
192 final AsyncRequestProducer requestProducer,
193 final AsyncResponseConsumer<T> responseConsumer,
194 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
195 final HttpContext context,
196 final FutureCallback<T> callback) {
197 final ComplexFuture<T> future = new ComplexFuture<>(callback);
198 try {
199 if (!isRunning()) {
200 throw new CancellationException("Request execution cancelled");
201 }
202 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
203 requestProducer.sendRequest((request, entityDetails, c) -> {
204
205 RequestConfig requestConfig = null;
206 if (request instanceof Configurable) {
207 requestConfig = ((Configurable) request).getConfig();
208 }
209 if (requestConfig != null) {
210 clientContext.setRequestConfig(requestConfig);
211 }
212
213 setupContext(clientContext);
214
215 final HttpRoute route = determineRoute(
216 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
217 clientContext);
218 final String exchangeId = ExecSupport.getNextExchangeId();
219 clientContext.setExchangeId(exchangeId);
220 if (LOG.isDebugEnabled()) {
221 LOG.debug("{} preparing request execution", exchangeId);
222 }
223 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
224
225 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
226 clientContext, execRuntime, scheduler, new AtomicInteger(1));
227 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
228 executeImmediate(
229 BasicRequestBuilder.copy(request).build(),
230 entityDetails != null ? new AsyncEntityProducer() {
231
232 @Override
233 public void releaseResources() {
234 requestProducer.releaseResources();
235 }
236
237 @Override
238 public void failed(final Exception cause) {
239 requestProducer.failed(cause);
240 }
241
242 @Override
243 public boolean isRepeatable() {
244 return requestProducer.isRepeatable();
245 }
246
247 @Override
248 public long getContentLength() {
249 return entityDetails.getContentLength();
250 }
251
252 @Override
253 public String getContentType() {
254 return entityDetails.getContentType();
255 }
256
257 @Override
258 public String getContentEncoding() {
259 return entityDetails.getContentEncoding();
260 }
261
262 @Override
263 public boolean isChunked() {
264 return entityDetails.isChunked();
265 }
266
267 @Override
268 public Set<String> getTrailerNames() {
269 return entityDetails.getTrailerNames();
270 }
271
272 @Override
273 public int available() {
274 return requestProducer.available();
275 }
276
277 @Override
278 public void produce(final DataStreamChannel channel) throws IOException {
279 if (outputTerminated.get()) {
280 channel.endStream();
281 return;
282 }
283 requestProducer.produce(channel);
284 }
285
286 } : null,
287 scope,
288 execChain::execute,
289 new AsyncExecCallback() {
290
291 @Override
292 public AsyncDataConsumer handleResponse(
293 final HttpResponse response,
294 final EntityDetails entityDetails) throws HttpException, IOException {
295 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
296 outputTerminated.set(true);
297 requestProducer.releaseResources();
298 }
299 responseConsumer.consumeResponse(response, entityDetails, c,
300 new FutureCallback<T>() {
301
302 @Override
303 public void completed(final T result) {
304 future.completed(result);
305 }
306
307 @Override
308 public void failed(final Exception ex) {
309 future.failed(ex);
310 }
311
312 @Override
313 public void cancelled() {
314 future.cancel();
315 }
316
317 });
318 return entityDetails != null ? responseConsumer : null;
319 }
320
321 @Override
322 public void handleInformationResponse(
323 final HttpResponse response) throws HttpException, IOException {
324 responseConsumer.informationResponse(response, c);
325 }
326
327 @Override
328 public void completed() {
329 if (LOG.isDebugEnabled()) {
330 LOG.debug("{} message exchange successfully completed", exchangeId);
331 }
332 try {
333 execRuntime.releaseEndpoint();
334 } finally {
335 responseConsumer.releaseResources();
336 requestProducer.releaseResources();
337 }
338 }
339
340 @Override
341 public void failed(final Exception cause) {
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
344 }
345 try {
346 execRuntime.discardEndpoint();
347 responseConsumer.failed(cause);
348 } finally {
349 try {
350 future.failed(cause);
351 } finally {
352 responseConsumer.releaseResources();
353 requestProducer.releaseResources();
354 }
355 }
356 }
357
358 });
359 }, context);
360 } catch (final HttpException | IOException | IllegalStateException ex) {
361 future.failed(ex);
362 }
363 return future;
364 }
365
366 void executeImmediate(
367 final HttpRequest request,
368 final AsyncEntityProducer entityProducer,
369 final AsyncExecChain.Scope scope,
370 final AsyncExecChain chain,
371 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
372 chain.proceed(request, entityProducer, scope, asyncExecCallback);
373 }
374
375 void executeScheduled(
376 final HttpRequest request,
377 final AsyncEntityProducer entityProducer,
378 final AsyncExecChain.Scope scope,
379 final AsyncExecChain chain,
380 final AsyncExecCallback asyncExecCallback,
381 final TimeValue delay) {
382 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
383 request, entityProducer, scope, chain, asyncExecCallback, delay);
384 if (TimeValue.isPositive(delay)) {
385 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
386 } else {
387 scheduledExecutorService.execute(scheduledTask);
388 }
389 }
390
391 class ScheduledRequestExecution implements Runnable, Cancellable {
392
393 final HttpRequest request;
394 final AsyncEntityProducer entityProducer;
395 final AsyncExecChain.Scope scope;
396 final AsyncExecChain chain;
397 final AsyncExecCallback asyncExecCallback;
398 final TimeValue delay;
399
400 ScheduledRequestExecution(final HttpRequest request,
401 final AsyncEntityProducer entityProducer,
402 final AsyncExecChain.Scope scope,
403 final AsyncExecChain chain,
404 final AsyncExecCallback asyncExecCallback,
405 final TimeValue delay) {
406 this.request = request;
407 this.entityProducer = entityProducer;
408 this.scope = scope;
409 this.chain = chain;
410 this.asyncExecCallback = asyncExecCallback;
411 this.delay = delay;
412 }
413
414 @Override
415 public void run() {
416 try {
417 chain.proceed(request, entityProducer, scope, asyncExecCallback);
418 } catch (final Exception ex) {
419 asyncExecCallback.failed(ex);
420 }
421 }
422
423 @Override
424 public boolean cancel() {
425 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
426 return true;
427 }
428
429 }
430
431 }