View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http2.impl.nio.bootstrap;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.Set;
35  import java.util.concurrent.Future;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.Cancellable;
39  import org.apache.hc.core5.concurrent.CancellableDependency;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.function.Decorator;
45  import org.apache.hc.core5.function.Resolver;
46  import org.apache.hc.core5.http.ConnectionClosedException;
47  import org.apache.hc.core5.http.EntityDetails;
48  import org.apache.hc.core5.http.Header;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.HttpRequest;
52  import org.apache.hc.core5.http.HttpResponse;
53  import org.apache.hc.core5.http.ProtocolException;
54  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
55  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester;
56  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
58  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
59  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
60  import org.apache.hc.core5.http.nio.CapacityChannel;
61  import org.apache.hc.core5.http.nio.DataStreamChannel;
62  import org.apache.hc.core5.http.nio.HandlerFactory;
63  import org.apache.hc.core5.http.nio.RequestChannel;
64  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
67  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
68  import org.apache.hc.core5.http.protocol.HttpContext;
69  import org.apache.hc.core5.http.protocol.HttpCoreContext;
70  import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
71  import org.apache.hc.core5.net.URIAuthority;
72  import org.apache.hc.core5.reactor.Command;
73  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
74  import org.apache.hc.core5.reactor.IOReactorConfig;
75  import org.apache.hc.core5.reactor.IOSession;
76  import org.apache.hc.core5.reactor.IOSessionListener;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.TimeValue;
79  import org.apache.hc.core5.util.Timeout;
80  
81  /**
82   * HTTP/2 multiplexing client side message exchange initiator.
83   *
84   * @since 5.0
85   */
86  public class H2MultiplexingRequester extends AsyncRequester{
87  
88      private final H2ConnPool connPool;
89  
90      /**
91       * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
92       */
93      @Internal
94      public H2MultiplexingRequester(
95              final IOReactorConfig ioReactorConfig,
96              final IOEventHandlerFactory eventHandlerFactory,
97              final Decorator<IOSession> ioSessionDecorator,
98              final Callback<Exception> exceptionCallback,
99              final IOSessionListener sessionListener,
100             final Resolver<HttpHost, InetSocketAddress> addressResolver,
101             final TlsStrategy tlsStrategy) {
102         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
103                         ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
104         this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
105     }
106 
107     public void closeIdle(final TimeValue idleTime) {
108         connPool.closeIdle(idleTime);
109     }
110 
111     public Set<HttpHost> getRoutes() {
112         return connPool.getRoutes();
113     }
114 
115     public TimeValue getValidateAfterInactivity() {
116         return connPool.getValidateAfterInactivity();
117     }
118 
119     public void setValidateAfterInactivity(final TimeValue timeValue) {
120         connPool.setValidateAfterInactivity(timeValue);
121     }
122 
123     public Cancellable execute(
124             final AsyncClientExchangeHandler exchangeHandler,
125             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
126             final Timeout timeout,
127             final HttpContext context) {
128         Args.notNull(exchangeHandler, "Exchange handler");
129         Args.notNull(timeout, "Timeout");
130         Args.notNull(context, "Context");
131         final CancellableExecution/CancellableExecution.html#CancellableExecution">CancellableExecution cancellableExecution = new CancellableExecution();
132         execute(exchangeHandler, pushHandlerFactory, cancellableExecution, timeout, context);
133         return cancellableExecution;
134     }
135 
136     public Cancellable execute(
137             final AsyncClientExchangeHandler exchangeHandler,
138             final Timeout timeout,
139             final HttpContext context) {
140         return execute(exchangeHandler, null, timeout, context);
141     }
142 
143     private void execute(
144             final AsyncClientExchangeHandler exchangeHandler,
145             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
146             final CancellableDependency cancellableDependency,
147             final Timeout timeout,
148             final HttpContext context) {
149         Args.notNull(exchangeHandler, "Exchange handler");
150         Args.notNull(timeout, "Timeout");
151         Args.notNull(context, "Context");
152         try {
153             exchangeHandler.produceRequest(new RequestChannel() {
154 
155                 @Override
156                 public void sendRequest(
157                         final HttpRequest request,
158                         final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
159                     final String scheme = request.getScheme();
160                     final URIAuthority authority = request.getAuthority();
161                     if (authority == null) {
162                         throw new ProtocolException("Request authority not specified");
163                     }
164                     final HttpHost5/http/HttpHost.html#HttpHost">HttpHost target = new HttpHost(scheme, authority);
165                     connPool.getSession(target, timeout, new FutureCallback<IOSession>() {
166 
167                         @Override
168                         public void completed(final IOSession ioSession) {
169                             ioSession.enqueue(new RequestExecutionCommand(new AsyncClientExchangeHandler() {
170 
171                                 @Override
172                                 public void releaseResources() {
173                                     exchangeHandler.releaseResources();
174                                 }
175 
176                                 @Override
177                                 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
178                                     channel.sendRequest(request, entityDetails, httpContext);
179                                 }
180 
181                                 @Override
182                                 public int available() {
183                                     return exchangeHandler.available();
184                                 }
185 
186                                 @Override
187                                 public void produce(final DataStreamChannel channel) throws IOException {
188                                     exchangeHandler.produce(channel);
189                                 }
190 
191                                 @Override
192                                 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
193                                     exchangeHandler.consumeInformation(response, httpContext);
194                                 }
195 
196                                 @Override
197                                 public void consumeResponse(
198                                         final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
199                                     exchangeHandler.consumeResponse(response, entityDetails, httpContext);
200                                 }
201 
202                                 @Override
203                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
204                                     exchangeHandler.updateCapacity(capacityChannel);
205                                 }
206 
207                                 @Override
208                                 public void consume(final ByteBuffer src) throws IOException {
209                                     exchangeHandler.consume(src);
210                                 }
211 
212                                 @Override
213                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
214                                     exchangeHandler.streamEnd(trailers);
215                                 }
216 
217                                 @Override
218                                 public void cancel() {
219                                     exchangeHandler.cancel();
220                                 }
221 
222                                 @Override
223                                 public void failed(final Exception cause) {
224                                     exchangeHandler.failed(cause);
225                                 }
226 
227                             }, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL);
228                             if (!ioSession.isOpen()) {
229                                 exchangeHandler.failed(new ConnectionClosedException());
230                             }
231                         }
232 
233                         @Override
234                         public void failed(final Exception ex) {
235                             exchangeHandler.failed(ex);
236                         }
237 
238                         @Override
239                         public void cancelled() {
240                             exchangeHandler.cancel();
241                         }
242 
243                     });
244 
245                 }
246 
247             }, context);
248         } catch (final IOException | HttpException ex) {
249             exchangeHandler.failed(ex);
250         }
251     }
252 
253     public final <T> Future<T> execute(
254             final AsyncRequestProducer requestProducer,
255             final AsyncResponseConsumer<T> responseConsumer,
256             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
257             final Timeout timeout,
258             final HttpContext context,
259             final FutureCallback<T> callback) {
260         Args.notNull(requestProducer, "Request producer");
261         Args.notNull(responseConsumer, "Response consumer");
262         Args.notNull(timeout, "Timeout");
263         final ComplexFuture<T> future = new ComplexFuture<>(callback);
264         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
265                 requestProducer,
266                 responseConsumer,
267                 new FutureContribution<T>(future) {
268 
269                     @Override
270                     public void completed(final T result) {
271                         future.completed(result);
272                     }
273 
274                 });
275         execute(exchangeHandler, pushHandlerFactory, future, timeout, context != null ? context : HttpCoreContext.create());
276         return future;
277     }
278 
279     public final <T> Future<T> execute(
280             final AsyncRequestProducer requestProducer,
281             final AsyncResponseConsumer<T> responseConsumer,
282             final Timeout timeout,
283             final HttpContext context,
284             final FutureCallback<T> callback) {
285         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
286     }
287 
288     public final <T> Future<T> execute(
289             final AsyncRequestProducer requestProducer,
290             final AsyncResponseConsumer<T> responseConsumer,
291             final Timeout timeout,
292             final FutureCallback<T> callback) {
293         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
294     }
295 
296 }