1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.HttpRoute;
34 import org.apache.hc.client5.http.async.AsyncExecRuntime;
35 import org.apache.hc.client5.http.config.RequestConfig;
36 import org.apache.hc.client5.http.impl.ConnPoolSupport;
37 import org.apache.hc.client5.http.impl.Operations;
38 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
39 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
40 import org.apache.hc.client5.http.protocol.HttpClientContext;
41 import org.apache.hc.core5.concurrent.Cancellable;
42 import org.apache.hc.core5.concurrent.FutureCallback;
43 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
44 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45 import org.apache.hc.core5.http.nio.HandlerFactory;
46 import org.apache.hc.core5.http2.HttpVersionPolicy;
47 import org.apache.hc.core5.io.CloseMode;
48 import org.apache.hc.core5.reactor.ConnectionInitiator;
49 import org.apache.hc.core5.util.TimeValue;
50 import org.apache.hc.core5.util.Timeout;
51 import org.slf4j.Logger;
52
53 class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
54
55 private final Logger log;
56 private final AsyncClientConnectionManager manager;
57 private final ConnectionInitiator connectionInitiator;
58 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
59 private final HttpVersionPolicy versionPolicy;
60 private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
61 private volatile boolean reusable;
62 private volatile Object state;
63 private volatile TimeValue validDuration;
64
65 InternalHttpAsyncExecRuntime(
66 final Logger log,
67 final AsyncClientConnectionManager manager,
68 final ConnectionInitiator connectionInitiator,
69 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
70 final HttpVersionPolicy versionPolicy) {
71 super();
72 this.log = log;
73 this.manager = manager;
74 this.connectionInitiator = connectionInitiator;
75 this.pushHandlerFactory = pushHandlerFactory;
76 this.versionPolicy = versionPolicy;
77 this.endpointRef = new AtomicReference<>(null);
78 this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
79 }
80
81 @Override
82 public boolean isEndpointAcquired() {
83 return endpointRef.get() != null;
84 }
85
86 @Override
87 public Cancellable acquireEndpoint(
88 final String id,
89 final HttpRoute route,
90 final Object object,
91 final HttpClientContext context,
92 final FutureCallback<AsyncExecRuntime> callback) {
93 if (endpointRef.get() == null) {
94 state = object;
95 final RequestConfig requestConfig = context.getRequestConfig();
96 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
97 if (log.isDebugEnabled()) {
98 log.debug("{}: acquiring endpoint ({})", id, connectionRequestTimeout);
99 }
100 return Operations.cancellable(manager.lease(
101 id,
102 route,
103 object,
104 connectionRequestTimeout,
105 new FutureCallback<AsyncConnectionEndpoint>() {
106
107 @Override
108 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
109 endpointRef.set(connectionEndpoint);
110 reusable = connectionEndpoint.isConnected();
111 if (log.isDebugEnabled()) {
112 log.debug("{}: acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
113 }
114 callback.completed(InternalHttpAsyncExecRuntime.this);
115 }
116
117 @Override
118 public void failed(final Exception ex) {
119 callback.failed(ex);
120 }
121
122 @Override
123 public void cancelled() {
124 callback.cancelled();
125 }
126 }));
127 }
128 callback.completed(this);
129 return Operations.nonCancellable();
130 }
131
132 private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
133 try {
134 endpoint.close(CloseMode.IMMEDIATE);
135 if (log.isDebugEnabled()) {
136 log.debug("{}: endpoint closed", ConnPoolSupport.getId(endpoint));
137 }
138 } finally {
139 if (log.isDebugEnabled()) {
140 log.debug("{}: discarding endpoint", ConnPoolSupport.getId(endpoint));
141 }
142 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
143 }
144 }
145
146 @Override
147 public void releaseEndpoint() {
148 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
149 if (endpoint != null) {
150 if (reusable) {
151 if (log.isDebugEnabled()) {
152 log.debug("{}: releasing valid endpoint", ConnPoolSupport.getId(endpoint));
153 }
154 manager.release(endpoint, state, validDuration);
155 } else {
156 discardEndpoint(endpoint);
157 }
158 }
159 }
160
161 @Override
162 public void discardEndpoint() {
163 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
164 if (endpoint != null) {
165 discardEndpoint(endpoint);
166 }
167 }
168
169 @Override
170 public boolean validateConnection() {
171 if (reusable) {
172 final AsyncConnectionEndpoint endpoint = endpointRef.get();
173 return endpoint != null && endpoint.isConnected();
174 }
175 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
176 if (endpoint != null) {
177 discardEndpoint(endpoint);
178 }
179 return false;
180 }
181
182 AsyncConnectionEndpoint ensureValid() {
183 final AsyncConnectionEndpoint endpoint = endpointRef.get();
184 if (endpoint == null) {
185 throw new IllegalStateException("Endpoint not acquired / already released");
186 }
187 return endpoint;
188 }
189
190 @Override
191 public boolean isEndpointConnected() {
192 final AsyncConnectionEndpoint endpoint = endpointRef.get();
193 return endpoint != null && endpoint.isConnected();
194 }
195
196 @Override
197 public Cancellable connectEndpoint(
198 final HttpClientContext context,
199 final FutureCallback<AsyncExecRuntime> callback) {
200 final AsyncConnectionEndpoint endpoint = ensureValid();
201 if (endpoint.isConnected()) {
202 callback.completed(this);
203 return Operations.nonCancellable();
204 }
205 final RequestConfig requestConfig = context.getRequestConfig();
206 final Timeout connectTimeout = requestConfig.getConnectTimeout();
207 if (log.isDebugEnabled()) {
208 log.debug("{}: connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
209 }
210 return Operations.cancellable(manager.connect(
211 endpoint,
212 connectionInitiator,
213 connectTimeout,
214 versionPolicy,
215 context,
216 new FutureCallback<AsyncConnectionEndpoint>() {
217
218 @Override
219 public void completed(final AsyncConnectionEndpoint endpoint) {
220 if (log.isDebugEnabled()) {
221 log.debug("{}: endpoint connected", ConnPoolSupport.getId(endpoint));
222 }
223 callback.completed(InternalHttpAsyncExecRuntime.this);
224 }
225
226 @Override
227 public void failed(final Exception ex) {
228 callback.failed(ex);
229 }
230
231 @Override
232 public void cancelled() {
233 callback.cancelled();
234 }
235
236 }));
237
238 }
239
240 @Override
241 public void upgradeTls(final HttpClientContext context) {
242 final AsyncConnectionEndpoint endpoint = ensureValid();
243 if (log.isDebugEnabled()) {
244 log.debug("{}: upgrading endpoint", ConnPoolSupport.getId(endpoint));
245 }
246 manager.upgrade(endpoint, versionPolicy, context);
247 }
248
249 @Override
250 public Cancellable execute(
251 final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
252 final AsyncConnectionEndpoint endpoint = ensureValid();
253 if (endpoint.isConnected()) {
254 if (log.isDebugEnabled()) {
255 log.debug("{}: start execution {}", ConnPoolSupport.getId(endpoint), id);
256 }
257 final RequestConfig requestConfig = context.getRequestConfig();
258 final Timeout responseTimeout = requestConfig.getResponseTimeout();
259 if (responseTimeout != null) {
260 endpoint.setSocketTimeout(responseTimeout);
261 }
262 endpoint.execute(id, exchangeHandler, context);
263 if (context.getRequestConfig().isHardCancellationEnabled()) {
264 return new Cancellable() {
265 @Override
266 public boolean cancel() {
267 exchangeHandler.cancel();
268 return true;
269 }
270 };
271 }
272 } else {
273 connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
274
275 @Override
276 public void completed(final AsyncExecRuntime runtime) {
277 if (log.isDebugEnabled()) {
278 log.debug("{}: start execution {}", ConnPoolSupport.getId(endpoint), id);
279 }
280 try {
281 endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
282 } catch (final RuntimeException ex) {
283 failed(ex);
284 }
285 }
286
287 @Override
288 public void failed(final Exception ex) {
289 exchangeHandler.failed(ex);
290 }
291
292 @Override
293 public void cancelled() {
294 exchangeHandler.failed(new InterruptedIOException());
295 }
296
297 });
298 }
299 return Operations.nonCancellable();
300 }
301
302 @Override
303 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
304 reusable = true;
305 state = newState;
306 validDuration = newValidDuration;
307 }
308
309 @Override
310 public void markConnectionNonReusable() {
311 reusable = false;
312 state = null;
313 validDuration = null;
314 }
315
316 @Override
317 public AsyncExecRuntime fork() {
318 return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, versionPolicy);
319 }
320
321 }