View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  import org.apache.hc.client5.http.HttpRoute;
43  import org.apache.hc.client5.http.async.AsyncExecCallback;
44  import org.apache.hc.client5.http.async.AsyncExecChain;
45  import org.apache.hc.client5.http.async.AsyncExecRuntime;
46  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47  import org.apache.hc.client5.http.auth.CredentialsProvider;
48  import org.apache.hc.client5.http.config.Configurable;
49  import org.apache.hc.client5.http.config.RequestConfig;
50  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51  import org.apache.hc.client5.http.cookie.CookieStore;
52  import org.apache.hc.client5.http.impl.ExecSupport;
53  import org.apache.hc.client5.http.protocol.HttpClientContext;
54  import org.apache.hc.client5.http.routing.RoutingSupport;
55  import org.apache.hc.core5.concurrent.Cancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.http.EntityDetails;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.config.Lookup;
66  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.HandlerFactory;
73  import org.apache.hc.core5.http.nio.RequestChannel;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.support.BasicRequestBuilder;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.io.ModalCloseable;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.util.TimeValue;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
84  
85      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
86  
87      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
88      private final AsyncExecChainElement execChain;
89      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
90      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
91      private final CookieStore cookieStore;
92      private final CredentialsProvider credentialsProvider;
93      private final RequestConfig defaultConfig;
94      private final ConcurrentLinkedQueue<Closeable> closeables;
95      private final ScheduledExecutorService scheduledExecutorService;
96  
97      InternalAbstractHttpAsyncClient(
98              final DefaultConnectingIOReactor ioReactor,
99              final AsyncPushConsumerRegistry pushConsumerRegistry,
100             final ThreadFactory threadFactory,
101             final AsyncExecChainElement execChain,
102             final Lookup<CookieSpecFactory> cookieSpecRegistry,
103             final Lookup<AuthSchemeFactory> authSchemeRegistry,
104             final CookieStore cookieStore,
105             final CredentialsProvider credentialsProvider,
106             final RequestConfig defaultConfig,
107             final List<Closeable> closeables) {
108         super(ioReactor, pushConsumerRegistry, threadFactory);
109         this.execChain = execChain;
110         this.cookieSpecRegistry = cookieSpecRegistry;
111         this.authSchemeRegistry = authSchemeRegistry;
112         this.cookieStore = cookieStore;
113         this.credentialsProvider = credentialsProvider;
114         this.defaultConfig = defaultConfig;
115         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
116         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
117     }
118 
119     @Override
120     void internalClose(final CloseMode closeMode) {
121         if (this.closeables != null) {
122             Closeable closeable;
123             while ((closeable = this.closeables.poll()) != null) {
124                 try {
125                     if (closeable instanceof ModalCloseable) {
126                         ((ModalCloseable) closeable).close(closeMode);
127                     } else {
128                         closeable.close();
129                     }
130                 } catch (final IOException ex) {
131                     LOG.error(ex.getMessage(), ex);
132                 }
133             }
134         }
135         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
136         for (final Runnable runnable: runnables) {
137             if (runnable instanceof Cancellable) {
138                 ((Cancellable) runnable).cancel();
139             }
140         }
141     }
142 
143     private void setupContext(final HttpClientContext context) {
144         if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
145             context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
146         }
147         if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
148             context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
149         }
150         if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
151             context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
152         }
153         if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
154             context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
155         }
156         if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
157             context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
158         }
159     }
160 
    /**
     * Creates the execution runtime used for a single message exchange.
     * Invoked by {@code doExecute} once the route has been determined.
     *
     * @param pushHandlerFactory push handler factory passed to {@code doExecute}
     * @param route              route determined for the request
     * @return the execution runtime for the exchange
     */
    abstract AsyncExecRuntime createAsyncExecRuntime(
            HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HttpRoute route);
