View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.CallbackContribution;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.function.Decorator;
45  import org.apache.hc.core5.http.ConnectionClosedException;
46  import org.apache.hc.core5.http.EntityDetails;
47  import org.apache.hc.core5.http.Header;
48  import org.apache.hc.core5.http.HttpConnection;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.HttpResponse;
52  import org.apache.hc.core5.http.ProtocolException;
53  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
54  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
55  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
57  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
58  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
59  import org.apache.hc.core5.http.nio.CapacityChannel;
60  import org.apache.hc.core5.http.nio.DataStreamChannel;
61  import org.apache.hc.core5.http.nio.HandlerFactory;
62  import org.apache.hc.core5.http.nio.RequestChannel;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
66  import org.apache.hc.core5.http.nio.ssl.TlsUpgradeCapable;
67  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
68  import org.apache.hc.core5.http.protocol.HttpContext;
69  import org.apache.hc.core5.http.protocol.HttpCoreContext;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.net.NamedEndpoint;
72  import org.apache.hc.core5.net.URIAuthority;
73  import org.apache.hc.core5.pool.ConnPoolControl;
74  import org.apache.hc.core5.pool.ManagedConnPool;
75  import org.apache.hc.core5.pool.PoolEntry;
76  import org.apache.hc.core5.pool.PoolStats;
77  import org.apache.hc.core5.reactor.Command;
78  import org.apache.hc.core5.reactor.EndpointParameters;
79  import org.apache.hc.core5.reactor.IOEventHandler;
80  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
81  import org.apache.hc.core5.reactor.IOReactorConfig;
82  import org.apache.hc.core5.reactor.IOSession;
83  import org.apache.hc.core5.reactor.IOSessionListener;
84  import org.apache.hc.core5.reactor.ProtocolIOSession;
85  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
86  import org.apache.hc.core5.util.Args;
87  import org.apache.hc.core5.util.TimeValue;
88  import org.apache.hc.core5.util.Timeout;
89  
90  /**
91   * HTTP/1.1 client side message exchange initiator.
92   *
93   * @since 5.0
94   */
95  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
96  
97      private final ManagedConnPool<HttpHost, IOSession> connPool;
98      private final TlsStrategy tlsStrategy;
99      private final Timeout handshakeTimeout;
100 
101     /**
102      * Use {@link AsyncRequesterBootstrap} to create instances of this class.
103      *
104      * @since 5.2
105      */
106     @Internal
107     public HttpAsyncRequester(
108             final IOReactorConfig ioReactorConfig,
109             final IOEventHandlerFactory eventHandlerFactory,
110             final Decorator<IOSession> ioSessionDecorator,
111             final Callback<Exception> exceptionCallback,
112             final IOSessionListener sessionListener,
113             final ManagedConnPool<HttpHost, IOSession> connPool,
114             final TlsStrategy tlsStrategy,
115             final Timeout handshakeTimeout) {
116         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
117                 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
118         this.connPool = Args.notNull(connPool, "Connection pool");
119         this.tlsStrategy = tlsStrategy;
120         this.handshakeTimeout = handshakeTimeout;
121     }
122 
123     /**
124      * Use {@link AsyncRequesterBootstrap} to create instances of this class.
125      */
126     @Internal
127     public HttpAsyncRequester(
128             final IOReactorConfig ioReactorConfig,
129             final IOEventHandlerFactory eventHandlerFactory,
130             final Decorator<IOSession> ioSessionDecorator,
131             final Callback<Exception> exceptionCallback,
132             final IOSessionListener sessionListener,
133             final ManagedConnPool<HttpHost, IOSession> connPool) {
134         this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
135                 null, null);
136     }
137 
138     @Override
139     public PoolStats getTotalStats() {
140         return connPool.getTotalStats();
141     }
142 
143     @Override
144     public PoolStats getStats(final HttpHost route) {
145         return connPool.getStats(route);
146     }
147 
148     @Override
149     public void setMaxTotal(final int max) {
150         connPool.setMaxTotal(max);
151     }
152 
153     @Override
154     public int getMaxTotal() {
155         return connPool.getMaxTotal();
156     }
157 
158     @Override
159     public void setDefaultMaxPerRoute(final int max) {
160         connPool.setDefaultMaxPerRoute(max);
161     }
162 
163     @Override
164     public int getDefaultMaxPerRoute() {
165         return connPool.getDefaultMaxPerRoute();
166     }
167 
168     @Override
169     public void setMaxPerRoute(final HttpHost route, final int max) {
170         connPool.setMaxPerRoute(route, max);
171     }
172 
173     @Override
174     public int getMaxPerRoute(final HttpHost route) {
175         return connPool.getMaxPerRoute(route);
176     }
177 
178     @Override
179     public void closeIdle(final TimeValue idleTime) {
180         connPool.closeIdle(idleTime);
181     }
182 
183     @Override
184     public void closeExpired() {
185         connPool.closeExpired();
186     }
187 
188     @Override
189     public Set<HttpHost> getRoutes() {
190         return connPool.getRoutes();
191     }
192 
193     public Future<AsyncClientEndpoint> connect(
194             final HttpHost host,
195             final Timeout timeout,
196             final Object attachment,
197             final FutureCallback<AsyncClientEndpoint> callback) {
198         return doConnect(host, timeout, attachment, callback);
199     }
200 
201     protected Future<AsyncClientEndpoint> doConnect(
202             final HttpHost host,
203             final Timeout timeout,
204             final Object attachment,
205             final FutureCallback<AsyncClientEndpoint> callback) {
206         Args.notNull(host, "Host");
207         Args.notNull(timeout, "Timeout");
208         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
209         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
210                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
211 
212                     @Override
213                     public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
214                         final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
215                         final IOSession ioSession = poolEntry.getConnection();
216                         if (ioSession != null && !ioSession.isOpen()) {
217                             poolEntry.discardConnection(CloseMode.IMMEDIATE);
218                         }
219                         if (poolEntry.hasConnection()) {
220                             resultFuture.completed(endpoint);
221                         } else {
222                             final Future<IOSession> future = requestSession(
223                                     host,
224                                     timeout,
225                                     new EndpointParameters(host, attachment),
226                                     new FutureCallback<IOSession>() {
227 
228                                         @Override
229                                         public void completed(final IOSession session) {
230                                             session.setSocketTimeout(timeout);
231                                             poolEntry.assignConnection(session);
232                                             resultFuture.completed(endpoint);
233                                         }
234 
235                                         @Override
236                                         public void failed(final Exception cause) {
237                                             try {
238                                                 resultFuture.failed(cause);
239                                             } finally {
240                                                 endpoint.releaseAndDiscard();
241                                             }
242                                         }
243 
244                                         @Override
245                                         public void cancelled() {
246                                             try {
247                                                 resultFuture.cancel();
248                                             } finally {
249                                                 endpoint.releaseAndDiscard();
250                                             }
251                                         }
252 
253                                     });
254                             resultFuture.setDependency(future);
255                         }
256                     }
257 
258                     @Override
259                     public void failed(final Exception ex) {
260                         resultFuture.failed(ex);
261                     }
262 
263                     @Override
264                     public void cancelled() {
265                         resultFuture.cancel();
266                     }
267 
268                 });
269         resultFuture.setDependency(leaseFuture);
270         return resultFuture;
271     }
272 
273     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
274         return connect(host, timeout, null, null);
275     }
276 
277     public void execute(
278             final AsyncClientExchangeHandler exchangeHandler,
279             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
280             final Timeout timeout,
281             final HttpContext executeContext) {
282         Args.notNull(exchangeHandler, "Exchange handler");
283         Args.notNull(timeout, "Timeout");
284         Args.notNull(executeContext, "Context");
285         try {
286             exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
287                 final String scheme = request.getScheme();
288                 final URIAuthority authority = request.getAuthority();
289                 if (authority == null) {
290                     throw new ProtocolException("Request authority not specified");
291                 }
292                 final HttpHost target = new HttpHost(scheme, authority);
293                 connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
294 
295                     @Override
296                     public void completed(final AsyncClientEndpoint endpoint) {
297                         endpoint.execute(new AsyncClientExchangeHandler() {
298 
299                             @Override
300                             public void releaseResources() {
301                                 endpoint.releaseAndDiscard();
302                                 exchangeHandler.releaseResources();
303                             }
304 
305                             @Override
306                             public void failed(final Exception cause) {
307                                 endpoint.releaseAndDiscard();
308                                 exchangeHandler.failed(cause);
309                             }
310 
311                             @Override
312                             public void cancel() {
313                                 endpoint.releaseAndDiscard();
314                                 exchangeHandler.cancel();
315                             }
316 
317                             @Override
318                             public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
319                                 channel.sendRequest(request, entityDetails, httpContext);
320                             }
321 
322                             @Override
323                             public int available() {
324                                 return exchangeHandler.available();
325                             }
326 
327                             @Override
328                             public void produce(final DataStreamChannel channel) throws IOException {
329                                 exchangeHandler.produce(channel);
330                             }
331 
332                             @Override
333                             public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
334                                 exchangeHandler.consumeInformation(response, httpContext);
335                             }
336 
337                             @Override
338                             public void consumeResponse(
339                                     final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
340                                 if (entityDetails == null) {
341                                     endpoint.releaseAndReuse();
342                                 }
343                                 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
344                             }
345 
346                             @Override
347                             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
348                                 exchangeHandler.updateCapacity(capacityChannel);
349                             }
350 
351                             @Override
352                             public void consume(final ByteBuffer src) throws IOException {
353                                 exchangeHandler.consume(src);
354                             }
355 
356                             @Override
357                             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
358                                 endpoint.releaseAndReuse();
359                                 exchangeHandler.streamEnd(trailers);
360                             }
361 
362                         }, pushHandlerFactory, executeContext);
363 
364                     }
365 
366                     @Override
367                     public void failed(final Exception ex) {
368                         exchangeHandler.failed(ex);
369                     }
370 
371                     @Override
372                     public void cancelled() {
373                         exchangeHandler.cancel();
374                     }
375 
376                 });
377 
378             }, executeContext);
379 
380         } catch (final IOException | HttpException ex) {
381             exchangeHandler.failed(ex);
382         }
383     }
384 
385     public void execute(
386             final AsyncClientExchangeHandler exchangeHandler,
387             final Timeout timeout,
388             final HttpContext executeContext) {
389         execute(exchangeHandler, null, timeout, executeContext);
390     }
391 
392     public final <T> Future<T> execute(
393             final AsyncRequestProducer requestProducer,
394             final AsyncResponseConsumer<T> responseConsumer,
395             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
396             final Timeout timeout,
397             final HttpContext context,
398             final FutureCallback<T> callback) {
399         Args.notNull(requestProducer, "Request producer");
400         Args.notNull(responseConsumer, "Response consumer");
401         Args.notNull(timeout, "Timeout");
402         final BasicFuture<T> future = new BasicFuture<>(callback);
403         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
404                 requestProducer,
405                 responseConsumer,
406                 new FutureContribution<T>(future) {
407 
408                     @Override
409                     public void completed(final T result) {
410                         future.completed(result);
411                     }
412 
413                 });
414         execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
415         return future;
416     }
417 
418     public final <T> Future<T> execute(
419             final AsyncRequestProducer requestProducer,
420             final AsyncResponseConsumer<T> responseConsumer,
421             final Timeout timeout,
422             final HttpContext context,
423             final FutureCallback<T> callback) {
424         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
425     }
426 
427     public final <T> Future<T> execute(
428             final AsyncRequestProducer requestProducer,
429             final AsyncResponseConsumer<T> responseConsumer,
430             final Timeout timeout,
431             final FutureCallback<T> callback) {
432         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
433     }
434 
435     protected void doTlsUpgrade(
436             final ProtocolIOSession ioSession,
437             final NamedEndpoint endpoint,
438             final FutureCallback<ProtocolIOSession> callback) {
439         if (tlsStrategy != null) {
440             tlsStrategy.upgrade(ioSession,
441                     endpoint,
442                     null,
443                     handshakeTimeout,
444                     new CallbackContribution<TransportSecurityLayer>(callback) {
445 
446                         @Override
447                         public void completed(final TransportSecurityLayer transportSecurityLayer) {
448                             if (callback != null) {
449                                 callback.completed(ioSession);
450                             }
451                         }
452 
453                     });
454         } else {
455             throw new IllegalStateException("TLS upgrade not supported");
456         }
457     }
458 
459     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint implements TlsUpgradeCapable {
460 
461         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
462 
463         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
464             this.poolEntryRef = new AtomicReference<>(poolEntry);
465         }
466 
467         private IOSession getIOSession() {
468             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
469             if (poolEntry == null) {
470                 throw new IllegalStateException("Endpoint has already been released");
471             }
472             final IOSession ioSession = poolEntry.getConnection();
473             if (ioSession == null) {
474                 throw new IllegalStateException("I/O session is invalid");
475             }
476             return ioSession;
477         }
478 
479         @Override
480         public void execute(
481                 final AsyncClientExchangeHandler exchangeHandler,
482                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
483                 final HttpContext context) {
484             final IOSession ioSession = getIOSession();
485             ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
486             if (!ioSession.isOpen()) {
487                 try {
488                     exchangeHandler.failed(new ConnectionClosedException());
489                 } finally {
490                     exchangeHandler.releaseResources();
491                 }
492             }
493         }
494 
495         @Override
496         public boolean isConnected() {
497             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
498             if (poolEntry != null) {
499                 final IOSession ioSession = poolEntry.getConnection();
500                 if (ioSession == null || !ioSession.isOpen()) {
501                     return false;
502                 }
503                 final IOEventHandler handler = ioSession.getHandler();
504                 return (handler instanceof HttpConnection) && ((HttpConnection) handler).isOpen();
505             }
506             return false;
507         }
508 
509         @Override
510         public void releaseAndReuse() {
511             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
512             if (poolEntry != null) {
513                 final IOSession ioSession = poolEntry.getConnection();
514                 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
515             }
516         }
517 
518         @Override
519         public void releaseAndDiscard() {
520             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
521             if (poolEntry != null) {
522                 poolEntry.discardConnection(CloseMode.GRACEFUL);
523                 connPool.release(poolEntry, false);
524             }
525         }
526 
527         @Override
528         public void tlsUpgrade(final NamedEndpoint endpoint, final FutureCallback<ProtocolIOSession> callback) {
529             final IOSession ioSession = getIOSession();
530             if (ioSession instanceof ProtocolIOSession) {
531                 doTlsUpgrade((ProtocolIOSession) ioSession, endpoint, callback);
532             } else {
533                 throw new IllegalStateException("TLS upgrade not supported");
534             }
535         }
536     }
537 
538 }