View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.function.Function;
42  
43  import org.apache.hc.client5.http.HttpRoute;
44  import org.apache.hc.client5.http.async.AsyncExecCallback;
45  import org.apache.hc.client5.http.async.AsyncExecChain;
46  import org.apache.hc.client5.http.async.AsyncExecRuntime;
47  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
48  import org.apache.hc.client5.http.auth.CredentialsProvider;
49  import org.apache.hc.client5.http.config.Configurable;
50  import org.apache.hc.client5.http.config.RequestConfig;
51  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
52  import org.apache.hc.client5.http.cookie.CookieStore;
53  import org.apache.hc.client5.http.impl.ExecSupport;
54  import org.apache.hc.client5.http.protocol.HttpClientContext;
55  import org.apache.hc.client5.http.routing.RoutingSupport;
56  import org.apache.hc.core5.concurrent.Cancellable;
57  import org.apache.hc.core5.concurrent.ComplexFuture;
58  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
59  import org.apache.hc.core5.concurrent.FutureCallback;
60  import org.apache.hc.core5.http.EntityDetails;
61  import org.apache.hc.core5.http.HttpException;
62  import org.apache.hc.core5.http.HttpHost;
63  import org.apache.hc.core5.http.HttpRequest;
64  import org.apache.hc.core5.http.HttpResponse;
65  import org.apache.hc.core5.http.HttpStatus;
66  import org.apache.hc.core5.http.config.Lookup;
67  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
68  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
69  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
71  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
72  import org.apache.hc.core5.http.nio.DataStreamChannel;
73  import org.apache.hc.core5.http.nio.HandlerFactory;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.support.BasicRequestBuilder;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.io.ModalCloseable;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.util.TimeValue;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84  
85      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
86  
87      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88  
89      private final AsyncExecChainElement execChain;
90      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
91      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
92      private final CookieStore cookieStore;
93      private final CredentialsProvider credentialsProvider;
94      private final Function<HttpContext, HttpClientContext> contextAdaptor;
95      private final RequestConfig defaultConfig;
96      private final ConcurrentLinkedQueue<Closeable> closeables;
97      private final ScheduledExecutorService scheduledExecutorService;
98      private final AsyncExecChain.Scheduler scheduler;
99  
100     InternalAbstractHttpAsyncClient(
101             final DefaultConnectingIOReactor ioReactor,
102             final AsyncPushConsumerRegistry pushConsumerRegistry,
103             final ThreadFactory threadFactory,
104             final AsyncExecChainElement execChain,
105             final Lookup<CookieSpecFactory> cookieSpecRegistry,
106             final Lookup<AuthSchemeFactory> authSchemeRegistry,
107             final CookieStore cookieStore,
108             final CredentialsProvider credentialsProvider,
109             final Function<HttpContext, HttpClientContext> contextAdaptor,
110             final RequestConfig defaultConfig,
111             final List<Closeable> closeables) {
112         super(ioReactor, pushConsumerRegistry, threadFactory);
113         this.execChain = execChain;
114         this.cookieSpecRegistry = cookieSpecRegistry;
115         this.authSchemeRegistry = authSchemeRegistry;
116         this.cookieStore = cookieStore;
117         this.credentialsProvider = credentialsProvider;
118         this.contextAdaptor = contextAdaptor;
119         this.defaultConfig = defaultConfig;
120         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
121         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
122         this.scheduler = new AsyncExecChain.Scheduler() {
123 
124             @Override
125             public void scheduleExecution(
126                     final HttpRequest request,
127                     final AsyncEntityProducer entityProducer,
128                     final AsyncExecChain.Scope scope,
129                     final AsyncExecCallback asyncExecCallback,
130                     final TimeValue delay) {
131                 executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
132             }
133 
134             @Override
135             public void scheduleExecution(
136                     final HttpRequest request,
137                     final AsyncEntityProducer entityProducer,
138                     final AsyncExecChain.Scope scope,
139                     final AsyncExecChain chain,
140                     final AsyncExecCallback asyncExecCallback,
141                     final TimeValue delay) {
142                 executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
143             }
144         };
145 
146     }
147 
148     @Override
149     void internalClose(final CloseMode closeMode) {
150         if (this.closeables != null) {
151             Closeable closeable;
152             while ((closeable = this.closeables.poll()) != null) {
153                 try {
154                     if (closeable instanceof ModalCloseable) {
155                         ((ModalCloseable) closeable).close(closeMode);
156                     } else {
157                         closeable.close();
158                     }
159                 } catch (final IOException ex) {
160                     LOG.error(ex.getMessage(), ex);
161                 }
162             }
163         }
164         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
165         for (final Runnable runnable: runnables) {
166             if (runnable instanceof Cancellable) {
167                 ((Cancellable) runnable).cancel();
168             }
169         }
170     }
171 
172     private void setupContext(final HttpClientContext context) {
173         if (context.getAuthSchemeRegistry() == null) {
174             context.setAuthSchemeRegistry(authSchemeRegistry);
175         }
176         if (context.getCookieSpecRegistry() == null) {
177             context.setCookieSpecRegistry(cookieSpecRegistry);
178         }
179         if (context.getCookieStore() == null) {
180             context.setCookieStore(cookieStore);
181         }
182         if (context.getCredentialsProvider() == null) {
183             context.setCredentialsProvider(credentialsProvider);
184         }
185         if (context.getRequestConfig() == null) {
186             context.setRequestConfig(defaultConfig);
187         }
188     }
189 
190     abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
191 
192     abstract HttpRoute determineRoute(HttpHost httpHost, HttpRequest request, HttpClientContext clientContext) throws HttpException;
193 
194     @Override
195     protected <T> Future<T> doExecute(
196             final HttpHost httpHost,
197             final AsyncRequestProducer requestProducer,
198             final AsyncResponseConsumer<T> responseConsumer,
199             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
200             final HttpContext context,
201             final FutureCallback<T> callback) {
202         final ComplexFuture<T> future = new ComplexFuture<>(callback);
203         try {
204             if (!isRunning()) {
205                 throw new CancellationException("Request execution cancelled");
206             }
207             final HttpClientContext clientContext = contextAdaptor.apply(context);
208             requestProducer.sendRequest((request, entityDetails, c) -> {
209 
210                 RequestConfig requestConfig = null;
211                 if (request instanceof Configurable) {
212                     requestConfig = ((Configurable) request).getConfig();
213                 }
214                 if (requestConfig != null) {
215                     clientContext.setRequestConfig(requestConfig);
216                 }
217 
218                 setupContext(clientContext);
219 
220                 final HttpRoute route = determineRoute(
221                         httpHost != null ? httpHost : RoutingSupport.determineHost(request),
222                         request,
223                         clientContext);
224                 final String exchangeId = ExecSupport.getNextExchangeId();
225                 clientContext.setExchangeId(exchangeId);
226                 if (LOG.isDebugEnabled()) {
227                     LOG.debug("{} preparing request execution", exchangeId);
228                 }
229                 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
230 
231                 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
232                         clientContext, execRuntime, scheduler, new AtomicInteger(1));
233                 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
234                 executeImmediate(
235                         BasicRequestBuilder.copy(request).build(),
236                         entityDetails != null ? new AsyncEntityProducer() {
237 
238                             @Override
239                             public void releaseResources() {
240                                 requestProducer.releaseResources();
241                             }
242 
243                             @Override
244                             public void failed(final Exception cause) {
245                                 requestProducer.failed(cause);
246                             }
247 
248                             @Override
249                             public boolean isRepeatable() {
250                                 return requestProducer.isRepeatable();
251                             }
252 
253                             @Override
254                             public long getContentLength() {
255                                 return entityDetails.getContentLength();
256                             }
257 
258                             @Override
259                             public String getContentType() {
260                                 return entityDetails.getContentType();
261                             }
262 
263                             @Override
264                             public String getContentEncoding() {
265                                 return entityDetails.getContentEncoding();
266                             }
267 
268                             @Override
269                             public boolean isChunked() {
270                                 return entityDetails.isChunked();
271                             }
272 
273                             @Override
274                             public Set<String> getTrailerNames() {
275                                 return entityDetails.getTrailerNames();
276                             }
277 
278                             @Override
279                             public int available() {
280                                 return requestProducer.available();
281                             }
282 
283                             @Override
284                             public void produce(final DataStreamChannel channel) throws IOException {
285                                 if (outputTerminated.get()) {
286                                     channel.endStream();
287                                     return;
288                                 }
289                                 requestProducer.produce(channel);
290                             }
291 
292                         } : null,
293                         scope,
294                         execChain::execute,
295                         new AsyncExecCallback() {
296 
297                             @Override
298                             public AsyncDataConsumer handleResponse(
299                                     final HttpResponse response,
300                                     final EntityDetails entityDetails) throws HttpException, IOException {
301                                 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
302                                     outputTerminated.set(true);
303                                     requestProducer.releaseResources();
304                                 }
305                                 responseConsumer.consumeResponse(response, entityDetails, c,
306                                         new FutureCallback<T>() {
307 
308                                             @Override
309                                             public void completed(final T result) {
310                                                 future.completed(result);
311                                             }
312 
313                                             @Override
314                                             public void failed(final Exception ex) {
315                                                 future.failed(ex);
316                                             }
317 
318                                             @Override
319                                             public void cancelled() {
320                                                 future.cancel();
321                                             }
322 
323                                         });
324                                 return entityDetails != null ? responseConsumer : null;
325                             }
326 
327                             @Override
328                             public void handleInformationResponse(
329                                     final HttpResponse response) throws HttpException, IOException {
330                                 responseConsumer.informationResponse(response, c);
331                             }
332 
333                             @Override
334                             public void completed() {
335                                 if (LOG.isDebugEnabled()) {
336                                     LOG.debug("{} message exchange successfully completed", exchangeId);
337                                 }
338                                 try {
339                                     execRuntime.releaseEndpoint();
340                                 } finally {
341                                     responseConsumer.releaseResources();
342                                     requestProducer.releaseResources();
343                                 }
344                             }
345 
346                             @Override
347                             public void failed(final Exception cause) {
348                                 if (LOG.isDebugEnabled()) {
349                                     LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
350                                 }
351                                 try {
352                                     execRuntime.discardEndpoint();
353                                     responseConsumer.failed(cause);
354                                 } finally {
355                                     try {
356                                         future.failed(cause);
357                                     } finally {
358                                         responseConsumer.releaseResources();
359                                         requestProducer.releaseResources();
360                                     }
361                                 }
362                             }
363 
364                         });
365             }, context);
366         } catch (final HttpException | IOException | IllegalStateException ex) {
367             future.failed(ex);
368         }
369         return future;
370     }
371 
372     void executeImmediate(
373             final HttpRequest request,
374             final AsyncEntityProducer entityProducer,
375             final AsyncExecChain.Scope scope,
376             final AsyncExecChain chain,
377             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
378         chain.proceed(request, entityProducer, scope, asyncExecCallback);
379     }
380 
381     void executeScheduled(
382             final HttpRequest request,
383             final AsyncEntityProducer entityProducer,
384             final AsyncExecChain.Scope scope,
385             final AsyncExecChain chain,
386             final AsyncExecCallback asyncExecCallback,
387             final TimeValue delay) {
388         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
389                 request, entityProducer, scope, chain, asyncExecCallback, delay);
390         if (TimeValue.isPositive(delay)) {
391             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
392         } else {
393             scheduledExecutorService.execute(scheduledTask);
394         }
395     }
396 
397     class ScheduledRequestExecution implements Runnable, Cancellable {
398 
399         final HttpRequest request;
400         final AsyncEntityProducer entityProducer;
401         final AsyncExecChain.Scope scope;
402         final AsyncExecChain chain;
403         final AsyncExecCallback asyncExecCallback;
404         final TimeValue delay;
405 
406         ScheduledRequestExecution(final HttpRequest request,
407                                   final AsyncEntityProducer entityProducer,
408                                   final AsyncExecChain.Scope scope,
409                                   final AsyncExecChain chain,
410                                   final AsyncExecCallback asyncExecCallback,
411                                   final TimeValue delay) {
412             this.request = request;
413             this.entityProducer = entityProducer;
414             this.scope = scope;
415             this.chain = chain;
416             this.asyncExecCallback = asyncExecCallback;
417             this.delay = delay;
418         }
419 
420         @Override
421         public void run() {
422             try {
423                 chain.proceed(request, entityProducer, scope, asyncExecCallback);
424             } catch (final Exception ex) {
425                 asyncExecCallback.failed(ex);
426             }
427         }
428 
429         @Override
430         public boolean cancel() {
431             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
432             return true;
433         }
434 
435     }
436 
437 }