View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.config.Configurable;
41  import org.apache.hc.client5.http.config.RequestConfig;
42  import org.apache.hc.client5.http.config.TlsConfig;
43  import org.apache.hc.client5.http.impl.ConnPoolSupport;
44  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
45  import org.apache.hc.client5.http.impl.ExecSupport;
46  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
47  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
49  import org.apache.hc.client5.http.protocol.HttpClientContext;
50  import org.apache.hc.client5.http.routing.RoutingSupport;
51  import org.apache.hc.core5.annotation.Contract;
52  import org.apache.hc.core5.annotation.ThreadingBehavior;
53  import org.apache.hc.core5.concurrent.BasicFuture;
54  import org.apache.hc.core5.concurrent.Cancellable;
55  import org.apache.hc.core5.concurrent.ComplexCancellable;
56  import org.apache.hc.core5.concurrent.ComplexFuture;
57  import org.apache.hc.core5.concurrent.FutureCallback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpResponse;
63  import org.apache.hc.core5.http.HttpStatus;
64  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
65  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
66  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
67  import org.apache.hc.core5.http.nio.CapacityChannel;
68  import org.apache.hc.core5.http.nio.DataStreamChannel;
69  import org.apache.hc.core5.http.nio.HandlerFactory;
70  import org.apache.hc.core5.http.nio.RequestChannel;
71  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72  import org.apache.hc.core5.http.protocol.HttpContext;
73  import org.apache.hc.core5.io.CloseMode;
74  import org.apache.hc.core5.io.Closer;
75  import org.apache.hc.core5.reactor.Command;
76  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
77  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78  import org.apache.hc.core5.reactor.IOReactorConfig;
79  import org.apache.hc.core5.util.Args;
80  import org.apache.hc.core5.util.Asserts;
81  import org.apache.hc.core5.util.TimeValue;
82  import org.apache.hc.core5.util.Timeout;
83  import org.slf4j.Logger;
84  import org.slf4j.LoggerFactory;
85  
86  /**
87   * Minimal implementation of {@link CloseableHttpAsyncClient}. This client is
88   * optimized for HTTP/1.1 and HTTP/2 message transport and does not support
89   * advanced HTTP protocol functionality such as request execution via a proxy,
90   * state management, authentication and request redirects.
91   * <p>
92   * Concurrent message exchanges executed by this client will get assigned to
93   * separate connections leased from the connection pool.
94   * </p>
95   *
96   * @since 5.0
97   */
98  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
99  public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
100 
101     private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
102     private final AsyncClientConnectionManager manager;
103     private final SchemePortResolver schemePortResolver;
104     private final TlsConfig tlsConfig;
105 
106     MinimalHttpAsyncClient(
107             final IOEventHandlerFactory eventHandlerFactory,
108             final AsyncPushConsumerRegistry pushConsumerRegistry,
109             final IOReactorConfig reactorConfig,
110             final ThreadFactory threadFactory,
111             final ThreadFactory workerThreadFactory,
112             final AsyncClientConnectionManager manager,
113             final SchemePortResolver schemePortResolver,
114             final TlsConfig tlsConfig) {
115         super(new DefaultConnectingIOReactor(
116                         eventHandlerFactory,
117                         reactorConfig,
118                         workerThreadFactory,
119                         LoggingIOSessionDecorator.INSTANCE,
120                         LoggingExceptionCallback.INSTANCE,
121                         null,
122                         ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL)),
123                 pushConsumerRegistry,
124                 threadFactory);
125         this.manager = manager;
126         this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
127         this.tlsConfig = tlsConfig;
128     }
129 
130     private Future<AsyncConnectionEndpoint> leaseEndpoint(
131             final HttpHost host,
132             final Timeout connectionRequestTimeout,
133             final Timeout connectTimeout,
134             final HttpClientContext clientContext,
135             final FutureCallback<AsyncConnectionEndpoint> callback) {
136         final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
137         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
138         final String exchangeId = ExecSupport.getNextExchangeId();
139         clientContext.setExchangeId(exchangeId);
140         final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
141                 exchangeId,
142                 route,
143                 null,
144                 connectionRequestTimeout,
145                 new FutureCallback<AsyncConnectionEndpoint>() {
146 
147                     @Override
148                     public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
149                         if (connectionEndpoint.isConnected()) {
150                             resultFuture.completed(connectionEndpoint);
151                         } else {
152                             final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
153                                     connectionEndpoint,
154                                     getConnectionInitiator(),
155                                     connectTimeout,
156                                     tlsConfig,
157                                     clientContext,
158                                     new FutureCallback<AsyncConnectionEndpoint>() {
159 
160                                         @Override
161                                         public void completed(final AsyncConnectionEndpoint result) {
162                                             resultFuture.completed(result);
163                                         }
164 
165                                         @Override
166                                         public void failed(final Exception ex) {
167                                             try {
168                                                 Closer.closeQuietly(connectionEndpoint);
169                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
170                                             } finally {
171                                                 resultFuture.failed(ex);
172                                             }
173                                         }
174 
175                                         @Override
176                                         public void cancelled() {
177                                             try {
178                                                 Closer.closeQuietly(connectionEndpoint);
179                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180                                             } finally {
181                                                 resultFuture.cancel(true);
182                                             }
183                                         }
184 
185                                     });
186                             resultFuture.setDependency(connectFuture);
187                         }
188                     }
189 
190                     @Override
191                     public void failed(final Exception ex) {
192                         callback.failed(ex);
193                     }
194 
195                     @Override
196                     public void cancelled() {
197                         callback.cancelled();
198                     }
199 
200                 });
201         resultFuture.setDependency(leaseFuture);
202         return resultFuture;
203     }
204 
205     public Future<AsyncClientEndpoint> lease(
206             final HttpHost host,
207             final FutureCallback<AsyncClientEndpoint> callback) {
208         return lease(host, null, callback);
209     }
210 
211     public Future<AsyncClientEndpoint> lease(
212             final HttpHost host,
213             final HttpContext context,
214             final FutureCallback<AsyncClientEndpoint> callback) {
215         Args.notNull(host, "Host");
216         final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
217         if (!isRunning()) {
218             future.failed(new CancellationException("Connection lease cancelled"));
219             return future;
220         }
221         final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
222         final RequestConfig requestConfig = clientContext.getRequestConfigOrDefault();
223         final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
224         @SuppressWarnings("deprecation")
225         final Timeout connectTimeout = requestConfig.getConnectTimeout();
226         leaseEndpoint(
227                 host,
228                 connectionRequestTimeout,
229                 connectTimeout,
230                 clientContext,
231                 new FutureCallback<AsyncConnectionEndpoint>() {
232 
233                     @Override
234                     public void completed(final AsyncConnectionEndpoint result) {
235                         future.completed(new InternalAsyncClientEndpoint(result));
236                     }
237 
238                     @Override
239                     public void failed(final Exception ex) {
240                         future.failed(ex);
241                     }
242 
243                     @Override
244                     public void cancelled() {
245                         future.cancel(true);
246                     }
247 
248                 });
249         return future;
250     }
251 
252     @Override
253     public Cancellable execute(
254             final AsyncClientExchangeHandler exchangeHandler,
255             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
256             final HttpContext context) {
257         final ComplexCancellable cancellable = new ComplexCancellable();
258         try {
259             if (!isRunning()) {
260                 throw new CancellationException("Request execution cancelled");
261             }
262             final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
263             exchangeHandler.produceRequest((request, entityDetails, context1) -> {
264                 RequestConfig requestConfig = null;
265                 if (request instanceof Configurable) {
266                     requestConfig = ((Configurable) request).getConfig();
267                 }
268                 if (requestConfig != null) {
269                     clientContext.setRequestConfig(requestConfig);
270                 } else {
271                     requestConfig = clientContext.getRequestConfigOrDefault();
272                 }
273                 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
274                 @SuppressWarnings("deprecation")
275                 final Timeout connectTimeout = requestConfig.getConnectTimeout();
276                 final Timeout responseTimeout = requestConfig.getResponseTimeout();
277                 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
278 
279                 final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
280                         target,
281                         connectionRequestTimeout,
282                         connectTimeout,
283                         clientContext,
284                         new FutureCallback<AsyncConnectionEndpoint>() {
285 
286                             @Override
287                             public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
288                                 final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
289                                 final AtomicInteger messageCountDown = new AtomicInteger(2);
290                                 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
291 
292                                     @Override
293                                     public void releaseResources() {
294                                         try {
295                                             exchangeHandler.releaseResources();
296                                         } finally {
297                                             endpoint.releaseAndDiscard();
298                                         }
299                                     }
300 
301                                     @Override
302                                     public void failed(final Exception cause) {
303                                         try {
304                                             exchangeHandler.failed(cause);
305                                         } finally {
306                                             endpoint.releaseAndDiscard();
307                                         }
308                                     }
309 
310                                     @Override
311                                     public void cancel() {
312                                         failed(new RequestFailedException("Request aborted"));
313                                     }
314 
315                                     @Override
316                                     public void produceRequest(
317                                             final RequestChannel channel,
318                                             final HttpContext context1) throws HttpException, IOException {
319                                         channel.sendRequest(request, entityDetails, context1);
320                                         if (entityDetails == null) {
321                                             messageCountDown.decrementAndGet();
322                                         }
323                                     }
324 
325                                     @Override
326                                     public int available() {
327                                         return exchangeHandler.available();
328                                     }
329 
330                                     @Override
331                                     public void produce(final DataStreamChannel channel) throws IOException {
332                                         exchangeHandler.produce(new DataStreamChannel() {
333 
334                                             @Override
335                                             public void requestOutput() {
336                                                 channel.requestOutput();
337                                             }
338 
339                                             @Override
340                                             public int write(final ByteBuffer src) throws IOException {
341                                                 return channel.write(src);
342                                             }
343 
344                                             @Override
345                                             public void endStream(final List<? extends Header> trailers) throws IOException {
346                                                 channel.endStream(trailers);
347                                                 if (messageCountDown.decrementAndGet() <= 0) {
348                                                     endpoint.releaseAndReuse();
349                                                 }
350                                             }
351 
352                                             @Override
353                                             public void endStream() throws IOException {
354                                                 channel.endStream();
355                                                 if (messageCountDown.decrementAndGet() <= 0) {
356                                                     endpoint.releaseAndReuse();
357                                                 }
358                                             }
359 
360                                         });
361                                     }
362 
363                                     @Override
364                                     public void consumeInformation(
365                                             final HttpResponse response,
366                                             final HttpContext context1) throws HttpException, IOException {
367                                         exchangeHandler.consumeInformation(response, context1);
368                                     }
369 
370                                     @Override
371                                     public void consumeResponse(
372                                             final HttpResponse response,
373                                             final EntityDetails entityDetails,
374                                             final HttpContext context1) throws HttpException, IOException {
375                                         exchangeHandler.consumeResponse(response, entityDetails, context1);
376                                         if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
377                                             messageCountDown.decrementAndGet();
378                                         }
379                                         if (entityDetails == null) {
380                                             if (messageCountDown.decrementAndGet() <= 0) {
381                                                 endpoint.releaseAndReuse();
382                                             }
383                                         }
384                                     }
385 
386                                     @Override
387                                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
388                                         exchangeHandler.updateCapacity(capacityChannel);
389                                     }
390 
391                                     @Override
392                                     public void consume(final ByteBuffer src) throws IOException {
393                                         exchangeHandler.consume(src);
394                                     }
395 
396                                     @Override
397                                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
398                                         if (messageCountDown.decrementAndGet() <= 0) {
399                                             endpoint.releaseAndReuse();
400                                         }
401                                         exchangeHandler.streamEnd(trailers);
402                                     }
403 
404                                 };
405                                 if (responseTimeout != null) {
406                                     endpoint.setSocketTimeout(responseTimeout);
407                                 }
408                                 endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
409                             }
410 
411                             @Override
412                             public void failed(final Exception ex) {
413                                 exchangeHandler.failed(ex);
414                             }
415 
416                             @Override
417                             public void cancelled() {
418                                 exchangeHandler.cancel();
419                             }
420 
421                         });
422 
423                 cancellable.setDependency(() -> leaseFuture.cancel(true));
424             }, context);
425 
426         } catch (final HttpException | IOException | IllegalStateException ex) {
427             exchangeHandler.failed(ex);
428         }
429         return cancellable;
430     }
431 
432     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
433 
434         private final AsyncConnectionEndpoint connectionEndpoint;
435         private final AtomicBoolean released;
436 
437         InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
438             this.connectionEndpoint = connectionEndpoint;
439             this.released = new AtomicBoolean(false);
440         }
441 
442         boolean isReleased() {
443             return released.get();
444         }
445 
446         @Override
447         public boolean isConnected() {
448             return !isReleased() && connectionEndpoint.isConnected();
449         }
450 
451         @Override
452         public void execute(
453                 final AsyncClientExchangeHandler exchangeHandler,
454                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
455                 final HttpContext context) {
456             Asserts.check(!released.get(), "Endpoint has already been released");
457 
458             final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
459             final String exchangeId = ExecSupport.getNextExchangeId();
460             clientContext.setExchangeId(exchangeId);
461             if (LOG.isDebugEnabled()) {
462                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
463                 connectionEndpoint.execute(
464                         exchangeId,
465                         new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
466                         pushHandlerFactory,
467                         clientContext);
468             } else {
469                 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
470             }
471         }
472 
473         public void setSocketTimeout(final Timeout timeout) {
474             connectionEndpoint.setSocketTimeout(timeout);
475         }
476 
477         @Override
478         public void releaseAndReuse() {
479             if (released.compareAndSet(false, true)) {
480                 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
481             }
482         }
483 
484         @Override
485         public void releaseAndDiscard() {
486             if (released.compareAndSet(false, true)) {
487                 Closer.closeQuietly(connectionEndpoint);
488                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
489             }
490         }
491 
492     }
493 
494 }