1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Function;
42
43 import org.apache.hc.client5.http.HttpRoute;
44 import org.apache.hc.client5.http.async.AsyncExecCallback;
45 import org.apache.hc.client5.http.async.AsyncExecChain;
46 import org.apache.hc.client5.http.async.AsyncExecRuntime;
47 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
48 import org.apache.hc.client5.http.auth.CredentialsProvider;
49 import org.apache.hc.client5.http.config.Configurable;
50 import org.apache.hc.client5.http.config.RequestConfig;
51 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
52 import org.apache.hc.client5.http.cookie.CookieStore;
53 import org.apache.hc.client5.http.impl.ExecSupport;
54 import org.apache.hc.client5.http.protocol.HttpClientContext;
55 import org.apache.hc.client5.http.routing.RoutingSupport;
56 import org.apache.hc.core5.concurrent.Cancellable;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
59 import org.apache.hc.core5.concurrent.FutureCallback;
60 import org.apache.hc.core5.http.EntityDetails;
61 import org.apache.hc.core5.http.HttpException;
62 import org.apache.hc.core5.http.HttpHost;
63 import org.apache.hc.core5.http.HttpRequest;
64 import org.apache.hc.core5.http.HttpResponse;
65 import org.apache.hc.core5.http.HttpStatus;
66 import org.apache.hc.core5.http.config.Lookup;
67 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
68 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
69 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
71 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
72 import org.apache.hc.core5.http.nio.DataStreamChannel;
73 import org.apache.hc.core5.http.nio.HandlerFactory;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.support.BasicRequestBuilder;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.io.ModalCloseable;
78 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79 import org.apache.hc.core5.util.TimeValue;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84
85 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
86
87 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88
89 private final AsyncExecChainElement execChain;
90 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
91 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
92 private final CookieStore cookieStore;
93 private final CredentialsProvider credentialsProvider;
94 private final Function<HttpContext, HttpClientContext> contextAdaptor;
95 private final RequestConfig defaultConfig;
96 private final ConcurrentLinkedQueue<Closeable> closeables;
97 private final ScheduledExecutorService scheduledExecutorService;
98 private final AsyncExecChain.Scheduler scheduler;
99
/**
 * Creates the client and wires in the execution chain, the per-context defaults and
 * the resources that must be released when the client is closed.
 *
 * @param ioReactor            I/O reactor passed on to the base class.
 * @param pushConsumerRegistry push consumer registry passed on to the base class.
 * @param threadFactory        thread factory passed on to the base class.
 * @param execChain            entry point of the request execution chain.
 * @param cookieSpecRegistry   default cookie spec registry for request contexts.
 * @param authSchemeRegistry   default auth scheme registry for request contexts.
 * @param cookieStore          default cookie store for request contexts.
 * @param credentialsProvider  default credentials provider for request contexts.
 * @param contextAdaptor       converts a caller-supplied context into a client context.
 * @param defaultConfig        default request configuration for request contexts.
 * @param closeables           resources to close with the client; may be {@code null}.
 */
