View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.hc.client5.http.DnsResolver;
38  import org.apache.hc.client5.http.config.Configurable;
39  import org.apache.hc.client5.http.config.RequestConfig;
40  import org.apache.hc.client5.http.impl.ConnPoolSupport;
41  import org.apache.hc.client5.http.impl.ExecSupport;
42  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
43  import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
44  import org.apache.hc.client5.http.protocol.HttpClientContext;
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.ThreadingBehavior;
47  import org.apache.hc.core5.concurrent.Cancellable;
48  import org.apache.hc.core5.concurrent.ComplexCancellable;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.function.Callback;
51  import org.apache.hc.core5.function.Resolver;
52  import org.apache.hc.core5.http.EntityDetails;
53  import org.apache.hc.core5.http.Header;
54  import org.apache.hc.core5.http.HttpException;
55  import org.apache.hc.core5.http.HttpHost;
56  import org.apache.hc.core5.http.HttpRequest;
57  import org.apache.hc.core5.http.HttpResponse;
58  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60  import org.apache.hc.core5.http.nio.CapacityChannel;
61  import org.apache.hc.core5.http.nio.DataStreamChannel;
62  import org.apache.hc.core5.http.nio.HandlerFactory;
63  import org.apache.hc.core5.http.nio.RequestChannel;
64  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
67  import org.apache.hc.core5.http.protocol.HttpContext;
68  import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
69  import org.apache.hc.core5.io.CloseMode;
70  import org.apache.hc.core5.reactor.Command;
71  import org.apache.hc.core5.reactor.ConnectionInitiator;
72  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
73  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
74  import org.apache.hc.core5.reactor.IOReactorConfig;
75  import org.apache.hc.core5.reactor.IOSession;
76  import org.apache.hc.core5.util.Timeout;
77  import org.slf4j.Logger;
78  import org.slf4j.LoggerFactory;
79  
80  /**
81   * Minimal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}. This client
82   * is optimized for HTTP/2 multiplexing message transport and does not support advanced
83   * HTTP protocol functionality such as request execution via a proxy, state management,
84   * authentication and request redirects.
85   * <p>
86   * Concurrent message exchanges with the same connection route executed by
87   * this client will get automatically multiplexed over a single physical HTTP/2
88   * connection.
89   * </p>
90   *
91   * @since 5.0
92   */
93  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
94  public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
95  
96      private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
97      private final H2ConnPool connPool;
98      private final ConnectionInitiator connectionInitiator;
99  
100     MinimalH2AsyncClient(
101             final IOEventHandlerFactory eventHandlerFactory,
102             final AsyncPushConsumerRegistry pushConsumerRegistry,
103             final IOReactorConfig reactorConfig,
104             final ThreadFactory threadFactory,
105             final ThreadFactory workerThreadFactory,
106             final DnsResolver dnsResolver,
107             final TlsStrategy tlsStrategy) {
108         super(new DefaultConnectingIOReactor(
109                 eventHandlerFactory,
110                 reactorConfig,
111                 workerThreadFactory,
112                 LoggingIOSessionDecorator.INSTANCE,
113                 LoggingExceptionCallback.INSTANCE,
114                 null,
115                 new Callback<IOSession>() {
116 
117                     @Override
118                     public void execute(final IOSession ioSession) {
119                         ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE);
120                     }
121 
122                 }),
123                 pushConsumerRegistry,
124                 threadFactory);
125         this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
126         this.connPool = new H2ConnPool(this.connectionInitiator, new Resolver<HttpHost, InetSocketAddress>() {
127 
128             @Override
129             public InetSocketAddress resolve(final HttpHost object) {
130                 return null;
131             }
132 
133         }, tlsStrategy);
134     }
135 
136     @Override
137     public Cancellable execute(
138             final AsyncClientExchangeHandler exchangeHandler,
139             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
140             final HttpContext context) {
141         final ComplexCancellable cancellable = new ComplexCancellable();
142         try {
143             if (!isRunning()) {
144                 throw new CancellationException("Request execution cancelled");
145             }
146             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
147             exchangeHandler.produceRequest(new RequestChannel() {
148 
149                 @Override
150                 public void sendRequest(
151                         final HttpRequest request,
152                         final EntityDetails entityDetails,
153                         final HttpContext context) throws HttpException, IOException {
154                     RequestConfig requestConfig = null;
155                     if (request instanceof Configurable) {
156                         requestConfig = ((Configurable) request).getConfig();
157                     }
158                     if (requestConfig != null) {
159                         clientContext.setRequestConfig(requestConfig);
160                     } else {
161                         requestConfig = clientContext.getRequestConfig();
162                     }
163                     final Timeout connectTimeout = requestConfig.getConnectTimeout();
164                     final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
165 
166                     final Future<IOSession> sessionFuture = connPool.getSession(target, connectTimeout,
167                         new FutureCallback<IOSession>() {
168 
169                         @Override
170                         public void completed(final IOSession session) {
171                             final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
172 
173                                 @Override
174                                 public void releaseResources() {
175                                     exchangeHandler.releaseResources();
176                                 }
177 
178                                 @Override
179                                 public void failed(final Exception cause) {
180                                     exchangeHandler.failed(cause);
181                                 }
182 
183                                 @Override
184                                 public void cancel() {
185                                     failed(new RequestFailedException("Request aborted"));
186                                 }
187 
188                                 @Override
189                                 public void produceRequest(
190                                         final RequestChannel channel,
191                                         final HttpContext context) throws HttpException, IOException {
192                                     channel.sendRequest(request, entityDetails, context);
193                                 }
194 
195                                 @Override
196                                 public int available() {
197                                     return exchangeHandler.available();
198                                 }
199 
200                                 @Override
201                                 public void produce(final DataStreamChannel channel) throws IOException {
202                                     exchangeHandler.produce(channel);
203                                 }
204 
205                                 @Override
206                                 public void consumeInformation(
207                                         final HttpResponse response,
208                                         final HttpContext context) throws HttpException, IOException {
209                                     exchangeHandler.consumeInformation(response, context);
210                                 }
211 
212                                 @Override
213                                 public void consumeResponse(
214                                         final HttpResponse response,
215                                         final EntityDetails entityDetails,
216                                         final HttpContext context) throws HttpException, IOException {
217                                     exchangeHandler.consumeResponse(response, entityDetails, context);
218                                 }
219 
220                                 @Override
221                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
222                                     exchangeHandler.updateCapacity(capacityChannel);
223                                 }
224 
225                                 @Override
226                                 public void consume(final ByteBuffer src) throws IOException {
227                                     exchangeHandler.consume(src);
228                                 }
229 
230                                 @Override
231                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
232                                     exchangeHandler.streamEnd(trailers);
233                                 }
234 
235                             };
236                             if (LOG.isDebugEnabled()) {
237                                 final String exchangeId = ExecSupport.getNextExchangeId();
238                                 clientContext.setExchangeId(exchangeId);
239                                 if (LOG.isDebugEnabled()) {
240                                     LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
241                                 }
242                                 session.enqueue(
243                                         new RequestExecutionCommand(
244                                                 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
245                                                 pushHandlerFactory,
246                                                 cancellable,
247                                                 clientContext),
248                                         Command.Priority.NORMAL);
249                             } else {
250                                 session.enqueue(
251                                         new RequestExecutionCommand(
252                                                 internalExchangeHandler,
253                                                 pushHandlerFactory,
254                                                 cancellable,
255                                                 clientContext),
256                                         Command.Priority.NORMAL);
257                             }
258                         }
259 
260                         @Override
261                         public void failed(final Exception ex) {
262                             exchangeHandler.failed(ex);
263                         }
264 
265                         @Override
266                         public void cancelled() {
267                             exchangeHandler.cancel();
268                         }
269 
270                     });
271                     cancellable.setDependency(new Cancellable() {
272 
273                         @Override
274                         public boolean cancel() {
275                             return sessionFuture.cancel(true);
276                         }
277 
278                     });
279                 }
280 
281             }, context);
282         } catch (final HttpException | IOException | IllegalStateException ex) {
283             exchangeHandler.failed(ex);
284         }
285         return cancellable;
286     }
287 
288 }