View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  
36  import org.apache.hc.client5.http.DnsResolver;
37  import org.apache.hc.client5.http.config.Configurable;
38  import org.apache.hc.client5.http.config.ConnectionConfig;
39  import org.apache.hc.client5.http.config.RequestConfig;
40  import org.apache.hc.client5.http.impl.ConnPoolSupport;
41  import org.apache.hc.client5.http.impl.ExecSupport;
42  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
43  import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
44  import org.apache.hc.client5.http.protocol.HttpClientContext;
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.ThreadingBehavior;
47  import org.apache.hc.core5.concurrent.Cancellable;
48  import org.apache.hc.core5.concurrent.ComplexCancellable;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.function.Resolver;
51  import org.apache.hc.core5.http.EntityDetails;
52  import org.apache.hc.core5.http.Header;
53  import org.apache.hc.core5.http.HttpException;
54  import org.apache.hc.core5.http.HttpHost;
55  import org.apache.hc.core5.http.HttpResponse;
56  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
58  import org.apache.hc.core5.http.nio.CapacityChannel;
59  import org.apache.hc.core5.http.nio.DataStreamChannel;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.RequestChannel;
62  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
65  import org.apache.hc.core5.http.protocol.HttpContext;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.reactor.Command;
68  import org.apache.hc.core5.reactor.ConnectionInitiator;
69  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
70  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
71  import org.apache.hc.core5.reactor.IOReactorConfig;
72  import org.apache.hc.core5.reactor.IOSession;
73  import org.apache.hc.core5.util.Timeout;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  /**
78   * Minimal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}. This client
79   * is optimized for HTTP/2 multiplexing message transport and does not support advanced
80   * HTTP protocol functionality such as request execution via a proxy, state management,
81   * authentication and request redirects.
82   * <p>
83   * Concurrent message exchanges with the same connection route executed by
84   * this client will get automatically multiplexed over a single physical HTTP/2
85   * connection.
86   * </p>
87   *
88   * @since 5.0
89   */
90  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
91  public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
92  
93      private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
94      private final InternalH2ConnPool connPool;
95      private final ConnectionInitiator connectionInitiator;
96  
97      MinimalH2AsyncClient(
98              final IOEventHandlerFactory eventHandlerFactory,
99              final AsyncPushConsumerRegistry pushConsumerRegistry,
100             final IOReactorConfig reactorConfig,
101             final ThreadFactory threadFactory,
102             final ThreadFactory workerThreadFactory,
103             final DnsResolver dnsResolver,
104             final TlsStrategy tlsStrategy) {
105         super(new DefaultConnectingIOReactor(
106                         eventHandlerFactory,
107                         reactorConfig,
108                         workerThreadFactory,
109                         LoggingIOSessionDecorator.INSTANCE,
110                         LoggingExceptionCallback.INSTANCE,
111                         null,
112                         ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE)),
113                 pushConsumerRegistry,
114                 threadFactory);
115         this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
116         this.connPool = new InternalH2ConnPool(this.connectionInitiator, object -> null, tlsStrategy);
117     }
118 
119     @Override
120     public Cancellable execute(
121             final AsyncClientExchangeHandler exchangeHandler,
122             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
123             final HttpContext context) {
124         final ComplexCancellable cancellable = new ComplexCancellable();
125         try {
126             if (!isRunning()) {
127                 throw new CancellationException("Request execution cancelled");
128             }
129             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
130             exchangeHandler.produceRequest((request, entityDetails, context1) -> {
131                 RequestConfig requestConfig = null;
132                 if (request instanceof Configurable) {
133                     requestConfig = ((Configurable) request).getConfig();
134                 }
135                 if (requestConfig != null) {
136                     clientContext.setRequestConfig(requestConfig);
137                 } else {
138                     requestConfig = clientContext.getRequestConfig();
139                 }
140                 @SuppressWarnings("deprecation")
141                 final Timeout connectTimeout = requestConfig.getConnectTimeout();
142                 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
143 
144                 final Future<IOSession> sessionFuture = connPool.getSession(target, connectTimeout,
145                     new FutureCallback<IOSession>() {
146 
147                     @Override
148                     public void completed(final IOSession session) {
149                         final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
150 
151                             @Override
152                             public void releaseResources() {
153                                 exchangeHandler.releaseResources();
154                             }
155 
156                             @Override
157                             public void failed(final Exception cause) {
158                                 exchangeHandler.failed(cause);
159                             }
160 
161                             @Override
162                             public void cancel() {
163                                 failed(new RequestFailedException("Request aborted"));
164                             }
165 
166                             @Override
167                             public void produceRequest(
168                                     final RequestChannel channel,
169                                     final HttpContext context1) throws HttpException, IOException {
170                                 channel.sendRequest(request, entityDetails, context1);
171                             }
172 
173                             @Override
174                             public int available() {
175                                 return exchangeHandler.available();
176                             }
177 
178                             @Override
179                             public void produce(final DataStreamChannel channel) throws IOException {
180                                 exchangeHandler.produce(channel);
181                             }
182 
183                             @Override
184                             public void consumeInformation(
185                                     final HttpResponse response,
186                                     final HttpContext context1) throws HttpException, IOException {
187                                 exchangeHandler.consumeInformation(response, context1);
188                             }
189 
190                             @Override
191                             public void consumeResponse(
192                                     final HttpResponse response,
193                                     final EntityDetails entityDetails,
194                                     final HttpContext context1) throws HttpException, IOException {
195                                 exchangeHandler.consumeResponse(response, entityDetails, context1);
196                             }
197 
198                             @Override
199                             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
200                                 exchangeHandler.updateCapacity(capacityChannel);
201                             }
202 
203                             @Override
204                             public void consume(final ByteBuffer src) throws IOException {
205                                 exchangeHandler.consume(src);
206                             }
207 
208                             @Override
209                             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
210                                 exchangeHandler.streamEnd(trailers);
211                             }
212 
213                         };
214                         if (LOG.isDebugEnabled()) {
215                             final String exchangeId = ExecSupport.getNextExchangeId();
216                             clientContext.setExchangeId(exchangeId);
217                             if (LOG.isDebugEnabled()) {
218                                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
219                             }
220                             session.enqueue(
221                                     new RequestExecutionCommand(
222                                             new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
223                                             pushHandlerFactory,
224                                             cancellable,
225                                             clientContext),
226                                     Command.Priority.NORMAL);
227                         } else {
228                             session.enqueue(
229                                     new RequestExecutionCommand(
230                                             internalExchangeHandler,
231                                             pushHandlerFactory,
232                                             cancellable,
233                                             clientContext),
234                                     Command.Priority.NORMAL);
235                         }
236                     }
237 
238                     @Override
239                     public void failed(final Exception ex) {
240                         exchangeHandler.failed(ex);
241                     }
242 
243                     @Override
244                     public void cancelled() {
245                         exchangeHandler.cancel();
246                     }
247 
248                 });
249                 cancellable.setDependency(() -> sessionFuture.cancel(true));
250             }, context);
251         } catch (final HttpException | IOException | IllegalStateException ex) {
252             exchangeHandler.failed(ex);
253         }
254         return cancellable;
255     }
256 
257     /**
258      * Sets {@link Resolver} for {@link ConnectionConfig} on a per host basis.
259      *
260      * @since 5.2
261      */
262     public void setConnectionConfigResolver(final Resolver<HttpHost, ConnectionConfig> connectionConfigResolver) {
263         connPool.setConnectionConfigResolver(connectionConfigResolver);
264     }
265 
266 }