1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.EndpointInfo;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.async.AsyncExecRuntime;
36 import org.apache.hc.client5.http.config.RequestConfig;
37 import org.apache.hc.client5.http.config.TlsConfig;
38 import org.apache.hc.client5.http.impl.ConnPoolSupport;
39 import org.apache.hc.client5.http.impl.Operations;
40 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
41 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
42 import org.apache.hc.client5.http.protocol.HttpClientContext;
43 import org.apache.hc.core5.concurrent.CallbackContribution;
44 import org.apache.hc.core5.concurrent.Cancellable;
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
47 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48 import org.apache.hc.core5.http.nio.HandlerFactory;
49 import org.apache.hc.core5.io.CloseMode;
50 import org.apache.hc.core5.reactor.ConnectionInitiator;
51 import org.apache.hc.core5.util.TimeValue;
52 import org.apache.hc.core5.util.Timeout;
53 import org.slf4j.Logger;
54
55 class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
56
57 private final Logger log;
58 private final AsyncClientConnectionManager manager;
59 private final ConnectionInitiator connectionInitiator;
60 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61
62
63
64 @Deprecated
65 private final TlsConfig tlsConfig;
66 private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
67 private volatile boolean reusable;
68 private volatile Object state;
69 private volatile TimeValue validDuration;
70
71 InternalHttpAsyncExecRuntime(
72 final Logger log,
73 final AsyncClientConnectionManager manager,
74 final ConnectionInitiator connectionInitiator,
75 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
76 final TlsConfig tlsConfig) {
77 super();
78 this.log = log;
79 this.manager = manager;
80 this.connectionInitiator = connectionInitiator;
81 this.pushHandlerFactory = pushHandlerFactory;
82 this.tlsConfig = tlsConfig;
83 this.endpointRef = new AtomicReference<>();
84 this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
85 }
86
87 @Override
88 public boolean isEndpointAcquired() {
89 return endpointRef.get() != null;
90 }
91
92 @Override
93 public Cancellable acquireEndpoint(
94 final String id,
95 final HttpRoute route,
96 final Object object,
97 final HttpClientContext context,
98 final FutureCallback<AsyncExecRuntime> callback) {
99 if (endpointRef.get() == null) {
100 state = object;
101 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
102 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
103 if (log.isDebugEnabled()) {
104 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
105 }
106 return Operations.cancellable(manager.lease(
107 id,
108 route,
109 object,
110 connectionRequestTimeout,
111 new FutureCallback<AsyncConnectionEndpoint>() {
112
113 @Override
114 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
115 endpointRef.set(connectionEndpoint);
116 reusable = connectionEndpoint.isConnected();
117 if (log.isDebugEnabled()) {
118 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
119 }
120 callback.completed(InternalHttpAsyncExecRuntime.this);
121 }
122
123 @Override
124 public void failed(final Exception ex) {
125 callback.failed(ex);
126 }
127
128 @Override
129 public void cancelled() {
130 callback.cancelled();
131 }
132 }));
133 }
134 callback.completed(this);
135 return Operations.nonCancellable();
136 }
137
138 private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
139 try {
140 endpoint.close(CloseMode.IMMEDIATE);
141 if (log.isDebugEnabled()) {
142 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
143 }
144 } finally {
145 if (log.isDebugEnabled()) {
146 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
147 }
148 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
149 }
150 }
151
152 @Override
153 public void releaseEndpoint() {
154 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
155 if (endpoint != null) {
156 if (reusable) {
157 if (log.isDebugEnabled()) {
158 log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
159 }
160 manager.release(endpoint, state, validDuration);
161 } else {
162 discardEndpoint(endpoint);
163 }
164 }
165 }
166
167 @Override
168 public void discardEndpoint() {
169 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
170 if (endpoint != null) {
171 discardEndpoint(endpoint);
172 }
173 }
174
175 @Override
176 public boolean validateConnection() {
177 if (reusable) {
178 final AsyncConnectionEndpoint endpoint = endpointRef.get();
179 return endpoint != null && endpoint.isConnected();
180 }
181 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
182 if (endpoint != null) {
183 discardEndpoint(endpoint);
184 }
185 return false;
186 }
187
188 AsyncConnectionEndpoint ensureValid() {
189 final AsyncConnectionEndpoint endpoint = endpointRef.get();
190 if (endpoint == null) {
191 throw new IllegalStateException("Endpoint not acquired / already released");
192 }
193 return endpoint;
194 }
195
196 @Override
197 public boolean isEndpointConnected() {
198 final AsyncConnectionEndpoint endpoint = endpointRef.get();
199 return endpoint != null && endpoint.isConnected();
200 }
201
202 @Override
203 public Cancellable connectEndpoint(
204 final HttpClientContext context,
205 final FutureCallback<AsyncExecRuntime> callback) {
206 final AsyncConnectionEndpoint endpoint = ensureValid();
207 if (endpoint.isConnected()) {
208 callback.completed(this);
209 return Operations.nonCancellable();
210 }
211 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
212 @SuppressWarnings("deprecation")
213 final Timeout connectTimeout = requestConfig.getConnectTimeout();
214 if (log.isDebugEnabled()) {
215 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
216 }
217 return Operations.cancellable(manager.connect(
218 endpoint,
219 connectionInitiator,
220 connectTimeout,
221 tlsConfig,
222 context,
223 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
224
225 @Override
226 public void completed(final AsyncConnectionEndpoint endpoint) {
227 if (log.isDebugEnabled()) {
228 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
229 }
230 if (callback != null) {
231 callback.completed(InternalHttpAsyncExecRuntime.this);
232 }
233 }
234
235 }));
236
237 }
238
239 @Override
240 public void disconnectEndpoint() {
241 final AsyncConnectionEndpoint endpoint = endpointRef.get();
242 if (endpoint != null) {
243 endpoint.close(CloseMode.GRACEFUL);
244 }
245 }
246
247 @Override
248 public void upgradeTls(final HttpClientContext context) {
249 upgradeTls(context, null);
250 }
251
252 @Override
253 public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
254 final AsyncConnectionEndpoint endpoint = ensureValid();
255 if (log.isDebugEnabled()) {
256 log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
257 }
258 manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
259
260 @Override
261 public void completed(final AsyncConnectionEndpoint endpoint) {
262 if (callback != null) {
263 callback.completed(InternalHttpAsyncExecRuntime.this);
264 }
265 }
266
267 });
268 }
269
270 public EndpointInfo getEndpointInfo() {
271 final AsyncConnectionEndpoint endpoint = endpointRef.get();
272 return endpoint != null ? endpoint.getInfo() : null;
273 }
274
275 @Override
276 public Cancellable execute(
277 final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
278 final AsyncConnectionEndpoint endpoint = ensureValid();
279 if (endpoint.isConnected()) {
280 if (log.isDebugEnabled()) {
281 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
282 }
283 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
284 final Timeout responseTimeout = requestConfig.getResponseTimeout();
285 if (responseTimeout != null) {
286 endpoint.setSocketTimeout(responseTimeout);
287 }
288 endpoint.execute(id, exchangeHandler, context);
289 if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
290 return () -> {
291 exchangeHandler.cancel();
292 return true;
293 };
294 }
295 } else {
296 connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
297
298 @Override
299 public void completed(final AsyncExecRuntime runtime) {
300 if (log.isDebugEnabled()) {
301 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
302 }
303 try {
304 endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
305 } catch (final RuntimeException ex) {
306 failed(ex);
307 }
308 }
309
310 @Override
311 public void failed(final Exception ex) {
312 exchangeHandler.failed(ex);
313 }
314
315 @Override
316 public void cancelled() {
317 exchangeHandler.failed(new InterruptedIOException());
318 }
319
320 });
321 }
322 return Operations.nonCancellable();
323 }
324
325 @Override
326 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
327 reusable = true;
328 state = newState;
329 validDuration = newValidDuration;
330 }
331
332 @Override
333 public void markConnectionNonReusable() {
334 reusable = false;
335 state = null;
336 validDuration = null;
337 }
338
339 @Override
340 public AsyncExecRuntime fork() {
341 return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
342 }
343
344 }