1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import org.apache.hc.client5.http.DnsResolver;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.config.Configurable;
39 import org.apache.hc.client5.http.config.ConnectionConfig;
40 import org.apache.hc.client5.http.config.RequestConfig;
41 import org.apache.hc.client5.http.impl.ConnPoolSupport;
42 import org.apache.hc.client5.http.impl.ExecSupport;
43 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
44 import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
45 import org.apache.hc.client5.http.protocol.HttpClientContext;
46 import org.apache.hc.core5.annotation.Contract;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.Cancellable;
49 import org.apache.hc.core5.concurrent.ComplexCancellable;
50 import org.apache.hc.core5.concurrent.FutureCallback;
51 import org.apache.hc.core5.function.Resolver;
52 import org.apache.hc.core5.http.EntityDetails;
53 import org.apache.hc.core5.http.Header;
54 import org.apache.hc.core5.http.HttpException;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpResponse;
57 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
58 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
59 import org.apache.hc.core5.http.nio.CapacityChannel;
60 import org.apache.hc.core5.http.nio.DataStreamChannel;
61 import org.apache.hc.core5.http.nio.HandlerFactory;
62 import org.apache.hc.core5.http.nio.RequestChannel;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
65 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
66 import org.apache.hc.core5.http.protocol.HttpContext;
67 import org.apache.hc.core5.io.CloseMode;
68 import org.apache.hc.core5.reactor.Command;
69 import org.apache.hc.core5.reactor.ConnectionInitiator;
70 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
71 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
72 import org.apache.hc.core5.reactor.IOReactorConfig;
73 import org.apache.hc.core5.reactor.IOSession;
74 import org.apache.hc.core5.util.Args;
75 import org.apache.hc.core5.util.Timeout;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
93 public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
94
95 private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
96 private final InternalH2ConnPool connPool;
97 private final ConnectionInitiator connectionInitiator;
98
99 MinimalH2AsyncClient(
100 final IOEventHandlerFactory eventHandlerFactory,
101 final AsyncPushConsumerRegistry pushConsumerRegistry,
102 final IOReactorConfig reactorConfig,
103 final ThreadFactory threadFactory,
104 final ThreadFactory workerThreadFactory,
105 final DnsResolver dnsResolver,
106 final TlsStrategy tlsStrategy) {
107 super(new DefaultConnectingIOReactor(
108 eventHandlerFactory,
109 reactorConfig,
110 workerThreadFactory,
111 LoggingIOSessionDecorator.INSTANCE,
112 LoggingExceptionCallback.INSTANCE,
113 null,
114 ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE)),
115 pushConsumerRegistry,
116 threadFactory);
117 this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
118 this.connPool = new InternalH2ConnPool(this.connectionInitiator, object -> null, tlsStrategy);
119 }
120
121 @Override
122 public Cancellable execute(
123 final AsyncClientExchangeHandler exchangeHandler,
124 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
125 final HttpContext context) {
126 Args.notNull(exchangeHandler, "Message exchange handler");
127 final ComplexCancellable cancellable = new ComplexCancellable();
128 try {
129 if (!isRunning()) {
130 throw new CancellationException("Request execution cancelled");
131 }
132 final HttpClientContext clientContext = HttpClientContext.castOrCreate(context);
133 exchangeHandler.produceRequest((request, entityDetails, context1) -> {
134 RequestConfig requestConfig = null;
135 if (request instanceof Configurable) {
136 requestConfig = ((Configurable) request).getConfig();
137 }
138 if (requestConfig != null) {
139 clientContext.setRequestConfig(requestConfig);
140 } else {
141 requestConfig = clientContext.getRequestConfigOrDefault();
142 }
143 @SuppressWarnings("deprecation")
144 final Timeout connectTimeout = requestConfig.getConnectTimeout();
145 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
146
147 final Future<IOSession> sessionFuture = connPool.getSession(new HttpRoute(target), connectTimeout,
148 new FutureCallback<IOSession>() {
149
150 @Override
151 public void completed(final IOSession session) {
152 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
153
154 @Override
155 public void releaseResources() {
156 exchangeHandler.releaseResources();
157 }
158
159 @Override
160 public void failed(final Exception cause) {
161 exchangeHandler.failed(cause);
162 }
163
164 @Override
165 public void cancel() {
166 failed(new RequestFailedException("Request aborted"));
167 }
168
169 @Override
170 public void produceRequest(
171 final RequestChannel channel,
172 final HttpContext context1) throws HttpException, IOException {
173 channel.sendRequest(request, entityDetails, context1);
174 }
175
176 @Override
177 public int available() {
178 return exchangeHandler.available();
179 }
180
181 @Override
182 public void produce(final DataStreamChannel channel) throws IOException {
183 exchangeHandler.produce(channel);
184 }
185
186 @Override
187 public void consumeInformation(
188 final HttpResponse response,
189 final HttpContext context1) throws HttpException, IOException {
190 exchangeHandler.consumeInformation(response, context1);
191 }
192
193 @Override
194 public void consumeResponse(
195 final HttpResponse response,
196 final EntityDetails entityDetails,
197 final HttpContext context1) throws HttpException, IOException {
198 exchangeHandler.consumeResponse(response, entityDetails, context1);
199 }
200
201 @Override
202 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
203 exchangeHandler.updateCapacity(capacityChannel);
204 }
205
206 @Override
207 public void consume(final ByteBuffer src) throws IOException {
208 exchangeHandler.consume(src);
209 }
210
211 @Override
212 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
213 exchangeHandler.streamEnd(trailers);
214 }
215
216 };
217 if (LOG.isDebugEnabled()) {
218 final String exchangeId = ExecSupport.getNextExchangeId();
219 clientContext.setExchangeId(exchangeId);
220 if (LOG.isDebugEnabled()) {
221 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
222 }
223 session.enqueue(
224 new RequestExecutionCommand(
225 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
226 pushHandlerFactory,
227 cancellable,
228 clientContext),
229 Command.Priority.NORMAL);
230 } else {
231 session.enqueue(
232 new RequestExecutionCommand(
233 internalExchangeHandler,
234 pushHandlerFactory,
235 cancellable,
236 clientContext),
237 Command.Priority.NORMAL);
238 }
239 }
240
241 @Override
242 public void failed(final Exception ex) {
243 exchangeHandler.failed(ex);
244 }
245
246 @Override
247 public void cancelled() {
248 exchangeHandler.cancel();
249 }
250
251 });
252 cancellable.setDependency(() -> sessionFuture.cancel(true));
253 }, context);
254 } catch (final HttpException | IOException | IllegalStateException ex) {
255 exchangeHandler.failed(ex);
256 }
257 return cancellable;
258 }
259
260
261
262
263
264
265 public void setConnectionConfigResolver(final Resolver<HttpHost, ConnectionConfig> connectionConfigResolver) {
266 connPool.setConnectionConfigResolver(connectionConfigResolver);
267 }
268
269 }