View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.config.Configurable;
41  import org.apache.hc.client5.http.config.RequestConfig;
42  import org.apache.hc.client5.http.impl.ConnPoolSupport;
43  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44  import org.apache.hc.client5.http.impl.ExecSupport;
45  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48  import org.apache.hc.client5.http.protocol.HttpClientContext;
49  import org.apache.hc.client5.http.routing.RoutingSupport;
50  import org.apache.hc.core5.annotation.Contract;
51  import org.apache.hc.core5.annotation.ThreadingBehavior;
52  import org.apache.hc.core5.concurrent.BasicFuture;
53  import org.apache.hc.core5.concurrent.Cancellable;
54  import org.apache.hc.core5.concurrent.ComplexCancellable;
55  import org.apache.hc.core5.concurrent.ComplexFuture;
56  import org.apache.hc.core5.concurrent.FutureCallback;
57  import org.apache.hc.core5.function.Callback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68  import org.apache.hc.core5.http.nio.CapacityChannel;
69  import org.apache.hc.core5.http.nio.DataStreamChannel;
70  import org.apache.hc.core5.http.nio.HandlerFactory;
71  import org.apache.hc.core5.http.nio.RequestChannel;
72  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http2.HttpVersionPolicy;
75  import org.apache.hc.core5.io.CloseMode;
76  import org.apache.hc.core5.io.Closer;
77  import org.apache.hc.core5.reactor.Command;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80  import org.apache.hc.core5.reactor.IOReactorConfig;
81  import org.apache.hc.core5.reactor.IOSession;
82  import org.apache.hc.core5.util.Args;
83  import org.apache.hc.core5.util.Asserts;
84  import org.apache.hc.core5.util.TimeValue;
85  import org.apache.hc.core5.util.Timeout;
86  import org.slf4j.Logger;
87  import org.slf4j.LoggerFactory;
88  
89  /**
90   * Minimal implementation of {@link CloseableHttpAsyncClient}. This client is
91   * optimized for HTTP/1.1 and HTTP/2 message transport and does not support
92   * advanced HTTP protocol functionality such as request execution via a proxy,
93   * state management, authentication and request redirects.
94   * <p>
95   * Concurrent message exchanges executed by this client will get assigned to
96   * separate connections leased from the connection pool.
97   * </p>
98   *
99   * @since 5.0
100  */
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103 
104     private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105     private final AsyncClientConnectionManager manager;
106     private final SchemePortResolver schemePortResolver;
107     private final HttpVersionPolicy versionPolicy;
108 
109     MinimalHttpAsyncClient(
110             final IOEventHandlerFactory eventHandlerFactory,
111             final AsyncPushConsumerRegistry pushConsumerRegistry,
112             final HttpVersionPolicy versionPolicy,
113             final IOReactorConfig reactorConfig,
114             final ThreadFactory threadFactory,
115             final ThreadFactory workerThreadFactory,
116             final AsyncClientConnectionManager manager,
117             final SchemePortResolver schemePortResolver) {
118         super(new DefaultConnectingIOReactor(
119                 eventHandlerFactory,
120                 reactorConfig,
121                 workerThreadFactory,
122                 LoggingIOSessionDecorator.INSTANCE,
123                 LoggingExceptionCallback.INSTANCE,
124                 null,
125                 new Callback<IOSession>() {
126 
127                     @Override
128                     public void execute(final IOSession ioSession) {
129                         ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
130                     }
131 
132                 }),
133                 pushConsumerRegistry,
134                 threadFactory);
135         this.manager = manager;
136         this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
137         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
138     }
139 
140     private Future<AsyncConnectionEndpoint> leaseEndpoint(
141             final HttpHost host,
142             final Timeout connectionRequestTimeout,
143             final Timeout connectTimeout,
144             final HttpClientContext clientContext,
145             final FutureCallback<AsyncConnectionEndpoint> callback) {
146         final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148         final String exchangeId = ExecSupport.getNextExchangeId();
149         clientContext.setExchangeId(exchangeId);
150         final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
151                 exchangeId,
152                 route,
153                 null,
154                 connectionRequestTimeout,
155                 new FutureCallback<AsyncConnectionEndpoint>() {
156 
157                     @Override
158                     public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
159                         if (connectionEndpoint.isConnected()) {
160                             resultFuture.completed(connectionEndpoint);
161                         } else {
162                             final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
163                                     connectionEndpoint,
164                                     getConnectionInitiator(),
165                                     connectTimeout,
166                                     versionPolicy,
167                                     clientContext,
168                                     new FutureCallback<AsyncConnectionEndpoint>() {
169 
170                                         @Override
171                                         public void completed(final AsyncConnectionEndpoint result) {
172                                             resultFuture.completed(result);
173                                         }
174 
175                                         @Override
176                                         public void failed(final Exception ex) {
177                                             resultFuture.failed(ex);
178                                         }
179 
180                                         @Override
181                                         public void cancelled() {
182                                             resultFuture.cancel(true);
183                                         }
184 
185                                     });
186                             resultFuture.setDependency(connectFuture);
187                         }
188                     }
189 
190                     @Override
191                     public void failed(final Exception ex) {
192                         callback.failed(ex);
193                     }
194 
195                     @Override
196                     public void cancelled() {
197                         callback.cancelled();
198                     }
199 
200                 });
201         resultFuture.setDependency(leaseFuture);
202         return resultFuture;
203     }
204 
205     public Future<AsyncClientEndpoint> lease(
206             final HttpHost host,
207             final FutureCallback<AsyncClientEndpoint> callback) {
208         return lease(host, HttpClientContext.create(), callback);
209     }
210 
211     public Future<AsyncClientEndpoint> lease(
212             final HttpHost host,
213             final HttpContext context,
214             final FutureCallback<AsyncClientEndpoint> callback) {
215         Args.notNull(host, "Host");
216         Args.notNull(context, "HTTP context");
217         final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
218         if (!isRunning()) {
219             future.failed(new CancellationException("Connection lease cancelled"));
220             return future;
221         }
222         final HttpClientContext clientContext = HttpClientContext.adapt(context);
223         final RequestConfig requestConfig = clientContext.getRequestConfig();
224         final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
225         final Timeout connectTimeout = requestConfig.getConnectTimeout();
226         leaseEndpoint(
227                 host,
228                 connectionRequestTimeout,
229                 connectTimeout,
230                 clientContext,
231                 new FutureCallback<AsyncConnectionEndpoint>() {
232 
233                     @Override
234                     public void completed(final AsyncConnectionEndpoint result) {
235                         future.completed(new InternalAsyncClientEndpoint(result));
236                     }
237 
238                     @Override
239                     public void failed(final Exception ex) {
240                         future.failed(ex);
241                     }
242 
243                     @Override
244                     public void cancelled() {
245                         future.cancel(true);
246                     }
247 
248                 });
249         return future;
250     }
251 
252     @Override
253     public Cancellable execute(
254             final AsyncClientExchangeHandler exchangeHandler,
255             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
256             final HttpContext context) {
257         final ComplexCancellable cancellable = new ComplexCancellable();
258         try {
259             if (!isRunning()) {
260                 throw new CancellationException("Request execution cancelled");
261             }
262             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
263             exchangeHandler.produceRequest(new RequestChannel() {
264 
265                 @Override
266                 public void sendRequest(
267                         final HttpRequest request,
268                         final EntityDetails entityDetails,
269                         final HttpContext context) throws HttpException, IOException {
270                     RequestConfig requestConfig = null;
271                     if (request instanceof Configurable) {
272                         requestConfig = ((Configurable) request).getConfig();
273                     }
274                     if (requestConfig != null) {
275                         clientContext.setRequestConfig(requestConfig);
276                     } else {
277                         requestConfig = clientContext.getRequestConfig();
278                     }
279                     final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
280                     final Timeout connectTimeout = requestConfig.getConnectTimeout();
281                     final Timeout responseTimeout = requestConfig.getResponseTimeout();
282                     final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
283 
284                     final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
285                             target,
286                             connectionRequestTimeout,
287                             connectTimeout,
288                             clientContext,
289                             new FutureCallback<AsyncConnectionEndpoint>() {
290 
291                                 @Override
292                                 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
293                                     final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
294                                     final AtomicInteger messageCountDown = new AtomicInteger(2);
295                                     final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
296 
297                                         @Override
298                                         public void releaseResources() {
299                                             try {
300                                                 exchangeHandler.releaseResources();
301                                             } finally {
302                                                 endpoint.releaseAndDiscard();
303                                             }
304                                         }
305 
306                                         @Override
307                                         public void failed(final Exception cause) {
308                                             try {
309                                                 exchangeHandler.failed(cause);
310                                             } finally {
311                                                 endpoint.releaseAndDiscard();
312                                             }
313                                         }
314 
315                                         @Override
316                                         public void cancel() {
317                                             failed(new RequestFailedException("Request aborted"));
318                                         }
319 
320                                         @Override
321                                         public void produceRequest(
322                                                 final RequestChannel channel,
323                                                 final HttpContext context) throws HttpException, IOException {
324                                             channel.sendRequest(request, entityDetails, context);
325                                             if (entityDetails == null) {
326                                                 messageCountDown.decrementAndGet();
327                                             }
328                                         }
329 
330                                         @Override
331                                         public int available() {
332                                             return exchangeHandler.available();
333                                         }
334 
335                                         @Override
336                                         public void produce(final DataStreamChannel channel) throws IOException {
337                                             exchangeHandler.produce(new DataStreamChannel() {
338 
339                                                 @Override
340                                                 public void requestOutput() {
341                                                     channel.requestOutput();
342                                                 }
343 
344                                                 @Override
345                                                 public int write(final ByteBuffer src) throws IOException {
346                                                     return channel.write(src);
347                                                 }
348 
349                                                 @Override
350                                                 public void endStream(final List<? extends Header> trailers) throws IOException {
351                                                     channel.endStream(trailers);
352                                                     if (messageCountDown.decrementAndGet() <= 0) {
353                                                         endpoint.releaseAndReuse();
354                                                     }
355                                                 }
356 
357                                                 @Override
358                                                 public void endStream() throws IOException {
359                                                     channel.endStream();
360                                                     if (messageCountDown.decrementAndGet() <= 0) {
361                                                         endpoint.releaseAndReuse();
362                                                     }
363                                                 }
364 
365                                             });
366                                         }
367 
368                                         @Override
369                                         public void consumeInformation(
370                                                 final HttpResponse response,
371                                                 final HttpContext context) throws HttpException, IOException {
372                                             exchangeHandler.consumeInformation(response, context);
373                                         }
374 
375                                         @Override
376                                         public void consumeResponse(
377                                                 final HttpResponse response,
378                                                 final EntityDetails entityDetails,
379                                                 final HttpContext context) throws HttpException, IOException {
380                                             exchangeHandler.consumeResponse(response, entityDetails, context);
381                                             if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
382                                                 messageCountDown.decrementAndGet();
383                                             }
384                                             if (entityDetails == null) {
385                                                 if (messageCountDown.decrementAndGet() <= 0) {
386                                                     endpoint.releaseAndReuse();
387                                                 }
388                                             }
389                                         }
390 
391                                         @Override
392                                         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
393                                             exchangeHandler.updateCapacity(capacityChannel);
394                                         }
395 
396                                         @Override
397                                         public void consume(final ByteBuffer src) throws IOException {
398                                             exchangeHandler.consume(src);
399                                         }
400 
401                                         @Override
402                                         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
403                                             if (messageCountDown.decrementAndGet() <= 0) {
404                                                 endpoint.releaseAndReuse();
405                                             }
406                                             exchangeHandler.streamEnd(trailers);
407                                         }
408 
409                                     };
410                                     if (responseTimeout != null) {
411                                         endpoint.setSocketTimeout(responseTimeout);
412                                     }
413                                     endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
414                                 }
415 
416                                 @Override
417                                 public void failed(final Exception ex) {
418                                     exchangeHandler.failed(ex);
419                                 }
420 
421                                 @Override
422                                 public void cancelled() {
423                                     exchangeHandler.cancel();
424                                 }
425 
426                             });
427 
428                     cancellable.setDependency(new Cancellable() {
429 
430                         @Override
431                         public boolean cancel() {
432                             return leaseFuture.cancel(true);
433                         }
434 
435                     });
436                 }
437             }, context);
438 
439         } catch (final HttpException | IOException | IllegalStateException ex) {
440             exchangeHandler.failed(ex);
441         }
442         return cancellable;
443     }
444 
445     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
446 
447         private final AsyncConnectionEndpoint connectionEndpoint;
448         private final AtomicBoolean released;
449 
450         InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
451             this.connectionEndpoint = connectionEndpoint;
452             this.released = new AtomicBoolean(false);
453         }
454 
455         boolean isReleased() {
456             return released.get();
457         }
458 
459         @Override
460         public boolean isConnected() {
461             return !isReleased() && connectionEndpoint.isConnected();
462         }
463 
464         @Override
465         public void execute(
466                 final AsyncClientExchangeHandler exchangeHandler,
467                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
468                 final HttpContext context) {
469             Asserts.check(!released.get(), "Endpoint has already been released");
470 
471             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
472             final String exchangeId = ExecSupport.getNextExchangeId();
473             clientContext.setExchangeId(exchangeId);
474             if (LOG.isDebugEnabled()) {
475                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
476                 connectionEndpoint.execute(
477                         exchangeId,
478                         new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
479                         pushHandlerFactory,
480                         clientContext);
481             } else {
482                 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
483             }
484         }
485 
486         public void setSocketTimeout(final Timeout timeout) {
487             connectionEndpoint.setSocketTimeout(timeout);
488         }
489 
490         @Override
491         public void releaseAndReuse() {
492             if (released.compareAndSet(false, true)) {
493                 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
494             }
495         }
496 
497         @Override
498         public void releaseAndDiscard() {
499             if (released.compareAndSet(false, true)) {
500                 Closer.closeQuietly(connectionEndpoint);
501                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
502             }
503         }
504 
505     }
506 
507 }