1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import org.apache.hc.client5.http.DnsResolver;
37 import org.apache.hc.client5.http.config.Configurable;
38 import org.apache.hc.client5.http.config.ConnectionConfig;
39 import org.apache.hc.client5.http.config.RequestConfig;
40 import org.apache.hc.client5.http.impl.ConnPoolSupport;
41 import org.apache.hc.client5.http.impl.ExecSupport;
42 import org.apache.hc.client5.http.impl.classic.RequestFailedException;
43 import org.apache.hc.client5.http.impl.nio.MultihomeConnectionInitiator;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.Cancellable;
48 import org.apache.hc.core5.concurrent.ComplexCancellable;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Resolver;
51 import org.apache.hc.core5.http.EntityDetails;
52 import org.apache.hc.core5.http.Header;
53 import org.apache.hc.core5.http.HttpException;
54 import org.apache.hc.core5.http.HttpHost;
55 import org.apache.hc.core5.http.HttpResponse;
56 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
58 import org.apache.hc.core5.http.nio.CapacityChannel;
59 import org.apache.hc.core5.http.nio.DataStreamChannel;
60 import org.apache.hc.core5.http.nio.HandlerFactory;
61 import org.apache.hc.core5.http.nio.RequestChannel;
62 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
65 import org.apache.hc.core5.http.protocol.HttpContext;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.reactor.Command;
68 import org.apache.hc.core5.reactor.ConnectionInitiator;
69 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
70 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
71 import org.apache.hc.core5.reactor.IOReactorConfig;
72 import org.apache.hc.core5.reactor.IOSession;
73 import org.apache.hc.core5.util.Timeout;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
91 public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBase {
92
93 private static final Logger LOG = LoggerFactory.getLogger(MinimalH2AsyncClient.class);
94 private final InternalH2ConnPool connPool;
95 private final ConnectionInitiator connectionInitiator;
96
97 MinimalH2AsyncClient(
98 final IOEventHandlerFactory eventHandlerFactory,
99 final AsyncPushConsumerRegistry pushConsumerRegistry,
100 final IOReactorConfig reactorConfig,
101 final ThreadFactory threadFactory,
102 final ThreadFactory workerThreadFactory,
103 final DnsResolver dnsResolver,
104 final TlsStrategy tlsStrategy) {
105 super(new DefaultConnectingIOReactor(
106 eventHandlerFactory,
107 reactorConfig,
108 workerThreadFactory,
109 LoggingIOSessionDecorator.INSTANCE,
110 LoggingExceptionCallback.INSTANCE,
111 null,
112 ioSession -> ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE)),
113 pushConsumerRegistry,
114 threadFactory);
115 this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver);
116 this.connPool = new InternalH2ConnPool(this.connectionInitiator, object -> null, tlsStrategy);
117 }
118
119 @Override
120 public Cancellable execute(
121 final AsyncClientExchangeHandler exchangeHandler,
122 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
123 final HttpContext context) {
124 final ComplexCancellable cancellable = new ComplexCancellable();
125 try {
126 if (!isRunning()) {
127 throw new CancellationException("Request execution cancelled");
128 }
129 final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
130 exchangeHandler.produceRequest((request, entityDetails, context1) -> {
131 RequestConfig requestConfig = null;
132 if (request instanceof Configurable) {
133 requestConfig = ((Configurable) request).getConfig();
134 }
135 if (requestConfig != null) {
136 clientContext.setRequestConfig(requestConfig);
137 } else {
138 requestConfig = clientContext.getRequestConfig();
139 }
140 @SuppressWarnings("deprecation")
141 final Timeout connectTimeout = requestConfig.getConnectTimeout();
142 final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
143
144 final Future<IOSession> sessionFuture = connPool.getSession(target, connectTimeout,
145 new FutureCallback<IOSession>() {
146
147 @Override
148 public void completed(final IOSession session) {
149 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
150
151 @Override
152 public void releaseResources() {
153 exchangeHandler.releaseResources();
154 }
155
156 @Override
157 public void failed(final Exception cause) {
158 exchangeHandler.failed(cause);
159 }
160
161 @Override
162 public void cancel() {
163 failed(new RequestFailedException("Request aborted"));
164 }
165
166 @Override
167 public void produceRequest(
168 final RequestChannel channel,
169 final HttpContext context1) throws HttpException, IOException {
170 channel.sendRequest(request, entityDetails, context1);
171 }
172
173 @Override
174 public int available() {
175 return exchangeHandler.available();
176 }
177
178 @Override
179 public void produce(final DataStreamChannel channel) throws IOException {
180 exchangeHandler.produce(channel);
181 }
182
183 @Override
184 public void consumeInformation(
185 final HttpResponse response,
186 final HttpContext context1) throws HttpException, IOException {
187 exchangeHandler.consumeInformation(response, context1);
188 }
189
190 @Override
191 public void consumeResponse(
192 final HttpResponse response,
193 final EntityDetails entityDetails,
194 final HttpContext context1) throws HttpException, IOException {
195 exchangeHandler.consumeResponse(response, entityDetails, context1);
196 }
197
198 @Override
199 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
200 exchangeHandler.updateCapacity(capacityChannel);
201 }
202
203 @Override
204 public void consume(final ByteBuffer src) throws IOException {
205 exchangeHandler.consume(src);
206 }
207
208 @Override
209 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
210 exchangeHandler.streamEnd(trailers);
211 }
212
213 };
214 if (LOG.isDebugEnabled()) {
215 final String exchangeId = ExecSupport.getNextExchangeId();
216 clientContext.setExchangeId(exchangeId);
217 if (LOG.isDebugEnabled()) {
218 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
219 }
220 session.enqueue(
221 new RequestExecutionCommand(
222 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
223 pushHandlerFactory,
224 cancellable,
225 clientContext),
226 Command.Priority.NORMAL);
227 } else {
228 session.enqueue(
229 new RequestExecutionCommand(
230 internalExchangeHandler,
231 pushHandlerFactory,
232 cancellable,
233 clientContext),
234 Command.Priority.NORMAL);
235 }
236 }
237
238 @Override
239 public void failed(final Exception ex) {
240 exchangeHandler.failed(ex);
241 }
242
243 @Override
244 public void cancelled() {
245 exchangeHandler.cancel();
246 }
247
248 });
249 cancellable.setDependency(() -> sessionFuture.cancel(true));
250 }, context);
251 } catch (final HttpException | IOException | IllegalStateException ex) {
252 exchangeHandler.failed(ex);
253 }
254 return cancellable;
255 }
256
257
258
259
260
261
262 public void setConnectionConfigResolver(final Resolver<HttpHost, ConnectionConfig> connectionConfigResolver) {
263 connPool.setConnectionConfigResolver(connectionConfigResolver);
264 }
265
266 }