1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.config.Configurable;
39 import org.apache.hc.client5.http.config.RequestConfig;
40 import org.apache.hc.client5.http.impl.ConnPoolSupport;
41 import org.apache.hc.client5.http.impl.ExecSupport;
42 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
43 import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.Cancellable;
48 import org.apache.hc.core5.concurrent.ComplexCancellable;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.function.Resolver;
52 import org.apache.hc.core5.http.EntityDetails;
53 import org.apache.hc.core5.http.Header;
54 import org.apache.hc.core5.http.HttpException;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpRequest;
57 import org.apache.hc.core5.http.HttpResponse;
58 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
60 import org.apache.hc.core5.http.nio.CapacityChannel;
61 import org.apache.hc.core5.http.nio.DataStreamChannel;
62 import org.apache.hc.core5.http.nio.HandlerFactory;
63 import org.apache.hc.core5.http.nio.RequestChannel;
64 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
66 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
69 import org.apache.hc.core5.io.CloseMode;
70 import org.apache.hc.core5.reactor.Command;
71 import org.apache.hc.core5.reactor.ConnectionInitiator;
72 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
73 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
74 import org.apache.hc.core5.reactor.IOReactorConfig;
75 import org.apache.hc.core5.reactor.IOSession;
76 import org.apache.hc.core5.util.Timeout;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
94 public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
95
96 private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
97 private final H2ConnPool connPool;
98 private final ConnectionInitiator connectionInitiator;
99
100 MinimalH2AsyncClient(
101 final IOEventHandlerFactory eventHandlerFactory,
102 final AsyncPushConsumerRegistry pushConsumerRegistry,
103 final IOReactorConfig reactorConfig,
104 final ThreadFactory threadFactory,
105 final ThreadFactory workerThreadFactory,
106 final DnsResolver dnsResolver,
107 final TlsStrategy tlsStrategy) {
108 super(new DefaultConnectingIOReactor(
109 eventHandlerFactory,
110 reactorConfig,
111 workerThreadFactory,
112 LoggingIOSessionDecorator.INSTANCE,
113 LoggingExceptionCallback.INSTANCE,
114 null,
115 new Callback<IOSession>() {
116
117 @Override
118 public void execute(final IOSession ioSession) {
119 ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE);
120 }
121
122 }),
123 pushConsumerRegistry,
124 threadFactory);
125 this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
126 this.connPool = new H2ConnPool(this.connectionInitiator, new Resolver<HttpHost, InetSocketAddress>() {
127
128 @Override
129 public InetSocketAddress resolve(final HttpHost object) {
130 return null;
131 }
132
133 }, tlsStrategy);
134 }
135
136 @Override
137 public Cancellable execute(
138 final AsyncClientExchangeHandler exchangeHandler,
139 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
140 final HttpContext context) {
141 final ComplexCancellable cancellable = new ComplexCancellable();
142 try {
143 if (!isRunning()) {
144 throw new CancellationException("Request execution cancelled");
145 }
146 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
147 exchangeHandler.produceRequest(new RequestChannel() {
148
149 @Override
150 public void sendRequest(
151 final HttpRequest request,
152 final EntityDetails entityDetails,
153 final HttpContext context) throws HttpException, IOException {
154 RequestConfig requestConfig = null;
155 if (request instanceof Configurable) {
156 requestConfig = ((Configurable) request).getConfig();
157 }
158 if (requestConfig != null) {
159 clientContext.setRequestConfig(requestConfig);
160 } else {
161 requestConfig = clientContext.getRequestConfig();
162 }
163 final Timeout connectTimeout = requestConfig.getConnectTimeout();
164 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
165
166 final Future<IOSession> sessionFuture = connPool.getSession(target, connectTimeout,
167 new FutureCallback<IOSession>() {
168
169 @Override
170 public void completed(final IOSession session) {
171 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
172
173 @Override
174 public void releaseResources() {
175 exchangeHandler.releaseResources();
176 }
177
178 @Override
179 public void failed(final Exception cause) {
180 exchangeHandler.failed(cause);
181 }
182
183 @Override
184 public void cancel() {
185 failed(new RequestFailedException("Request aborted"));
186 }
187
188 @Override
189 public void produceRequest(
190 final RequestChannel channel,
191 final HttpContext context) throws HttpException, IOException {
192 channel.sendRequest(request, entityDetails, context);
193 }
194
195 @Override
196 public int available() {
197 return exchangeHandler.available();
198 }
199
200 @Override
201 public void produce(final DataStreamChannel channel) throws IOException {
202 exchangeHandler.produce(channel);
203 }
204
205 @Override
206 public void consumeInformation(
207 final HttpResponse response,
208 final HttpContext context) throws HttpException, IOException {
209 exchangeHandler.consumeInformation(response, context);
210 }
211
212 @Override
213 public void consumeResponse(
214 final HttpResponse response,
215 final EntityDetails entityDetails,
216 final HttpContext context) throws HttpException, IOException {
217 exchangeHandler.consumeResponse(response, entityDetails, context);
218 }
219
220 @Override
221 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
222 exchangeHandler.updateCapacity(capacityChannel);
223 }
224
225 @Override
226 public void consume(final ByteBuffer src) throws IOException {
227 exchangeHandler.consume(src);
228 }
229
230 @Override
231 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
232 exchangeHandler.streamEnd(trailers);
233 }
234
235 };
236 if (LOG.isDebugEnabled()) {
237 final String exchangeId = ExecSupport.getNextExchangeId();
238 LOG.debug("{}: executing message exchange {}", ConnPoolSupport.getId(session), exchangeId);
239 session.enqueue(
240 new RequestExecutionCommand(
241 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
242 pushHandlerFactory,
243 cancellable,
244 clientContext),
245 Command.Priority.NORMAL);
246 } else {
247 session.enqueue(
248 new RequestExecutionCommand(
249 internalExchangeHandler,
250 pushHandlerFactory,
251 cancellable,
252 clientContext),
253 Command.Priority.NORMAL);
254 }
255 }
256
257 @Override
258 public void failed(final Exception ex) {
259 exchangeHandler.failed(ex);
260 }
261
262 @Override
263 public void cancelled() {
264 exchangeHandler.cancel();
265 }
266
267 });
268 cancellable.setDependency(new Cancellable() {
269
270 @Override
271 public boolean cancel() {
272 return sessionFuture.cancel(true);
273 }
274
275 });
276 }
277
278 }, context);
279 } catch (final HttpException | IOException | IllegalStateException ex) {
280 exchangeHandler.failed(ex);
281 }
282 return cancellable;
283 }
284
285 }