View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.ComplexFuture;
40  import org.apache.hc.core5.concurrent.FutureCallback;
41  import org.apache.hc.core5.concurrent.FutureContribution;
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.function.Decorator;
44  import org.apache.hc.core5.http.ConnectionClosedException;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HttpConnection;
48  import org.apache.hc.core5.http.HttpException;
49  import org.apache.hc.core5.http.HttpHost;
50  import org.apache.hc.core5.http.HttpRequest;
51  import org.apache.hc.core5.http.HttpResponse;
52  import org.apache.hc.core5.http.ProtocolException;
53  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
54  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
55  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
57  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
58  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
59  import org.apache.hc.core5.http.nio.CapacityChannel;
60  import org.apache.hc.core5.http.nio.DataStreamChannel;
61  import org.apache.hc.core5.http.nio.HandlerFactory;
62  import org.apache.hc.core5.http.nio.RequestChannel;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
66  import org.apache.hc.core5.http.protocol.HttpContext;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.io.CloseMode;
69  import org.apache.hc.core5.net.URIAuthority;
70  import org.apache.hc.core5.pool.ConnPoolControl;
71  import org.apache.hc.core5.pool.ManagedConnPool;
72  import org.apache.hc.core5.pool.PoolEntry;
73  import org.apache.hc.core5.pool.PoolStats;
74  import org.apache.hc.core5.reactor.Command;
75  import org.apache.hc.core5.reactor.EndpointParameters;
76  import org.apache.hc.core5.reactor.IOEventHandler;
77  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
78  import org.apache.hc.core5.reactor.IOReactorConfig;
79  import org.apache.hc.core5.reactor.IOSession;
80  import org.apache.hc.core5.reactor.IOSessionListener;
81  import org.apache.hc.core5.util.Args;
82  import org.apache.hc.core5.util.TimeValue;
83  import org.apache.hc.core5.util.Timeout;
84  
85  /**
86   * HTTP/1.1 client side message exchange initiator.
87   *
88   * @since 5.0
89   */
90  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
91  
92      private final ManagedConnPool<HttpHost, IOSession> connPool;
93  
94      /**
95       * Use {@link AsyncRequesterBootstrap} to create instances of this class.
96       */
97      @Internal
98      public HttpAsyncRequester(
99              final IOReactorConfig ioReactorConfig,
100             final IOEventHandlerFactory eventHandlerFactory,
101             final Decorator<IOSession> ioSessionDecorator,
102             final Callback<Exception> exceptionCallback,
103             final IOSessionListener sessionListener,
104             final ManagedConnPool<HttpHost, IOSession> connPool) {
105         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
106                 ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
107         this.connPool = Args.notNull(connPool, "Connection pool");
108     }
109 
110     @Override
111     public PoolStats getTotalStats() {
112         return connPool.getTotalStats();
113     }
114 
115     @Override
116     public PoolStats getStats(final HttpHost route) {
117         return connPool.getStats(route);
118     }
119 
120     @Override
121     public void setMaxTotal(final int max) {
122         connPool.setMaxTotal(max);
123     }
124 
125     @Override
126     public int getMaxTotal() {
127         return connPool.getMaxTotal();
128     }
129 
130     @Override
131     public void setDefaultMaxPerRoute(final int max) {
132         connPool.setDefaultMaxPerRoute(max);
133     }
134 
135     @Override
136     public int getDefaultMaxPerRoute() {
137         return connPool.getDefaultMaxPerRoute();
138     }
139 
140     @Override
141     public void setMaxPerRoute(final HttpHost route, final int max) {
142         connPool.setMaxPerRoute(route, max);
143     }
144 
145     @Override
146     public int getMaxPerRoute(final HttpHost route) {
147         return connPool.getMaxPerRoute(route);
148     }
149 
150     @Override
151     public void closeIdle(final TimeValue idleTime) {
152         connPool.closeIdle(idleTime);
153     }
154 
155     @Override
156     public void closeExpired() {
157         connPool.closeExpired();
158     }
159 
160     @Override
161     public Set<HttpHost> getRoutes() {
162         return connPool.getRoutes();
163     }
164 
165     public Future<AsyncClientEndpoint> connect(
166             final HttpHost host,
167             final Timeout timeout,
168             final Object attachment,
169             final FutureCallback<AsyncClientEndpoint> callback) {
170         return doConnect(host, timeout, attachment, callback);
171     }
172 
173     protected Future<AsyncClientEndpoint> doConnect(
174             final HttpHost host,
175             final Timeout timeout,
176             final Object attachment,
177             final FutureCallback<AsyncClientEndpoint> callback) {
178         Args.notNull(host, "Host");
179         Args.notNull(timeout, "Timeout");
180         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
181         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
182                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
183 
184                     @Override
185                     public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
186                         final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
187                         final IOSession ioSession = poolEntry.getConnection();
188                         if (ioSession != null && !ioSession.isOpen()) {
189                             poolEntry.discardConnection(CloseMode.IMMEDIATE);
190                         }
191                         if (poolEntry.hasConnection()) {
192                             resultFuture.completed(endpoint);
193                         } else {
194                             final Future<IOSession> future = requestSession(
195                                     host,
196                                     timeout,
197                                     new EndpointParameters(host, attachment),
198                                     new FutureCallback<IOSession>() {
199 
200                                         @Override
201                                         public void completed(final IOSession session) {
202                                             session.setSocketTimeout(timeout);
203                                             poolEntry.assignConnection(session);
204                                             resultFuture.completed(endpoint);
205                                         }
206 
207                                         @Override
208                                         public void failed(final Exception cause) {
209                                             try {
210                                                 resultFuture.failed(cause);
211                                             } finally {
212                                                 endpoint.releaseAndDiscard();
213                                             }
214                                         }
215 
216                                         @Override
217                                         public void cancelled() {
218                                             try {
219                                                 resultFuture.cancel();
220                                             } finally {
221                                                 endpoint.releaseAndDiscard();
222                                             }
223                                         }
224 
225                                     });
226                             resultFuture.setDependency(future);
227                         }
228                     }
229 
230                     @Override
231                     public void failed(final Exception ex) {
232                         resultFuture.failed(ex);
233                     }
234 
235                     @Override
236                     public void cancelled() {
237                         resultFuture.cancel();
238                     }
239 
240                 });
241         resultFuture.setDependency(leaseFuture);
242         return resultFuture;
243     }
244 
245     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
246         return connect(host, timeout, null, null);
247     }
248 
249     public void execute(
250             final AsyncClientExchangeHandler exchangeHandler,
251             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
252             final Timeout timeout,
253             final HttpContext executeContext) {
254         Args.notNull(exchangeHandler, "Exchange handler");
255         Args.notNull(timeout, "Timeout");
256         Args.notNull(executeContext, "Context");
257         try {
258             exchangeHandler.produceRequest(new RequestChannel() {
259 
260                 @Override
261                 public void sendRequest(
262                         final HttpRequest request,
263                         final EntityDetails entityDetails, final HttpContext requestContext) throws HttpException, IOException {
264                     final String scheme = request.getScheme();
265                     final URIAuthority authority = request.getAuthority();
266                     if (authority == null) {
267                         throw new ProtocolException("Request authority not specified");
268                     }
269                     final HttpHostttp/HttpHost.html#HttpHost">HttpHost target = new HttpHost(scheme, authority);
270                     connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
271 
272                         @Override
273                         public void completed(final AsyncClientEndpoint endpoint) {
274                             endpoint.execute(new AsyncClientExchangeHandler() {
275 
276                                 @Override
277                                 public void releaseResources() {
278                                     endpoint.releaseAndDiscard();
279                                     exchangeHandler.releaseResources();
280                                 }
281 
282                                 @Override
283                                 public void failed(final Exception cause) {
284                                     endpoint.releaseAndDiscard();
285                                     exchangeHandler.failed(cause);
286                                 }
287 
288                                 @Override
289                                 public void cancel() {
290                                     endpoint.releaseAndDiscard();
291                                     exchangeHandler.cancel();
292                                 }
293 
294                                 @Override
295                                 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
296                                     channel.sendRequest(request, entityDetails, httpContext);
297                                 }
298 
299                                 @Override
300                                 public int available() {
301                                     return exchangeHandler.available();
302                                 }
303 
304                                 @Override
305                                 public void produce(final DataStreamChannel channel) throws IOException {
306                                     exchangeHandler.produce(channel);
307                                 }
308 
309                                 @Override
310                                 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
311                                     exchangeHandler.consumeInformation(response, httpContext);
312                                 }
313 
314                                 @Override
315                                 public void consumeResponse(
316                                         final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
317                                     if (entityDetails == null) {
318                                         endpoint.releaseAndReuse();
319                                     }
320                                     exchangeHandler.consumeResponse(response, entityDetails, httpContext);
321                                 }
322 
323                                 @Override
324                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
325                                     exchangeHandler.updateCapacity(capacityChannel);
326                                 }
327 
328                                 @Override
329                                 public void consume(final ByteBuffer src) throws IOException {
330                                     exchangeHandler.consume(src);
331                                 }
332 
333                                 @Override
334                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
335                                     endpoint.releaseAndReuse();
336                                     exchangeHandler.streamEnd(trailers);
337                                 }
338 
339                             }, pushHandlerFactory, executeContext);
340 
341                         }
342 
343                         @Override
344                         public void failed(final Exception ex) {
345                             exchangeHandler.failed(ex);
346                         }
347 
348                         @Override
349                         public void cancelled() {
350                             exchangeHandler.cancel();
351                         }
352 
353                     });
354 
355                 }
356 
357             }, executeContext);
358 
359         } catch (final IOException | HttpException ex) {
360             exchangeHandler.failed(ex);
361         }
362     }
363 
364     public void execute(
365             final AsyncClientExchangeHandler exchangeHandler,
366             final Timeout timeout,
367             final HttpContext executeContext) {
368         execute(exchangeHandler, null, timeout, executeContext);
369     }
370 
371     public final <T> Future<T> execute(
372             final AsyncRequestProducer requestProducer,
373             final AsyncResponseConsumer<T> responseConsumer,
374             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
375             final Timeout timeout,
376             final HttpContext context,
377             final FutureCallback<T> callback) {
378         Args.notNull(requestProducer, "Request producer");
379         Args.notNull(responseConsumer, "Response consumer");
380         Args.notNull(timeout, "Timeout");
381         final BasicFuture<T> future = new BasicFuture<>(callback);
382         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
383                 requestProducer,
384                 responseConsumer,
385                 new FutureContribution<T>(future) {
386 
387                     @Override
388                     public void completed(final T result) {
389                         future.completed(result);
390                     }
391 
392                 });
393         execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
394         return future;
395     }
396 
397     public final <T> Future<T> execute(
398             final AsyncRequestProducer requestProducer,
399             final AsyncResponseConsumer<T> responseConsumer,
400             final Timeout timeout,
401             final HttpContext context,
402             final FutureCallback<T> callback) {
403         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
404     }
405 
406     public final <T> Future<T> execute(
407             final AsyncRequestProducer requestProducer,
408             final AsyncResponseConsumer<T> responseConsumer,
409             final Timeout timeout,
410             final FutureCallback<T> callback) {
411         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
412     }
413 
414     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
415 
416         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
417 
418         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
419             this.poolEntryRef = new AtomicReference<>(poolEntry);
420         }
421 
422         private IOSession getIOSession() {
423             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
424             if (poolEntry == null) {
425                 throw new IllegalStateException("Endpoint has already been released");
426             }
427             final IOSession ioSession = poolEntry.getConnection();
428             if (ioSession == null) {
429                 throw new IllegalStateException("I/O session is invalid");
430             }
431             return ioSession;
432         }
433 
434         @Override
435         public void execute(
436                 final AsyncClientExchangeHandler exchangeHandler,
437                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
438                 final HttpContext context) {
439             final IOSession ioSession = getIOSession();
440             ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
441             if (!ioSession.isOpen()) {
442                 try {
443                     exchangeHandler.failed(new ConnectionClosedException());
444                 } finally {
445                     exchangeHandler.releaseResources();
446                 }
447             }
448         }
449 
450         @Override
451         public boolean isConnected() {
452             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
453             if (poolEntry != null) {
454                 final IOSession ioSession = poolEntry.getConnection();
455                 if (ioSession == null || !ioSession.isOpen()) {
456                     return false;
457                 }
458                 final IOEventHandler handler = ioSession.getHandler();
459                 return (handler instanceof HttpConnection../../../org/apache/hc/core5/http/HttpConnection.html#HttpConnection">HttpConnection) && ((HttpConnection) handler).isOpen();
460             }
461             return false;
462         }
463 
464         @Override
465         public void releaseAndReuse() {
466             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
467             if (poolEntry != null) {
468                 final IOSession ioSession = poolEntry.getConnection();
469                 connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
470             }
471         }
472 
473         @Override
474         public void releaseAndDiscard() {
475             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
476             if (poolEntry != null) {
477                 poolEntry.discardConnection(CloseMode.GRACEFUL);
478                 connPool.release(poolEntry, false);
479             }
480         }
481 
482     }
483 
484 }