1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.UserTokenHandler;
39 import org.apache.hc.client5.http.async.AsyncExecCallback;
40 import org.apache.hc.client5.http.async.AsyncExecChain;
41 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
42 import org.apache.hc.client5.http.async.AsyncExecRuntime;
43 import org.apache.hc.client5.http.impl.ProtocolSwitchStrategy;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.Internal;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.CancellableDependency;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.http.EntityDetails;
51 import org.apache.hc.core5.http.Header;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpRequest;
54 import org.apache.hc.core5.http.HttpResponse;
55 import org.apache.hc.core5.http.HttpStatus;
56 import org.apache.hc.core5.http.ProtocolException;
57 import org.apache.hc.core5.http.ProtocolVersion;
58 import org.apache.hc.core5.http.message.RequestLine;
59 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
60 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
61 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
62 import org.apache.hc.core5.http.nio.CapacityChannel;
63 import org.apache.hc.core5.http.nio.DataStreamChannel;
64 import org.apache.hc.core5.http.nio.RequestChannel;
65 import org.apache.hc.core5.http.protocol.HttpContext;
66 import org.apache.hc.core5.http.protocol.HttpProcessor;
67 import org.apache.hc.core5.util.Args;
68 import org.apache.hc.core5.util.TimeValue;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72
73
74
75
76
77
78
79 @Contract(threading = ThreadingBehavior.STATELESS)
80 @Internal
81 class HttpAsyncMainClientExec implements AsyncExecChainHandler {
82
83 private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
84
85 private final HttpProcessor httpProcessor;
86 private final ConnectionKeepAliveStrategy keepAliveStrategy;
87 private final UserTokenHandler userTokenHandler;
88 private final ProtocolSwitchStrategy protocolSwitchStrategy;
89
90 HttpAsyncMainClientExec(final HttpProcessor httpProcessor,
91 final ConnectionKeepAliveStrategy keepAliveStrategy,
92 final UserTokenHandler userTokenHandler) {
93 this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
94 this.keepAliveStrategy = keepAliveStrategy;
95 this.userTokenHandler = userTokenHandler;
96 this.protocolSwitchStrategy = new ProtocolSwitchStrategy();
97 }
98
99 @Override
100 public void execute(
101 final HttpRequest request,
102 final AsyncEntityProducer entityProducer,
103 final AsyncExecChain.Scope scope,
104 final AsyncExecChain chain,
105 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
106 final String exchangeId = scope.exchangeId;
107 final HttpRoute route = scope.route;
108 final CancellableDependency operation = scope.cancellableDependency;
109 final HttpClientContext clientContext = scope.clientContext;
110 final AsyncExecRuntime execRuntime = scope.execRuntime;
111
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
114 }
115
116 final AtomicInteger messageCountDown = new AtomicInteger(2);
117 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
118
119 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
120
121 @Override
122 public void releaseResources() {
123 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
124 if (entityConsumer != null) {
125 entityConsumer.releaseResources();
126 }
127 }
128
129 @Override
130 public void failed(final Exception cause) {
131 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
132 if (entityConsumer != null) {
133 entityConsumer.releaseResources();
134 }
135 execRuntime.markConnectionNonReusable();
136 asyncExecCallback.failed(cause);
137 }
138
139 @Override
140 public void cancel() {
141 if (messageCountDown.get() > 0) {
142 failed(new InterruptedIOException());
143 }
144 }
145
146 @Override
147 public void produceRequest(
148 final RequestChannel channel,
149 final HttpContext context) throws HttpException, IOException {
150
151 clientContext.setRoute(route);
152 clientContext.setRequest(request);
153 httpProcessor.process(request, entityProducer, clientContext);
154
155 channel.sendRequest(request, entityProducer, context);
156 if (entityProducer == null) {
157 messageCountDown.decrementAndGet();
158 }
159 }
160
161 @Override
162 public int available() {
163 return entityProducer.available();
164 }
165
166 @Override
167 public void produce(final DataStreamChannel channel) throws IOException {
168 entityProducer.produce(new DataStreamChannel() {
169
170 @Override
171 public void requestOutput() {
172 channel.requestOutput();
173 }
174
175 @Override
176 public int write(final ByteBuffer src) throws IOException {
177 return channel.write(src);
178 }
179
180 @Override
181 public void endStream(final List<? extends Header> trailers) throws IOException {
182 channel.endStream(trailers);
183 if (messageCountDown.decrementAndGet() <= 0) {
184 asyncExecCallback.completed();
185 }
186 }
187
188 @Override
189 public void endStream() throws IOException {
190 channel.endStream();
191 if (messageCountDown.decrementAndGet() <= 0) {
192 asyncExecCallback.completed();
193 }
194 }
195
196 });
197 }
198
199 @Override
200 public void consumeInformation(
201 final HttpResponse response,
202 final HttpContext context) throws HttpException, IOException {
203 if (response.getCode() == HttpStatus.SC_SWITCHING_PROTOCOLS) {
204 final ProtocolVersion upgradeProtocol = protocolSwitchStrategy.switchProtocol(response);
205 if (upgradeProtocol == null || !upgradeProtocol.getProtocol().equals("TLS")) {
206 throw new ProtocolException("Failure switching protocols");
207 }
208 if (LOG.isDebugEnabled()) {
209 LOG.debug("Switching to {}", upgradeProtocol);
210 }
211 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
212
213 @Override
214 public void completed(final AsyncExecRuntime result) {
215 LOG.debug("Successfully switched to {}", upgradeProtocol);
216 }
217
218 @Override
219 public void failed(final Exception ex) {
220 asyncExecCallback.failed(ex);
221 }
222
223 @Override
224 public void cancelled() {
225 asyncExecCallback.failed(new InterruptedIOException());
226 }
227
228 });
229 } else {
230 asyncExecCallback.handleInformationResponse(response);
231 }
232 }
233
234 @Override
235 public void consumeResponse(
236 final HttpResponse response,
237 final EntityDetails entityDetails,
238 final HttpContext context) throws HttpException, IOException {
239
240 clientContext.setResponse(response);
241 httpProcessor.process(response, entityDetails, clientContext);
242
243 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
244 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
245 messageCountDown.decrementAndGet();
246 }
247 final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
248 Object userToken = clientContext.getUserToken();
249 if (userToken == null) {
250 userToken = userTokenHandler.getUserToken(route, request, clientContext);
251 clientContext.setUserToken(userToken);
252 }
253 execRuntime.markConnectionReusable(userToken, keepAliveDuration);
254 if (entityDetails == null) {
255 execRuntime.validateConnection();
256 if (messageCountDown.decrementAndGet() <= 0) {
257 asyncExecCallback.completed();
258 }
259 }
260 }
261
262 @Override
263 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
264 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
265 if (entityConsumer != null) {
266 entityConsumer.updateCapacity(capacityChannel);
267 } else {
268 capacityChannel.update(Integer.MAX_VALUE);
269 }
270 }
271
272 @Override
273 public void consume(final ByteBuffer src) throws IOException {
274 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
275 if (entityConsumer != null) {
276 entityConsumer.consume(src);
277 }
278 }
279
280 @Override
281 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
282 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
283 if (entityConsumer != null) {
284 entityConsumer.streamEnd(trailers);
285 } else {
286 execRuntime.validateConnection();
287 }
288 if (messageCountDown.decrementAndGet() <= 0) {
289 asyncExecCallback.completed();
290 }
291 }
292
293 };
294
295 if (LOG.isDebugEnabled()) {
296 operation.setDependency(execRuntime.execute(
297 exchangeId,
298 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
299 clientContext));
300 } else {
301 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
302 }
303 }
304
305 }