InternalAbstractHttpAsyncClient(
        final DefaultConnectingIOReactor ioReactor,
        final AsyncPushConsumerRegistry pushConsumerRegistry,
        final ThreadFactory threadFactory,
        final AsyncExecChainElement execChain,
        final Lookup<CookieSpecFactory> cookieSpecRegistry,
        final Lookup<AuthSchemeFactory> authSchemeRegistry,
        final CookieStore cookieStore,
        final CredentialsProvider credentialsProvider,
        final Function<HttpContext, HttpClientContext> contextAdaptor,
        final RequestConfig defaultConfig,
        final List<Closeable> closeables) {
    super(ioReactor, pushConsumerRegistry, threadFactory);
    this.execChain = execChain;
    this.cookieSpecRegistry = cookieSpecRegistry;
    this.authSchemeRegistry = authSchemeRegistry;
    this.cookieStore = cookieStore;
    this.credentialsProvider = credentialsProvider;
    this.contextAdaptor = contextAdaptor;
    this.defaultConfig = defaultConfig;

    // A missing list is kept as a null queue; internalClose checks for that case.
    if (closeables == null) {
        this.closeables = null;
    } else {
        this.closeables = new ConcurrentLinkedQueue<>(closeables);
    }

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);

    // Chain used when the caller of the scheduler does not name one explicitly.
    final AsyncExecChain defaultChain = execChain::execute;
    this.scheduler = new AsyncExecChain.Scheduler() {

        @Override
        public void scheduleExecution(
                final HttpRequest request,
                final AsyncEntityProducer entityProducer,
                final AsyncExecChain.Scope scope,
                final AsyncExecCallback callback,
                final TimeValue delay) {
            // Restart from the beginning of the execution chain.
            executeScheduled(request, entityProducer, scope, defaultChain, callback, delay);
        }

        @Override
        public void scheduleExecution(
                final HttpRequest request,
                final AsyncEntityProducer entityProducer,
                final AsyncExecChain.Scope scope,
                final AsyncExecChain chain,
                final AsyncExecCallback callback,
                final TimeValue delay) {
            // Resume from the chain position supplied by the caller.
            executeScheduled(request, entityProducer, scope, chain, callback, delay);
        }

    };
}
147
148 @Override
149 void internalClose(final CloseMode closeMode) {
150 if (this.closeables != null) {
151 Closeable closeable;
152 while ((closeable = this.closeables.poll()) != null) {
153 try {
154 if (closeable instanceof ModalCloseable) {
155 ((ModalCloseable) closeable).close(closeMode);
156 } else {
157 closeable.close();
158 }
159 } catch (final IOException ex) {
160 LOG.error(ex.getMessage(), ex);
161 }
162 }
163 }
164 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
165 for (final Runnable runnable: runnables) {
166 if (runnable instanceof Cancellable) {
167 ((Cancellable) runnable).cancel();
168 }
169 }
170 }
171
172 private void setupContext(final HttpClientContext context) {
173 if (context.getAuthSchemeRegistry() == null) {
174 context.setAuthSchemeRegistry(authSchemeRegistry);
175 }
176 if (context.getCookieSpecRegistry() == null) {
177 context.setCookieSpecRegistry(cookieSpecRegistry);
178 }
179 if (context.getCookieStore() == null) {
180 context.setCookieStore(cookieStore);
181 }
182 if (context.getCredentialsProvider() == null) {
183 context.setCredentialsProvider(credentialsProvider);
184 }
185 if (context.getRequestConfig() == null) {
186 context.setRequestConfig(defaultConfig);
187 }
188 }
189
190 abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
191
192 abstract HttpRoute determineRoute(HttpHost httpHost, HttpRequest request, HttpClientContext clientContext) throws HttpException;
193
194 @Override
195 protected <T> Future<T> doExecute(
196 final HttpHost httpHost,
197 final AsyncRequestProducer requestProducer,
198 final AsyncResponseConsumer<T> responseConsumer,
199 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
200 final HttpContext context,
201 final FutureCallback<T> callback) {
202 final ComplexFuture<T> future = new ComplexFuture<>(callback);
203 try {
204 if (!isRunning()) {
205 throw new CancellationException("Request execution cancelled");
206 }
207 final HttpClientContext clientContext = contextAdaptor.apply(context);
208 requestProducer.sendRequest((request, entityDetails, c) -> {
209
210 RequestConfig requestConfig = null;
211 if (request instanceof Configurable) {
212 requestConfig = ((Configurable) request).getConfig();
213 }
214 if (requestConfig != null) {
215 clientContext.setRequestConfig(requestConfig);
216 }
217
218 setupContext(clientContext);
219
220 final HttpRoute route = determineRoute(
221 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
222 request,
223 clientContext);
224 final String exchangeId = ExecSupport.getNextExchangeId();
225 clientContext.setExchangeId(exchangeId);
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("{} preparing request execution", exchangeId);
228 }
229 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
230
231 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
232 clientContext, execRuntime, scheduler, new AtomicInteger(1));
233 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
234 executeImmediate(
235 BasicRequestBuilder.copy(request).build(),
236 entityDetails != null ? new AsyncEntityProducer() {
237
238 @Override
239 public void releaseResources() {
240 requestProducer.releaseResources();
241 }
242
243 @Override
244 public void failed(final Exception cause) {
245 requestProducer.failed(cause);
246 }
247
248 @Override
249 public boolean isRepeatable() {
250 return requestProducer.isRepeatable();
251 }
252
253 @Override
254 public long getContentLength() {
255 return entityDetails.getContentLength();
256 }
257
258 @Override
259 public String getContentType() {
260 return entityDetails.getContentType();
261 }
262
263 @Override
264 public String getContentEncoding() {
265 return entityDetails.getContentEncoding();
266 }
267
268 @Override
269 public boolean isChunked() {
270 return entityDetails.isChunked();
271 }
272
273 @Override
274 public Set<String> getTrailerNames() {
275 return entityDetails.getTrailerNames();
276 }
277
278 @Override
279 public int available() {
280 return requestProducer.available();
281 }
282
283 @Override
284 public void produce(final DataStreamChannel channel) throws IOException {
285 if (outputTerminated.get()) {
286 channel.endStream();
287 return;
288 }
289 requestProducer.produce(channel);
290 }
291
292 } : null,
293 scope,
294 execChain::execute,
295 new AsyncExecCallback() {
296
297 @Override
298 public AsyncDataConsumer handleResponse(
299 final HttpResponse response,
300 final EntityDetails entityDetails) throws HttpException, IOException {
301 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
302 outputTerminated.set(true);
303 requestProducer.releaseResources();
304 }
305 responseConsumer.consumeResponse(response, entityDetails, c,
306 new FutureCallback<T>() {
307
308 @Override
309 public void completed(final T result) {
310 future.completed(result);
311 }
312
313 @Override
314 public void failed(final Exception ex) {
315 future.failed(ex);
316 }
317
318 @Override
319 public void cancelled() {
320 future.cancel();
321 }
322
323 });
324 return entityDetails != null ? responseConsumer : null;
325 }
326
327 @Override
328 public void handleInformationResponse(
329 final HttpResponse response) throws HttpException, IOException {
330 responseConsumer.informationResponse(response, c);
331 }
332
333 @Override
334 public void completed() {
335 if (LOG.isDebugEnabled()) {
336 LOG.debug("{} message exchange successfully completed", exchangeId);
337 }
338 try {
339 execRuntime.releaseEndpoint();
340 } finally {
341 responseConsumer.releaseResources();
342 requestProducer.releaseResources();
343 }
344 }
345
346 @Override
347 public void failed(final Exception cause) {
348 if (LOG.isDebugEnabled()) {
349 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
350 }
351 try {
352 execRuntime.discardEndpoint();
353 responseConsumer.failed(cause);
354 } finally {
355 try {
356 future.failed(cause);
357 } finally {
358 responseConsumer.releaseResources();
359 requestProducer.releaseResources();
360 }
361 }
362 }
363
364 });
365 }, context);
366 } catch (final HttpException | IOException | IllegalStateException ex) {
367 future.failed(ex);
368 }
369 return future;
370 }
371
372 void executeImmediate(
373 final HttpRequest request,
374 final AsyncEntityProducer entityProducer,
375 final AsyncExecChain.Scope scope,
376 final AsyncExecChain chain,
377 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
378 chain.proceed(request, entityProducer, scope, asyncExecCallback);
379 }
380
381 void executeScheduled(
382 final HttpRequest request,
383 final AsyncEntityProducer entityProducer,
384 final AsyncExecChain.Scope scope,
385 final AsyncExecChain chain,
386 final AsyncExecCallback asyncExecCallback,
387 final TimeValue delay) {
388 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
389 request, entityProducer, scope, chain, asyncExecCallback, delay);
390 if (TimeValue.isPositive(delay)) {
391 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
392 } else {
393 scheduledExecutorService.execute(scheduledTask);
394 }
395 }
396
397 class ScheduledRequestExecution implements Runnable, Cancellable {
398
399 final HttpRequest request;
400 final AsyncEntityProducer entityProducer;
401 final AsyncExecChain.Scope scope;
402 final AsyncExecChain chain;
403 final AsyncExecCallback asyncExecCallback;
404 final TimeValue delay;
405
406 ScheduledRequestExecution(final HttpRequest request,
407 final AsyncEntityProducer entityProducer,
408 final AsyncExecChain.Scope scope,
409 final AsyncExecChain chain,
410 final AsyncExecCallback asyncExecCallback,
411 final TimeValue delay) {
412 this.request = request;
413 this.entityProducer = entityProducer;
414 this.scope = scope;
415 this.chain = chain;
416 this.asyncExecCallback = asyncExecCallback;
417 this.delay = delay;
418 }
419
420 @Override
421 public void run() {
422 try {
423 chain.proceed(request, entityProducer, scope, asyncExecCallback);
424 } catch (final Exception ex) {
425 asyncExecCallback.failed(ex);
426 }
427 }
428
429 @Override
430 public boolean cancel() {
431 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
432 return true;
433 }
434
435 }
436
437 }