1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.HttpRoute;
34 import org.apache.hc.client5.http.async.AsyncExecRuntime;
35 import org.apache.hc.client5.http.config.RequestConfig;
36 import org.apache.hc.client5.http.config.TlsConfig;
37 import org.apache.hc.client5.http.impl.ConnPoolSupport;
38 import org.apache.hc.client5.http.impl.Operations;
39 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
40 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
41 import org.apache.hc.client5.http.protocol.HttpClientContext;
42 import org.apache.hc.core5.concurrent.CallbackContribution;
43 import org.apache.hc.core5.concurrent.Cancellable;
44 import org.apache.hc.core5.concurrent.FutureCallback;
45 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
46 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
47 import org.apache.hc.core5.http.nio.HandlerFactory;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.reactor.ConnectionInitiator;
50 import org.apache.hc.core5.util.TimeValue;
51 import org.apache.hc.core5.util.Timeout;
52 import org.slf4j.Logger;
53
54 class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
55
// Collaborators handed in at construction time; never reassigned.
private final Logger log;
private final AsyncClientConnectionManager manager;
private final ConnectionInitiator connectionInitiator;
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;

/**
 * TLS settings forwarded to the connection manager when connecting or upgrading an endpoint.
 * NOTE(review): flagged deprecated, but neither the reason nor the replacement is stated
 * here - confirm before relying on it.
 */
@Deprecated
private final TlsConfig tlsConfig;

/** Holds the currently leased endpoint; a {@code null} value means nothing is leased. */
private final AtomicReference<AsyncConnectionEndpoint> endpointRef = new AtomicReference<>();

/** Whether the held endpoint may be handed back to the manager for reuse. */
private volatile boolean reusable;

/** State object passed to the manager when a reusable endpoint is released. */
private volatile Object state;

/** Validity duration passed to the manager when a reusable endpoint is released. */
private volatile TimeValue validDuration = TimeValue.NEG_ONE_MILLISECOND;

/**
 * Creates a runtime that initially holds no endpoint.
 *
 * @param log logger used for debug output
 * @param manager connection manager used to lease, connect, upgrade and release endpoints
 * @param connectionInitiator initiator passed to the manager when connecting
 * @param pushHandlerFactory push consumer factory passed to the endpoint on execution
 * @param tlsConfig TLS settings passed to the manager on connect and upgrade
 */