163 
    /**
     * Determines the route for a request. Invoked by {@code doExecute} with either
     * the explicitly given target host or the host derived from the request.
     *
     * @param httpHost      target host
     * @param clientContext execution context of the request
     * @return the route to use for the request
     * @throws HttpException if the route cannot be determined
     */
    abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
165 
166     @Override
167     protected <T> Future<T> doExecute(
168             final HttpHost httpHost,
169             final AsyncRequestProducer requestProducer,
170             final AsyncResponseConsumer<T> responseConsumer,
171             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
172             final HttpContext context,
173             final FutureCallback<T> callback) {
174         final ComplexFuture<T> future = new ComplexFuture<>(callback);
175         try {
176             if (!isRunning()) {
177                 throw new CancellationException("Request execution cancelled");
178             }
179             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
180             requestProducer.sendRequest(new RequestChannel() {
181 
182                 @Override
183                 public void sendRequest(
184                         final HttpRequest request,
185                         final EntityDetails entityDetails,
186                         final HttpContext context) throws HttpException, IOException {
187 
188                     RequestConfig requestConfig = null;
189                     if (request instanceof Configurable) {
190                         requestConfig = ((Configurable) request).getConfig();
191                     }
192                     if (requestConfig != null) {
193                         clientContext.setRequestConfig(requestConfig);
194                     }
195                     final HttpRoute route = determineRoute(
196                             httpHost != null ? httpHost : RoutingSupport.determineHost(request),
197                             clientContext);
198                     final String exchangeId = ExecSupport.getNextExchangeId();
199                     clientContext.setExchangeId(exchangeId);
200                     if (LOG.isDebugEnabled()) {
201                         LOG.debug("{} preparing request execution", exchangeId);
202                     }
203                     final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory, route);
204 
205                     setupContext(clientContext);
206 
207                     final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
208 
209                         @Override
210                         public void scheduleExecution(final HttpRequest request,
211                                                       final AsyncEntityProducer entityProducer,
212                                                       final AsyncExecChain.Scope scope,
213                                                       final AsyncExecCallback asyncExecCallback,
214                                                       final TimeValue delay) {
215                             executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
216                         }
217 
218                     };
219 
220                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
221                             clientContext, execRuntime, scheduler, new AtomicInteger(1));
222                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
223                     executeImmediate(
224                             BasicRequestBuilder.copy(request).build(),
225                             entityDetails != null ? new AsyncEntityProducer() {
226 
227                                 @Override
228                                 public void releaseResources() {
229                                     requestProducer.releaseResources();
230                                 }
231 
232                                 @Override
233                                 public void failed(final Exception cause) {
234                                     requestProducer.failed(cause);
235                                 }
236 
237                                 @Override
238                                 public boolean isRepeatable() {
239                                     return requestProducer.isRepeatable();
240                                 }
241 
242                                 @Override
243                                 public long getContentLength() {
244                                     return entityDetails.getContentLength();
245                                 }
246 
247                                 @Override
248                                 public String getContentType() {
249                                     return entityDetails.getContentType();
250                                 }
251 
252                                 @Override
253                                 public String getContentEncoding() {
254                                     return entityDetails.getContentEncoding();
255                                 }
256 
257                                 @Override
258                                 public boolean isChunked() {
259                                     return entityDetails.isChunked();
260                                 }
261 
262                                 @Override
263                                 public Set<String> getTrailerNames() {
264                                     return entityDetails.getTrailerNames();
265                                 }
266 
267                                 @Override
268                                 public int available() {
269                                     return requestProducer.available();
270                                 }
271 
272                                 @Override
273                                 public void produce(final DataStreamChannel channel) throws IOException {
274                                     if (outputTerminated.get()) {
275                                         channel.endStream();
276                                         return;
277                                     }
278                                     requestProducer.produce(channel);
279                                 }
280 
281                             } : null,
282                             scope,
283                             new AsyncExecCallback() {
284 
285                                 @Override
286                                 public AsyncDataConsumer handleResponse(
287                                         final HttpResponse response,
288                                         final EntityDetails entityDetails) throws HttpException, IOException {
289                                     if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
290                                         outputTerminated.set(true);
291                                         requestProducer.releaseResources();
292                                     }
293                                     responseConsumer.consumeResponse(response, entityDetails, context,
294                                             new FutureCallback<T>() {
295 
296                                                 @Override
297                                                 public void completed(final T result) {
298                                                     future.completed(result);
299                                                 }
300 
301                                                 @Override
302                                                 public void failed(final Exception ex) {
303                                                     future.failed(ex);
304                                                 }
305 
306                                                 @Override
307                                                 public void cancelled() {
308                                                     future.cancel();
309                                                 }
310 
311                                             });
312                                     return entityDetails != null ? responseConsumer : null;
313                                 }
314 
315                                 @Override
316                                 public void handleInformationResponse(
317                                         final HttpResponse response) throws HttpException, IOException {
318                                     responseConsumer.informationResponse(response, context);
319                                 }
320 
321                                 @Override
322                                 public void completed() {
323                                     if (LOG.isDebugEnabled()) {
324                                         LOG.debug("{} message exchange successfully completed", exchangeId);
325                                     }
326                                     try {
327                                         execRuntime.releaseEndpoint();
328                                     } finally {
329                                         responseConsumer.releaseResources();
330                                         requestProducer.releaseResources();
331                                     }
332                                 }
333 
334                                 @Override
335                                 public void failed(final Exception cause) {
336                                     if (LOG.isDebugEnabled()) {
337                                         LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
338                                     }
339                                     try {
340                                         execRuntime.discardEndpoint();
341                                         responseConsumer.failed(cause);
342                                     } finally {
343                                         try {
344                                             future.failed(cause);
345                                         } finally {
346                                             responseConsumer.releaseResources();
347                                             requestProducer.releaseResources();
348                                         }
349                                     }
350                                 }
351 
352                             });
353                 }
354 
355             }, context);
356         } catch (final HttpException | IOException | IllegalStateException ex) {
357             future.failed(ex);
358         }
359         return future;
360     }
361 
362     void executeImmediate(
363             final HttpRequest request,
364             final AsyncEntityProducer entityProducer,
365             final AsyncExecChain.Scope scope,
366             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
367         execChain.execute(request, entityProducer, scope, asyncExecCallback);
368     }
369 
370     void executeScheduled(
371             final HttpRequest request,
372             final AsyncEntityProducer entityProducer,
373             final AsyncExecChain.Scope scope,
374             final AsyncExecCallback asyncExecCallback,
375             final TimeValue delay) {
376         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
377                 request, entityProducer, scope, asyncExecCallback, delay);
378         if (TimeValue.isPositive(delay)) {
379             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
380         } else {
381             scheduledExecutorService.execute(scheduledTask);
382         }
383     }
384 
385     class ScheduledRequestExecution implements Runnable, Cancellable {
386 
387         final HttpRequest request;
388         final AsyncEntityProducer entityProducer;
389         final AsyncExecChain.Scope scope;
390         final AsyncExecCallback asyncExecCallback;
391         final TimeValue delay;
392 
393         ScheduledRequestExecution(final HttpRequest request,
394                                   final AsyncEntityProducer entityProducer,
395                                   final AsyncExecChain.Scope scope,
396                                   final AsyncExecCallback asyncExecCallback,
397                                   final TimeValue delay) {
398             this.request = request;
399             this.entityProducer = entityProducer;
400             this.scope = scope;
401             this.asyncExecCallback = asyncExecCallback;
402             this.delay = delay;
403         }
404 
405         @Override
406         public void run() {
407             try {
408                 execChain.execute(request, entityProducer, scope, asyncExecCallback);
409             } catch (final Exception ex) {
410                 asyncExecCallback.failed(ex);
411             }
412         }
413 
414         @Override
415         public boolean cancel() {
416             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
417             return true;
418         }
419 
420     }
421 
422 }