1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.UserTokenHandler;
39 import org.apache.hc.client5.http.async.AsyncExecCallback;
40 import org.apache.hc.client5.http.async.AsyncExecChain;
41 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
42 import org.apache.hc.client5.http.async.AsyncExecRuntime;
43 import org.apache.hc.client5.http.protocol.HttpClientContext;
44 import org.apache.hc.core5.annotation.Contract;
45 import org.apache.hc.core5.annotation.Internal;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.CancellableDependency;
48 import org.apache.hc.core5.http.EntityDetails;
49 import org.apache.hc.core5.http.Header;
50 import org.apache.hc.core5.http.HttpException;
51 import org.apache.hc.core5.http.HttpRequest;
52 import org.apache.hc.core5.http.HttpResponse;
53 import org.apache.hc.core5.http.HttpStatus;
54 import org.apache.hc.core5.http.message.RequestLine;
55 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
57 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
58 import org.apache.hc.core5.http.nio.CapacityChannel;
59 import org.apache.hc.core5.http.nio.DataStreamChannel;
60 import org.apache.hc.core5.http.nio.RequestChannel;
61 import org.apache.hc.core5.http.protocol.HttpContext;
62 import org.apache.hc.core5.util.TimeValue;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66
67
68
69
70
71
72
73 @Contract(threading = ThreadingBehavior.STATELESS)
74 @Internal
75 class HttpAsyncMainClientExec implements AsyncExecChainHandler {
76
77 private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
78
79 private final ConnectionKeepAliveStrategy keepAliveStrategy;
80 private final UserTokenHandler userTokenHandler;
81
82 HttpAsyncMainClientExec(final ConnectionKeepAliveStrategy keepAliveStrategy, final UserTokenHandler userTokenHandler) {
83 this.keepAliveStrategy = keepAliveStrategy;
84 this.userTokenHandler = userTokenHandler;
85 }
86
87 @Override
88 public void execute(
89 final HttpRequest request,
90 final AsyncEntityProducer entityProducer,
91 final AsyncExecChain.Scope scope,
92 final AsyncExecChain chain,
93 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
94 final String exchangeId = scope.exchangeId;
95 final HttpRoute route = scope.route;
96 final CancellableDependency operation = scope.cancellableDependency;
97 final HttpClientContext clientContext = scope.clientContext;
98 final AsyncExecRuntime execRuntime = scope.execRuntime;
99
100 if (LOG.isDebugEnabled()) {
101 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
102 }
103
104 final AtomicInteger messageCountDown = new AtomicInteger(2);
105 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
106
107 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
108
109 @Override
110 public void releaseResources() {
111 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
112 if (entityConsumer != null) {
113 entityConsumer.releaseResources();
114 }
115 }
116
117 @Override
118 public void failed(final Exception cause) {
119 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
120 if (entityConsumer != null) {
121 entityConsumer.releaseResources();
122 }
123 execRuntime.markConnectionNonReusable();
124 asyncExecCallback.failed(cause);
125 }
126
127 @Override
128 public void cancel() {
129 if (messageCountDown.get() > 0) {
130 failed(new InterruptedIOException());
131 }
132 }
133
134 @Override
135 public void produceRequest(
136 final RequestChannel channel,
137 final HttpContext context) throws HttpException, IOException {
138 channel.sendRequest(request, entityProducer, context);
139 if (entityProducer == null) {
140 messageCountDown.decrementAndGet();
141 }
142 }
143
144 @Override
145 public int available() {
146 return entityProducer.available();
147 }
148
149 @Override
150 public void produce(final DataStreamChannel channel) throws IOException {
151 entityProducer.produce(new DataStreamChannel() {
152
153 @Override
154 public void requestOutput() {
155 channel.requestOutput();
156 }
157
158 @Override
159 public int write(final ByteBuffer src) throws IOException {
160 return channel.write(src);
161 }
162
163 @Override
164 public void endStream(final List<? extends Header> trailers) throws IOException {
165 channel.endStream(trailers);
166 if (messageCountDown.decrementAndGet() <= 0) {
167 asyncExecCallback.completed();
168 }
169 }
170
171 @Override
172 public void endStream() throws IOException {
173 channel.endStream();
174 if (messageCountDown.decrementAndGet() <= 0) {
175 asyncExecCallback.completed();
176 }
177 }
178
179 });
180 }
181
182 @Override
183 public void consumeInformation(
184 final HttpResponse response,
185 final HttpContext context) throws HttpException, IOException {
186 asyncExecCallback.handleInformationResponse(response);
187 }
188
189 @Override
190 public void consumeResponse(
191 final HttpResponse response,
192 final EntityDetails entityDetails,
193 final HttpContext context) throws HttpException, IOException {
194 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
195 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
196 messageCountDown.decrementAndGet();
197 }
198 final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
199 Object userToken = clientContext.getUserToken();
200 if (userToken == null) {
201 userToken = userTokenHandler.getUserToken(route, clientContext);
202 clientContext.setAttribute(HttpClientContext.USER_TOKEN, userToken);
203 }
204 execRuntime.markConnectionReusable(userToken, keepAliveDuration);
205 if (entityDetails == null) {
206 execRuntime.validateConnection();
207 if (messageCountDown.decrementAndGet() <= 0) {
208 asyncExecCallback.completed();
209 }
210 }
211 }
212
213 @Override
214 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
215 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
216 if (entityConsumer != null) {
217 entityConsumer.updateCapacity(capacityChannel);
218 } else {
219 capacityChannel.update(Integer.MAX_VALUE);
220 }
221 }
222
223 @Override
224 public void consume(final ByteBuffer src) throws IOException {
225 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
226 if (entityConsumer != null) {
227 entityConsumer.consume(src);
228 }
229 }
230
231 @Override
232 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
233 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
234 if (entityConsumer != null) {
235 entityConsumer.streamEnd(trailers);
236 } else {
237 execRuntime.validateConnection();
238 }
239 if (messageCountDown.decrementAndGet() <= 0) {
240 asyncExecCallback.completed();
241 }
242 }
243
244 };
245
246 if (LOG.isDebugEnabled()) {
247 operation.setDependency(execRuntime.execute(
248 exchangeId,
249 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
250 clientContext));
251 } else {
252 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
253 }
254 }
255
256 }