1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.async.AsyncExecCallback;
44 import org.apache.hc.client5.http.async.AsyncExecChain;
45 import org.apache.hc.client5.http.async.AsyncExecRuntime;
46 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47 import org.apache.hc.client5.http.auth.CredentialsProvider;
48 import org.apache.hc.client5.http.config.Configurable;
49 import org.apache.hc.client5.http.config.RequestConfig;
50 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51 import org.apache.hc.client5.http.cookie.CookieStore;
52 import org.apache.hc.client5.http.impl.ExecSupport;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.routing.RoutingSupport;
55 import org.apache.hc.core5.concurrent.Cancellable;
56 import org.apache.hc.core5.concurrent.ComplexFuture;
57 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.http.EntityDetails;
60 import org.apache.hc.core5.http.HttpException;
61 import org.apache.hc.core5.http.HttpHost;
62 import org.apache.hc.core5.http.HttpRequest;
63 import org.apache.hc.core5.http.HttpResponse;
64 import org.apache.hc.core5.http.HttpStatus;
65 import org.apache.hc.core5.http.config.Lookup;
66 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.HandlerFactory;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http.support.BasicRequestBuilder;
75 import org.apache.hc.core5.io.CloseMode;
76 import org.apache.hc.core5.io.ModalCloseable;
77 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
78 import org.apache.hc.core5.util.TimeValue;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
83
84 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
85
86 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
87 private final AsyncExecChainElement execChain;
88 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
89 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
90 private final CookieStore cookieStore;
91 private final CredentialsProvider credentialsProvider;
92 private final RequestConfig defaultConfig;
93 private final ConcurrentLinkedQueue<Closeable> closeables;
94 private final ScheduledExecutorService scheduledExecutorService;
95
96 InternalAbstractHttpAsyncClient(
97 final DefaultConnectingIOReactor ioReactor,
98 final AsyncPushConsumerRegistry pushConsumerRegistry,
99 final ThreadFactory threadFactory,
100 final AsyncExecChainElement execChain,
101 final Lookup<CookieSpecFactory> cookieSpecRegistry,
102 final Lookup<AuthSchemeFactory> authSchemeRegistry,
103 final CookieStore cookieStore,
104 final CredentialsProvider credentialsProvider,
105 final RequestConfig defaultConfig,
106 final List<Closeable> closeables) {
107 super(ioReactor, pushConsumerRegistry, threadFactory);
108 this.execChain = execChain;
109 this.cookieSpecRegistry = cookieSpecRegistry;
110 this.authSchemeRegistry = authSchemeRegistry;
111 this.cookieStore = cookieStore;
112 this.credentialsProvider = credentialsProvider;
113 this.defaultConfig = defaultConfig;
114 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
115 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
116 }
117
118 @Override
119 void internalClose(final CloseMode closeMode) {
120 if (this.closeables != null) {
121 Closeable closeable;
122 while ((closeable = this.closeables.poll()) != null) {
123 try {
124 if (closeable instanceof ModalCloseable) {
125 ((ModalCloseable) closeable).close(closeMode);
126 } else {
127 closeable.close();
128 }
129 } catch (final IOException ex) {
130 LOG.error(ex.getMessage(), ex);
131 }
132 }
133 }
134 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
135 for (final Runnable runnable: runnables) {
136 if (runnable instanceof Cancellable) {
137 ((Cancellable) runnable).cancel();
138 }
139 }
140 }
141
142 private void setupContext(final HttpClientContext context) {
143 if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
144 context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
145 }
146 if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
147 context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
148 }
149 if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
150 context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
151 }
152 if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
153 context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
154 }
155 if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
156 context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
157 }
158 }
159
160 abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
161
162 abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
163
164 @Override
165 protected <T> Future<T> doExecute(
166 final HttpHost httpHost,
167 final AsyncRequestProducer requestProducer,
168 final AsyncResponseConsumer<T> responseConsumer,
169 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
170 final HttpContext context,
171 final FutureCallback<T> callback) {
172 final ComplexFuture<T> future = new ComplexFuture<>(callback);
173 try {
174 if (!isRunning()) {
175 throw new CancellationException("Request execution cancelled");
176 }
177 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
178 requestProducer.sendRequest((request, entityDetails, c) -> {
179
180 RequestConfig requestConfig = null;
181 if (request instanceof Configurable) {
182 requestConfig = ((Configurable) request).getConfig();
183 }
184 if (requestConfig != null) {
185 clientContext.setRequestConfig(requestConfig);
186 }
187
188 setupContext(clientContext);
189
190 final HttpRoute route = determineRoute(
191 httpHost != null ? httpHost : RoutingSupport.determineHost(request),
192 clientContext);
193 final String exchangeId = ExecSupport.getNextExchangeId();
194 clientContext.setExchangeId(exchangeId);
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("{} preparing request execution", exchangeId);
197 }
198 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
199
200 final AsyncExecChain.Scheduler scheduler = this::executeScheduled;
201
202 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
203 clientContext, execRuntime, scheduler, new AtomicInteger(1));
204 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
205 executeImmediate(
206 BasicRequestBuilder.copy(request).build(),
207 entityDetails != null ? new AsyncEntityProducer() {
208
209 @Override
210 public void releaseResources() {
211 requestProducer.releaseResources();
212 }
213
214 @Override
215 public void failed(final Exception cause) {
216 requestProducer.failed(cause);
217 }
218
219 @Override
220 public boolean isRepeatable() {
221 return requestProducer.isRepeatable();
222 }
223
224 @Override
225 public long getContentLength() {
226 return entityDetails.getContentLength();
227 }
228
229 @Override
230 public String getContentType() {
231 return entityDetails.getContentType();
232 }
233
234 @Override
235 public String getContentEncoding() {
236 return entityDetails.getContentEncoding();
237 }
238
239 @Override
240 public boolean isChunked() {
241 return entityDetails.isChunked();
242 }
243
244 @Override
245 public Set<String> getTrailerNames() {
246 return entityDetails.getTrailerNames();
247 }
248
249 @Override
250 public int available() {
251 return requestProducer.available();
252 }
253
254 @Override
255 public void produce(final DataStreamChannel channel) throws IOException {
256 if (outputTerminated.get()) {
257 channel.endStream();
258 return;
259 }
260 requestProducer.produce(channel);
261 }
262
263 } : null,
264 scope,
265 new AsyncExecCallback() {
266
267 @Override
268 public AsyncDataConsumer handleResponse(
269 final HttpResponse response,
270 final EntityDetails entityDetails) throws HttpException, IOException {
271 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
272 outputTerminated.set(true);
273 requestProducer.releaseResources();
274 }
275 responseConsumer.consumeResponse(response, entityDetails, c,
276 new FutureCallback<T>() {
277
278 @Override
279 public void completed(final T result) {
280 future.completed(result);
281 }
282
283 @Override
284 public void failed(final Exception ex) {
285 future.failed(ex);
286 }
287
288 @Override
289 public void cancelled() {
290 future.cancel();
291 }
292
293 });
294 return entityDetails != null ? responseConsumer : null;
295 }
296
297 @Override
298 public void handleInformationResponse(
299 final HttpResponse response) throws HttpException, IOException {
300 responseConsumer.informationResponse(response, c);
301 }
302
303 @Override
304 public void completed() {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("{} message exchange successfully completed", exchangeId);
307 }
308 try {
309 execRuntime.releaseEndpoint();
310 } finally {
311 responseConsumer.releaseResources();
312 requestProducer.releaseResources();
313 }
314 }
315
316 @Override
317 public void failed(final Exception cause) {
318 if (LOG.isDebugEnabled()) {
319 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
320 }
321 try {
322 execRuntime.discardEndpoint();
323 responseConsumer.failed(cause);
324 } finally {
325 try {
326 future.failed(cause);
327 } finally {
328 responseConsumer.releaseResources();
329 requestProducer.releaseResources();
330 }
331 }
332 }
333
334 });
335 }, context);
336 } catch (final HttpException | IOException | IllegalStateException ex) {
337 future.failed(ex);
338 }
339 return future;
340 }
341
342 void executeImmediate(
343 final HttpRequest request,
344 final AsyncEntityProducer entityProducer,
345 final AsyncExecChain.Scope scope,
346 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
347 execChain.execute(request, entityProducer, scope, asyncExecCallback);
348 }
349
350 void executeScheduled(
351 final HttpRequest request,
352 final AsyncEntityProducer entityProducer,
353 final AsyncExecChain.Scope scope,
354 final AsyncExecCallback asyncExecCallback,
355 final TimeValue delay) {
356 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
357 request, entityProducer, scope, asyncExecCallback, delay);
358 if (TimeValue.isPositive(delay)) {
359 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
360 } else {
361 scheduledExecutorService.execute(scheduledTask);
362 }
363 }
364
365 class ScheduledRequestExecution implements Runnable, Cancellable {
366
367 final HttpRequest request;
368 final AsyncEntityProducer entityProducer;
369 final AsyncExecChain.Scope scope;
370 final AsyncExecCallback asyncExecCallback;
371 final TimeValue delay;
372
373 ScheduledRequestExecution(final HttpRequest request,
374 final AsyncEntityProducer entityProducer,
375 final AsyncExecChain.Scope scope,
376 final AsyncExecCallback asyncExecCallback,
377 final TimeValue delay) {
378 this.request = request;
379 this.entityProducer = entityProducer;
380 this.scope = scope;
381 this.asyncExecCallback = asyncExecCallback;
382 this.delay = delay;
383 }
384
385 @Override
386 public void run() {
387 try {
388 execChain.execute(request, entityProducer, scope, asyncExecCallback);
389 } catch (final Exception ex) {
390 asyncExecCallback.failed(ex);
391 }
392 }
393
394 @Override
395 public boolean cancel() {
396 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
397 return true;
398 }
399
400 }
401
402 }