InternalHttpAsyncExecRuntime(
        final Logger log,
        final AsyncClientConnectionManager manager,
        final ConnectionInitiator connectionInitiator,
        final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
        final TlsConfig tlsConfig) {
    this.log = log;
    this.manager = manager;
    this.connectionInitiator = connectionInitiator;
    this.pushHandlerFactory = pushHandlerFactory;
    this.tlsConfig = tlsConfig;
}
85
86 @Override
87 public boolean isEndpointAcquired() {
88 return endpointRef.get() != null;
89 }
90
91 @Override
92 public Cancellable acquireEndpoint(
93 final String id,
94 final HttpRoute route,
95 final Object object,
96 final HttpClientContext context,
97 final FutureCallback<AsyncExecRuntime> callback) {
98 if (endpointRef.get() == null) {
99 state = object;
100 final RequestConfig requestConfig = context.getRequestConfig();
101 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
102 if (log.isDebugEnabled()) {
103 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
104 }
105 return Operations.cancellable(manager.lease(
106 id,
107 route,
108 object,
109 connectionRequestTimeout,
110 new FutureCallback<AsyncConnectionEndpoint>() {
111
112 @Override
113 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
114 endpointRef.set(connectionEndpoint);
115 reusable = connectionEndpoint.isConnected();
116 if (log.isDebugEnabled()) {
117 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
118 }
119 callback.completed(InternalHttpAsyncExecRuntime.this);
120 }
121
122 @Override
123 public void failed(final Exception ex) {
124 callback.failed(ex);
125 }
126
127 @Override
128 public void cancelled() {
129 callback.cancelled();
130 }
131 }));
132 }
133 callback.completed(this);
134 return Operations.nonCancellable();
135 }
136
137 private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
138 try {
139 endpoint.close(CloseMode.IMMEDIATE);
140 if (log.isDebugEnabled()) {
141 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
142 }
143 } finally {
144 if (log.isDebugEnabled()) {
145 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
146 }
147 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
148 }
149 }
150
151 @Override
152 public void releaseEndpoint() {
153 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
154 if (endpoint != null) {
155 if (reusable) {
156 if (log.isDebugEnabled()) {
157 log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
158 }
159 manager.release(endpoint, state, validDuration);
160 } else {
161 discardEndpoint(endpoint);
162 }
163 }
164 }
165
166 @Override
167 public void discardEndpoint() {
168 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
169 if (endpoint != null) {
170 discardEndpoint(endpoint);
171 }
172 }
173
174 @Override
175 public boolean validateConnection() {
176 if (reusable) {
177 final AsyncConnectionEndpoint endpoint = endpointRef.get();
178 return endpoint != null && endpoint.isConnected();
179 }
180 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
181 if (endpoint != null) {
182 discardEndpoint(endpoint);
183 }
184 return false;
185 }
186
187 AsyncConnectionEndpoint ensureValid() {
188 final AsyncConnectionEndpoint endpoint = endpointRef.get();
189 if (endpoint == null) {
190 throw new IllegalStateException("Endpoint not acquired / already released");
191 }
192 return endpoint;
193 }
194
195 @Override
196 public boolean isEndpointConnected() {
197 final AsyncConnectionEndpoint endpoint = endpointRef.get();
198 return endpoint != null && endpoint.isConnected();
199 }
200
201 @Override
202 public Cancellable connectEndpoint(
203 final HttpClientContext context,
204 final FutureCallback<AsyncExecRuntime> callback) {
205 final AsyncConnectionEndpoint endpoint = ensureValid();
206 if (endpoint.isConnected()) {
207 callback.completed(this);
208 return Operations.nonCancellable();
209 }
210 final RequestConfig requestConfig = context.getRequestConfig();
211 @SuppressWarnings("deprecation")
212 final Timeout connectTimeout = requestConfig.getConnectTimeout();
213 if (log.isDebugEnabled()) {
214 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
215 }
216 return Operations.cancellable(manager.connect(
217 endpoint,
218 connectionInitiator,
219 connectTimeout,
220 tlsConfig,
221 context,
222 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
223
224 @Override
225 public void completed(final AsyncConnectionEndpoint endpoint) {
226 if (log.isDebugEnabled()) {
227 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
228 }
229 if (callback != null) {
230 callback.completed(InternalHttpAsyncExecRuntime.this);
231 }
232 }
233
234 }));
235
236 }
237
238 @Override
239 public void disconnectEndpoint() {
240 final AsyncConnectionEndpoint endpoint = endpointRef.get();
241 if (endpoint != null) {
242 endpoint.close(CloseMode.GRACEFUL);
243 }
244 }
245
246 @Override
247 public void upgradeTls(final HttpClientContext context) {
248 upgradeTls(context, null);
249 }
250
251 @Override
252 public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
253 final AsyncConnectionEndpoint endpoint = ensureValid();
254 if (log.isDebugEnabled()) {
255 log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
256 }
257 manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
258
259 @Override
260 public void completed(final AsyncConnectionEndpoint endpoint) {
261 if (callback != null) {
262 callback.completed(InternalHttpAsyncExecRuntime.this);
263 }
264 }
265
266 });
267 }
268
269 @Override
270 public Cancellable execute(
271 final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
272 final AsyncConnectionEndpoint endpoint = ensureValid();
273 if (endpoint.isConnected()) {
274 if (log.isDebugEnabled()) {
275 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
276 }
277 final RequestConfig requestConfig = context.getRequestConfig();
278 final Timeout responseTimeout = requestConfig.getResponseTimeout();
279 if (responseTimeout != null) {
280 endpoint.setSocketTimeout(responseTimeout);
281 }
282 endpoint.execute(id, exchangeHandler, context);
283 if (context.getRequestConfig().isHardCancellationEnabled()) {
284 return () -> {
285 exchangeHandler.cancel();
286 return true;
287 };
288 }
289 } else {
290 connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
291
292 @Override
293 public void completed(final AsyncExecRuntime runtime) {
294 if (log.isDebugEnabled()) {
295 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
296 }
297 try {
298 endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
299 } catch (final RuntimeException ex) {
300 failed(ex);
301 }
302 }
303
304 @Override
305 public void failed(final Exception ex) {
306 exchangeHandler.failed(ex);
307 }
308
309 @Override
310 public void cancelled() {
311 exchangeHandler.failed(new InterruptedIOException());
312 }
313
314 });
315 }
316 return Operations.nonCancellable();
317 }
318
319 @Override
320 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
321 reusable = true;
322 state = newState;
323 validDuration = newValidDuration;
324 }
325
326 @Override
327 public void markConnectionNonReusable() {
328 reusable = false;
329 state = null;
330 validDuration = null;
331 }
332
333 @Override
334 public AsyncExecRuntime fork() {
335 return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
336 }
337
338 }