View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  
36  import org.apache.hc.client5.http.DnsResolver;
37  import org.apache.hc.client5.http.HttpRoute;
38  import org.apache.hc.client5.http.config.Configurable;
39  import org.apache.hc.client5.http.config.ConnectionConfig;
40  import org.apache.hc.client5.http.config.RequestConfig;
41  import org.apache.hc.client5.http.impl.ConnPoolSupport;
42  import org.apache.hc.client5.http.impl.ExecSupport;
43  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
44  import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
45  import org.apache.hc.client5.http.protocol.HttpClientContext;
46  import org.apache.hc.core5.annotation.Contract;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.Cancellable;
49  import org.apache.hc.core5.concurrent.ComplexCancellable;
50  import org.apache.hc.core5.concurrent.FutureCallback;
51  import org.apache.hc.core5.function.Resolver;
52  import org.apache.hc.core5.http.EntityDetails;
53  import org.apache.hc.core5.http.Header;
54  import org.apache.hc.core5.http.HttpException;
55  import org.apache.hc.core5.http.HttpHost;
56  import org.apache.hc.core5.http.HttpResponse;
57  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
58  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
59  import org.apache.hc.core5.http.nio.CapacityChannel;
60  import org.apache.hc.core5.http.nio.DataStreamChannel;
61  import org.apache.hc.core5.http.nio.HandlerFactory;
62  import org.apache.hc.core5.http.nio.RequestChannel;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
66  import org.apache.hc.core5.http.protocol.HttpContext;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.reactor.Command;
69  import org.apache.hc.core5.reactor.ConnectionInitiator;
70  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
71  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
72  import org.apache.hc.core5.reactor.IOReactorConfig;
73  import org.apache.hc.core5.reactor.IOSession;
74  import org.apache.hc.core5.util.Args;
75  import org.apache.hc.core5.util.Timeout;
76  import org.slf4j.Logger;
77  import org.slf4j.LoggerFactory;
78  
79  /**
80   * Minimal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}. This client
81   * is optimized for HTTP/2 multiplexing message transport and does not support advanced
82   * HTTP protocol functionality such as request execution via a proxy, state management,
83   * authentication and request redirects.
84   * <p>
85   * Concurrent message exchanges with the same connection route executed by
86   * this client will get automatically multiplexed over a single physical HTTP/2
87   * connection.
88   * </p>
89   *
90   * @since 5.0
91   */
92  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
93  public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
94  
95      private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
96      private final InternalH2ConnPool connPool;
97      private final ConnectionInitiator connectionInitiator;
98  
99      MinimalH2AsyncClient(
100             final IOEventHandlerFactory eventHandlerFactory,
101             final AsyncPushConsumerRegistry pushConsumerRegistry,
102             final IOReactorConfig reactorConfig,
103             final ThreadFactory threadFactory,
104             final ThreadFactory workerThreadFactory,
105             final DnsResolver dnsResolver,
106             final TlsStrategy tlsStrategy) {
107         super(new DefaultConnectingIOReactor(
108                         eventHandlerFactory,
109                         reactorConfig,
110                         workerThreadFactory,
111                         LoggingIOSessionDecorator.INSTANCE,
112                         LoggingExceptionCallback.INSTANCE,
113                         null,
114                         ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE)),
115                 pushConsumerRegistry,
116                 threadFactory);
117         this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
118         this.connPool = new InternalH2ConnPool(this.connectionInitiator, object -> null, tlsStrategy);
119     }
120 
121     @Override
122     public Cancellable execute(
123             final AsyncClientExchangeHandler exchangeHandler,
124             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
125             final HttpContext context) {
126         Args.notNull(exchangeHandler, "Message exchange handler");
127         final ComplexCancellable cancellable = new ComplexCancellable();
128         try {
129             if (!isRunning()) {
130                 throw new CancellationException("Request execution cancelled");
131             }
132             final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
133             exchangeHandler.produceRequest((request, entityDetails, context1) -> {
134                 RequestConfig requestConfig = null;
135                 if (request instanceof Configurable) {
136                     requestConfig = ((Configurable) request).getConfig();
137                 }
138                 if (requestConfig != null) {
139                     clientContext.setRequestConfig(requestConfig);
140                 } else {
141                     requestConfig = clientContext.getRequestConfigOrDefault();
142                 }
143                 @SuppressWarnings("deprecation")
144                 final Timeout connectTimeout = requestConfig.getConnectTimeout();
145                 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
146 
147                 final Future<IOSession> sessionFuture = connPool.getSession(new HttpRoute(target), connectTimeout,
148                     new FutureCallback<IOSession>() {
149 
150                     @Override
151                     public void completed(final IOSession session) {
152                         final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
153 
154                             @Override
155                             public void releaseResources() {
156                                 exchangeHandler.releaseResources();
157                             }
158 
159                             @Override
160                             public void failed(final Exception cause) {
161                                 exchangeHandler.failed(cause);
162                             }
163 
164                             @Override
165                             public void cancel() {
166                                 failed(new RequestFailedException("Request aborted"));
167                             }
168 
169                             @Override
170                             public void produceRequest(
171                                     final RequestChannel channel,
172                                     final HttpContext context1) throws HttpException, IOException {
173                                 channel.sendRequest(request, entityDetails, context1);
174                             }
175 
176                             @Override
177                             public int available() {
178                                 return exchangeHandler.available();
179                             }
180 
181                             @Override
182                             public void produce(final DataStreamChannel channel) throws IOException {
183                                 exchangeHandler.produce(channel);
184                             }
185 
186                             @Override
187                             public void consumeInformation(
188                                     final HttpResponse response,
189                                     final HttpContext context1) throws HttpException, IOException {
190                                 exchangeHandler.consumeInformation(response, context1);
191                             }
192 
193                             @Override
194                             public void consumeResponse(
195                                     final HttpResponse response,
196                                     final EntityDetails entityDetails,
197                                     final HttpContext context1) throws HttpException, IOException {
198                                 exchangeHandler.consumeResponse(response, entityDetails, context1);
199                             }
200 
201                             @Override
202                             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
203                                 exchangeHandler.updateCapacity(capacityChannel);
204                             }
205 
206                             @Override
207                             public void consume(final ByteBuffer src) throws IOException {
208                                 exchangeHandler.consume(src);
209                             }
210 
211                             @Override
212                             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
213                                 exchangeHandler.streamEnd(trailers);
214                             }
215 
216                         };
217                         if (LOG.isDebugEnabled()) {
218                             final String exchangeId = ExecSupport.getNextExchangeId();
219                             clientContext.setExchangeId(exchangeId);
220                             if (LOG.isDebugEnabled()) {
221                                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
222                             }
223                             session.enqueue(
224                                     new RequestExecutionCommand(
225                                             new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
226                                             pushHandlerFactory,
227                                             cancellable,
228                                             clientContext),
229                                     Command.Priority.NORMAL);
230                         } else {
231                             session.enqueue(
232                                     new RequestExecutionCommand(
233                                             internalExchangeHandler,
234                                             pushHandlerFactory,
235                                             cancellable,
236                                             clientContext),
237                                     Command.Priority.NORMAL);
238                         }
239                     }
240 
241                     @Override
242                     public void failed(final Exception ex) {
243                         exchangeHandler.failed(ex);
244                     }
245 
246                     @Override
247                     public void cancelled() {
248                         exchangeHandler.cancel();
249                     }
250 
251                 });
252                 cancellable.setDependency(() -> sessionFuture.cancel(true));
253             }, context);
254         } catch (final HttpException | IOException | IllegalStateException ex) {
255             exchangeHandler.failed(ex);
256         }
257         return cancellable;
258     }
259 
260     /**
261      * Sets {@link Resolver} for {@link ConnectionConfig} on a per host basis.
262      *
263      * @since 5.2
264      */
265     public void setConnectionConfigResolver(final Resolver<HttpHost, ConnectionConfig> connectionConfigResolver) {
266         connPool.setConnectionConfigResolver(connectionConfigResolver);
267     }
268 
269 }