View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  import org.apache.hc.client5.http.HttpRoute;
43  import org.apache.hc.client5.http.async.AsyncExecCallback;
44  import org.apache.hc.client5.http.async.AsyncExecChain;
45  import org.apache.hc.client5.http.async.AsyncExecRuntime;
46  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
47  import org.apache.hc.client5.http.auth.CredentialsProvider;
48  import org.apache.hc.client5.http.config.Configurable;
49  import org.apache.hc.client5.http.config.RequestConfig;
50  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
51  import org.apache.hc.client5.http.cookie.CookieStore;
52  import org.apache.hc.client5.http.impl.ExecSupport;
53  import org.apache.hc.client5.http.protocol.HttpClientContext;
54  import org.apache.hc.client5.http.routing.RoutingSupport;
55  import org.apache.hc.core5.concurrent.Cancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.http.EntityDetails;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.config.Lookup;
66  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
67  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
68  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
69  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
70  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.HandlerFactory;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http.support.BasicRequestBuilder;
75  import org.apache.hc.core5.io.CloseMode;
76  import org.apache.hc.core5.io.ModalCloseable;
77  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
78  import org.apache.hc.core5.util.TimeValue;
79  import org.slf4j.Logger;
80  import org.slf4j.LoggerFactory;
81  
82  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
83  
84      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
85  
86      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
87      private final AsyncExecChainElement execChain;
88      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
89      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
90      private final CookieStore cookieStore;
91      private final CredentialsProvider credentialsProvider;
92      private final RequestConfig defaultConfig;
93      private final ConcurrentLinkedQueue<Closeable> closeables;
94      private final ScheduledExecutorService scheduledExecutorService;
95  
96      InternalAbstractHttpAsyncClient(
97              final DefaultConnectingIOReactor ioReactor,
98              final AsyncPushConsumerRegistry pushConsumerRegistry,
99              final ThreadFactory threadFactory,
100             final AsyncExecChainElement execChain,
101             final Lookup<CookieSpecFactory> cookieSpecRegistry,
102             final Lookup<AuthSchemeFactory> authSchemeRegistry,
103             final CookieStore cookieStore,
104             final CredentialsProvider credentialsProvider,
105             final RequestConfig defaultConfig,
106             final List<Closeable> closeables) {
107         super(ioReactor, pushConsumerRegistry, threadFactory);
108         this.execChain = execChain;
109         this.cookieSpecRegistry = cookieSpecRegistry;
110         this.authSchemeRegistry = authSchemeRegistry;
111         this.cookieStore = cookieStore;
112         this.credentialsProvider = credentialsProvider;
113         this.defaultConfig = defaultConfig;
114         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
115         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
116     }
117 
118     @Override
119     void internalClose(final CloseMode closeMode) {
120         if (this.closeables != null) {
121             Closeable closeable;
122             while ((closeable = this.closeables.poll()) != null) {
123                 try {
124                     if (closeable instanceof ModalCloseable) {
125                         ((ModalCloseable) closeable).close(closeMode);
126                     } else {
127                         closeable.close();
128                     }
129                 } catch (final IOException ex) {
130                     LOG.error(ex.getMessage(), ex);
131                 }
132             }
133         }
134         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
135         for (final Runnable runnable: runnables) {
136             if (runnable instanceof Cancellable) {
137                 ((Cancellable) runnable).cancel();
138             }
139         }
140     }
141 
142     private void setupContext(final HttpClientContext context) {
143         if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
144             context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
145         }
146         if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
147             context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
148         }
149         if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
150             context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
151         }
152         if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
153             context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
154         }
155         if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
156             context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
157         }
158     }
159 
160     abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
161 
162     abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
163 
164     @Override
165     protected <T> Future<T> doExecute(
166             final HttpHost httpHost,
167             final AsyncRequestProducer requestProducer,
168             final AsyncResponseConsumer<T> responseConsumer,
169             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
170             final HttpContext context,
171             final FutureCallback<T> callback) {
172         final ComplexFuture<T> future = new ComplexFuture<>(callback);
173         try {
174             if (!isRunning()) {
175                 throw new CancellationException("Request execution cancelled");
176             }
177             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
178             requestProducer.sendRequest((request, entityDetails, c) -> {
179 
180                 RequestConfig requestConfig = null;
181                 if (request instanceof Configurable) {
182                     requestConfig = ((Configurable) request).getConfig();
183                 }
184                 if (requestConfig != null) {
185                     clientContext.setRequestConfig(requestConfig);
186                 }
187 
188                 setupContext(clientContext);
189 
190                 final HttpRoute route = determineRoute(
191                         httpHost != null ? httpHost : RoutingSupport.determineHost(request),
192                         clientContext);
193                 final String exchangeId = ExecSupport.getNextExchangeId();
194                 clientContext.setExchangeId(exchangeId);
195                 if (LOG.isDebugEnabled()) {
196                     LOG.debug("{} preparing request execution", exchangeId);
197                 }
198                 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
199 
200                 final AsyncExecChain.Scheduler scheduler = this::executeScheduled;
201 
202                 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
203                         clientContext, execRuntime, scheduler, new AtomicInteger(1));
204                 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
205                 executeImmediate(
206                         BasicRequestBuilder.copy(request).build(),
207                         entityDetails != null ? new AsyncEntityProducer() {
208 
209                             @Override
210                             public void releaseResources() {
211                                 requestProducer.releaseResources();
212                             }
213 
214                             @Override
215                             public void failed(final Exception cause) {
216                                 requestProducer.failed(cause);
217                             }
218 
219                             @Override
220                             public boolean isRepeatable() {
221                                 return requestProducer.isRepeatable();
222                             }
223 
224                             @Override
225                             public long getContentLength() {
226                                 return entityDetails.getContentLength();
227                             }
228 
229                             @Override
230                             public String getContentType() {
231                                 return entityDetails.getContentType();
232                             }
233 
234                             @Override
235                             public String getContentEncoding() {
236                                 return entityDetails.getContentEncoding();
237                             }
238 
239                             @Override
240                             public boolean isChunked() {
241                                 return entityDetails.isChunked();
242                             }
243 
244                             @Override
245                             public Set<String> getTrailerNames() {
246                                 return entityDetails.getTrailerNames();
247                             }
248 
249                             @Override
250                             public int available() {
251                                 return requestProducer.available();
252                             }
253 
254                             @Override
255                             public void produce(final DataStreamChannel channel) throws IOException {
256                                 if (outputTerminated.get()) {
257                                     channel.endStream();
258                                     return;
259                                 }
260                                 requestProducer.produce(channel);
261                             }
262 
263                         } : null,
264                         scope,
265                         new AsyncExecCallback() {
266 
267                             @Override
268                             public AsyncDataConsumer handleResponse(
269                                     final HttpResponse response,
270                                     final EntityDetails entityDetails) throws HttpException, IOException {
271                                 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
272                                     outputTerminated.set(true);
273                                     requestProducer.releaseResources();
274                                 }
275                                 responseConsumer.consumeResponse(response, entityDetails, c,
276                                         new FutureCallback<T>() {
277 
278                                             @Override
279                                             public void completed(final T result) {
280                                                 future.completed(result);
281                                             }
282 
283                                             @Override
284                                             public void failed(final Exception ex) {
285                                                 future.failed(ex);
286                                             }
287 
288                                             @Override
289                                             public void cancelled() {
290                                                 future.cancel();
291                                             }
292 
293                                         });
294                                 return entityDetails != null ? responseConsumer : null;
295                             }
296 
297                             @Override
298                             public void handleInformationResponse(
299                                     final HttpResponse response) throws HttpException, IOException {
300                                 responseConsumer.informationResponse(response, c);
301                             }
302 
303                             @Override
304                             public void completed() {
305                                 if (LOG.isDebugEnabled()) {
306                                     LOG.debug("{} message exchange successfully completed", exchangeId);
307                                 }
308                                 try {
309                                     execRuntime.releaseEndpoint();
310                                 } finally {
311                                     responseConsumer.releaseResources();
312                                     requestProducer.releaseResources();
313                                 }
314                             }
315 
316                             @Override
317                             public void failed(final Exception cause) {
318                                 if (LOG.isDebugEnabled()) {
319                                     LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
320                                 }
321                                 try {
322                                     execRuntime.discardEndpoint();
323                                     responseConsumer.failed(cause);
324                                 } finally {
325                                     try {
326                                         future.failed(cause);
327                                     } finally {
328                                         responseConsumer.releaseResources();
329                                         requestProducer.releaseResources();
330                                     }
331                                 }
332                             }
333 
334                         });
335             }, context);
336         } catch (final HttpException | IOException | IllegalStateException ex) {
337             future.failed(ex);
338         }
339         return future;
340     }
341 
342     void executeImmediate(
343             final HttpRequest request,
344             final AsyncEntityProducer entityProducer,
345             final AsyncExecChain.Scope scope,
346             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
347         execChain.execute(request, entityProducer, scope, asyncExecCallback);
348     }
349 
350     void executeScheduled(
351             final HttpRequest request,
352             final AsyncEntityProducer entityProducer,
353             final AsyncExecChain.Scope scope,
354             final AsyncExecCallback asyncExecCallback,
355             final TimeValue delay) {
356         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
357                 request, entityProducer, scope, asyncExecCallback, delay);
358         if (TimeValue.isPositive(delay)) {
359             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
360         } else {
361             scheduledExecutorService.execute(scheduledTask);
362         }
363     }
364 
365     class ScheduledRequestExecution implements Runnable, Cancellable {
366 
367         final HttpRequest request;
368         final AsyncEntityProducer entityProducer;
369         final AsyncExecChain.Scope scope;
370         final AsyncExecCallback asyncExecCallback;
371         final TimeValue delay;
372 
373         ScheduledRequestExecution(final HttpRequest request,
374                                   final AsyncEntityProducer entityProducer,
375                                   final AsyncExecChain.Scope scope,
376                                   final AsyncExecCallback asyncExecCallback,
377                                   final TimeValue delay) {
378             this.request = request;
379             this.entityProducer = entityProducer;
380             this.scope = scope;
381             this.asyncExecCallback = asyncExecCallback;
382             this.delay = delay;
383         }
384 
385         @Override
386         public void run() {
387             try {
388                 execChain.execute(request, entityProducer, scope, asyncExecCallback);
389             } catch (final Exception ex) {
390                 asyncExecCallback.failed(ex);
391             }
392         }
393 
394         @Override
395         public boolean cancel() {
396             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
397             return true;
398         }
399 
400     }
401 
402 }