View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.config.Configurable;
41  import org.apache.hc.client5.http.config.RequestConfig;
42  import org.apache.hc.client5.http.config.TlsConfig;
43  import org.apache.hc.client5.http.impl.ConnPoolSupport;
44  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
45  import org.apache.hc.client5.http.impl.ExecSupport;
46  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
47  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
49  import org.apache.hc.client5.http.protocol.HttpClientContext;
50  import org.apache.hc.client5.http.routing.RoutingSupport;
51  import org.apache.hc.core5.annotation.Contract;
52  import org.apache.hc.core5.annotation.ThreadingBehavior;
53  import org.apache.hc.core5.concurrent.BasicFuture;
54  import org.apache.hc.core5.concurrent.Cancellable;
55  import org.apache.hc.core5.concurrent.ComplexCancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.FutureCallback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpResponse;
63  import org.apache.hc.core5.http.HttpStatus;
64  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
65  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
66  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
67  import org.apache.hc.core5.http.nio.CapacityChannel;
68  import org.apache.hc.core5.http.nio.DataStreamChannel;
69  import org.apache.hc.core5.http.nio.HandlerFactory;
70  import org.apache.hc.core5.http.nio.RequestChannel;
71  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72  import org.apache.hc.core5.http.protocol.HttpContext;
73  import org.apache.hc.core5.io.CloseMode;
74  import org.apache.hc.core5.io.Closer;
75  import org.apache.hc.core5.reactor.Command;
76  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
77  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78  import org.apache.hc.core5.reactor.IOReactorConfig;
79  import org.apache.hc.core5.util.Args;
80  import org.apache.hc.core5.util.Asserts;
81  import org.apache.hc.core5.util.TimeValue;
82  import org.apache.hc.core5.util.Timeout;
83  import org.slf4j.Logger;
84  import org.slf4j.LoggerFactory;
85  
86  /**
87   * Minimal implementation of {@link CloseableHttpAsyncClient}. This client is
88   * optimized for HTTP/1.1 and HTTP/2 message transport and does not support
89   * advanced HTTP protocol functionality such as request execution via a proxy,
90   * state management, authentication and request redirects.
91   * <p>
92   * Concurrent message exchanges executed by this client will get assigned to
93   * separate connections leased from the connection pool.
94   * </p>
95   *
96   * @since 5.0
97   */
98  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
99  public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
100 
101     private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
102     private final AsyncClientConnectionManager manager;
103     private final SchemePortResolver schemePortResolver;
104     private final TlsConfig tlsConfig;
105 
106     MinimalHttpAsyncClient(
107             final IOEventHandlerFactory eventHandlerFactory,
108             final AsyncPushConsumerRegistry pushConsumerRegistry,
109             final IOReactorConfig reactorConfig,
110             final ThreadFactory threadFactory,
111             final ThreadFactory workerThreadFactory,
112             final AsyncClientConnectionManager manager,
113             final SchemePortResolver schemePortResolver,
114             final TlsConfig tlsConfig) {
115         super(new DefaultConnectingIOReactor(
116                         eventHandlerFactory,
117                         reactorConfig,
118                         workerThreadFactory,
119                         LoggingIOSessionDecorator.INSTANCE,
120                         LoggingExceptionCallback.INSTANCE,
121                         null,
122                         ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL)),
123                 pushConsumerRegistry,
124                 threadFactory);
125         this.manager = manager;
126         this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
127         this.tlsConfig = tlsConfig;
128     }
129 
130     private Future<AsyncConnectionEndpoint> leaseEndpoint(
131             final HttpHost host,
132             final Timeout connectionRequestTimeout,
133             final Timeout connectTimeout,
134             final HttpClientContext clientContext,
135             final FutureCallback<AsyncConnectionEndpoint> callback) {
136         final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
137         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
138         final String exchangeId = ExecSupport.getNextExchangeId();
139         clientContext.setExchangeId(exchangeId);
140         final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
141                 exchangeId,
142                 route,
143                 null,
144                 connectionRequestTimeout,
145                 new FutureCallback<AsyncConnectionEndpoint>() {
146 
147                     @Override
148                     public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
149                         if (connectionEndpoint.isConnected()) {
150                             resultFuture.completed(connectionEndpoint);
151                         } else {
152                             final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
153                                     connectionEndpoint,
154                                     getConnectionInitiator(),
155                                     connectTimeout,
156                                     tlsConfig,
157                                     clientContext,
158                                     new FutureCallback<AsyncConnectionEndpoint>() {
159 
160                                         @Override
161                                         public void completed(final AsyncConnectionEndpoint result) {
162                                             resultFuture.completed(result);
163                                         }
164 
165                                         @Override
166                                         public void failed(final Exception ex) {
167                                             try {
168                                                 Closer.closeQuietly(connectionEndpoint);
169                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
170                                             } finally {
171                                                 resultFuture.failed(ex);
172                                             }
173                                         }
174 
175                                         @Override
176                                         public void cancelled() {
177                                             try {
178                                                 Closer.closeQuietly(connectionEndpoint);
179                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180                                             } finally {
181                                                 resultFuture.cancel(true);
182                                             }
183                                         }
184 
185                                     });
186                             resultFuture.setDependency(connectFuture);
187                         }
188                     }
189 
190                     @Override
191                     public void failed(final Exception ex) {
192                         callback.failed(ex);
193                     }
194 
195                     @Override
196                     public void cancelled() {
197                         callback.cancelled();
198                     }
199 
200                 });
201         resultFuture.setDependency(leaseFuture);
202         return resultFuture;
203     }
204 
205     public Future<AsyncClientEndpoint> lease(
206             final HttpHost host,
207             final FutureCallback<AsyncClientEndpoint> callback) {
208         return lease(host, HttpClientContext.create(), callback);
209     }
210 
211     public Future<AsyncClientEndpoint> lease(
212             final HttpHost host,
213             final HttpContext context,
214             final FutureCallback<AsyncClientEndpoint> callback) {
215         Args.notNull(host, "Host");
216         Args.notNull(context, "HTTP context");
217         final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
218         if (!isRunning()) {
219             future.failed(new CancellationException("Connection lease cancelled"));
220             return future;
221         }
222         final HttpClientContext clientContext = HttpClientContext.adapt(context);
223         final RequestConfig requestConfig = clientContext.getRequestConfig();
224         final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
225         @SuppressWarnings("deprecation")
226         final Timeout connectTimeout = requestConfig.getConnectTimeout();
227         leaseEndpoint(
228                 host,
229                 connectionRequestTimeout,
230                 connectTimeout,
231                 clientContext,
232                 new FutureCallback<AsyncConnectionEndpoint>() {
233 
234                     @Override
235                     public void completed(final AsyncConnectionEndpoint result) {
236                         future.completed(new InternalAsyncClientEndpoint(result));
237                     }
238 
239                     @Override
240                     public void failed(final Exception ex) {
241                         future.failed(ex);
242                     }
243 
244                     @Override
245                     public void cancelled() {
246                         future.cancel(true);
247                     }
248 
249                 });
250         return future;
251     }
252 
253     @Override
254     public Cancellable execute(
255             final AsyncClientExchangeHandler exchangeHandler,
256             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
257             final HttpContext context) {
258         final ComplexCancellable cancellable = new ComplexCancellable();
259         try {
260             if (!isRunning()) {
261                 throw new CancellationException("Request execution cancelled");
262             }
263             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
264             exchangeHandler.produceRequest((request, entityDetails, context1) -> {
265                 RequestConfig requestConfig = null;
266                 if (request instanceof Configurable) {
267                     requestConfig = ((Configurable) request).getConfig();
268                 }
269                 if (requestConfig != null) {
270                     clientContext.setRequestConfig(requestConfig);
271                 } else {
272                     requestConfig = clientContext.getRequestConfig();
273                 }
274                 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
275                 @SuppressWarnings("deprecation")
276                 final Timeout connectTimeout = requestConfig.getConnectTimeout();
277                 final Timeout responseTimeout = requestConfig.getResponseTimeout();
278                 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
279 
280                 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
281                         target,
282                         connectionRequestTimeout,
283                         connectTimeout,
284                         clientContext,
285                         new FutureCallback<AsyncConnectionEndpoint>() {
286 
287                             @Override
288                             public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
289                                 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
290                                 final AtomicInteger messageCountDown = new AtomicInteger(2);
291                                 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
292 
293                                     @Override
294                                     public void releaseResources() {
295                                         try {
296                                             exchangeHandler.releaseResources();
297                                         } finally {
298                                             endpoint.releaseAndDiscard();
299                                         }
300                                     }
301 
302                                     @Override
303                                     public void failed(final Exception cause) {
304                                         try {
305                                             exchangeHandler.failed(cause);
306                                         } finally {
307                                             endpoint.releaseAndDiscard();
308                                         }
309                                     }
310 
311                                     @Override
312                                     public void cancel() {
313                                         failed(new RequestFailedException("Request aborted"));
314                                     }
315 
316                                     @Override
317                                     public void produceRequest(
318                                             final RequestChannel channel,
319                                             final HttpContext context1) throws HttpException, IOException {
320                                         channel.sendRequest(request, entityDetails, context1);
321                                         if (entityDetails == null) {
322                                             messageCountDown.decrementAndGet();
323                                         }
324                                     }
325 
326                                     @Override
327                                     public int available() {
328                                         return exchangeHandler.available();
329                                     }
330 
331                                     @Override
332                                     public void produce(final DataStreamChannel channel) throws IOException {
333                                         exchangeHandler.produce(new DataStreamChannel() {
334 
335                                             @Override
336                                             public void requestOutput() {
337                                                 channel.requestOutput();
338                                             }
339 
340                                             @Override
341                                             public int write(final ByteBuffer src) throws IOException {
342                                                 return channel.write(src);
343                                             }
344 
345                                             @Override
346                                             public void endStream(final List<? extends Header> trailers) throws IOException {
347                                                 channel.endStream(trailers);
348                                                 if (messageCountDown.decrementAndGet() <= 0) {
349                                                     endpoint.releaseAndReuse();
350                                                 }
351                                             }
352 
353                                             @Override
354                                             public void endStream() throws IOException {
355                                                 channel.endStream();
356                                                 if (messageCountDown.decrementAndGet() <= 0) {
357                                                     endpoint.releaseAndReuse();
358                                                 }
359                                             }
360 
361                                         });
362                                     }
363 
364                                     @Override
365                                     public void consumeInformation(
366                                             final HttpResponse response,
367                                             final HttpContext context1) throws HttpException, IOException {
368                                         exchangeHandler.consumeInformation(response, context1);
369                                     }
370 
371                                     @Override
372                                     public void consumeResponse(
373                                             final HttpResponse response,
374                                             final EntityDetails entityDetails,
375                                             final HttpContext context1) throws HttpException, IOException {
376                                         exchangeHandler.consumeResponse(response, entityDetails, context1);
377                                         if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
378                                             messageCountDown.decrementAndGet();
379                                         }
380                                         if (entityDetails == null) {
381                                             if (messageCountDown.decrementAndGet() <= 0) {
382                                                 endpoint.releaseAndReuse();
383                                             }
384                                         }
385                                     }
386 
387                                     @Override
388                                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
389                                         exchangeHandler.updateCapacity(capacityChannel);
390                                     }
391 
392                                     @Override
393                                     public void consume(final ByteBuffer src) throws IOException {
394                                         exchangeHandler.consume(src);
395                                     }
396 
397                                     @Override
398                                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
399                                         if (messageCountDown.decrementAndGet() <= 0) {
400                                             endpoint.releaseAndReuse();
401                                         }
402                                         exchangeHandler.streamEnd(trailers);
403                                     }
404 
405                                 };
406                                 if (responseTimeout != null) {
407                                     endpoint.setSocketTimeout(responseTimeout);
408                                 }
409                                 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
410                             }
411 
412                             @Override
413                             public void failed(final Exception ex) {
414                                 exchangeHandler.failed(ex);
415                             }
416 
417                             @Override
418                             public void cancelled() {
419                                 exchangeHandler.cancel();
420                             }
421 
422                         });
423 
424                 cancellable.setDependency(() -> leaseFuture.cancel(true));
425             }, context);
426 
427         } catch (final HttpException | IOException | IllegalStateException ex) {
428             exchangeHandler.failed(ex);
429         }
430         return cancellable;
431     }
432 
433     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
434 
435         private final AsyncConnectionEndpoint connectionEndpoint;
436         private final AtomicBoolean released;
437 
438         InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
439             this.connectionEndpoint = connectionEndpoint;
440             this.released = new AtomicBoolean(false);
441         }
442 
443         boolean isReleased() {
444             return released.get();
445         }
446 
447         @Override
448         public boolean isConnected() {
449             return !isReleased() && connectionEndpoint.isConnected();
450         }
451 
452         @Override
453         public void execute(
454                 final AsyncClientExchangeHandler exchangeHandler,
455                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
456                 final HttpContext context) {
457             Asserts.check(!released.get(), "Endpoint has already been released");
458 
459             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
460             final String exchangeId = ExecSupport.getNextExchangeId();
461             clientContext.setExchangeId(exchangeId);
462             if (LOG.isDebugEnabled()) {
463                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
464                 connectionEndpoint.execute(
465                         exchangeId,
466                         new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
467                         pushHandlerFactory,
468                         clientContext);
469             } else {
470                 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
471             }
472         }
473 
474         public void setSocketTimeout(final Timeout timeout) {
475             connectionEndpoint.setSocketTimeout(timeout);
476         }
477 
478         @Override
479         public void releaseAndReuse() {
480             if (released.compareAndSet(false, true)) {
481                 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
482             }
483         }
484 
485         @Override
486         public void releaseAndDiscard() {
487             if (released.compareAndSet(false, true)) {
488                 Closer.closeQuietly(connectionEndpoint);
489                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
490             }
491         }
492 
493     }
494 
495 }