1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.classic;
29
30 import java.io.IOException;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.EndpointInfo;
36 import org.apache.hc.client5.http.HttpRoute;
37 import org.apache.hc.client5.http.classic.ExecRuntime;
38 import org.apache.hc.client5.http.config.RequestConfig;
39 import org.apache.hc.client5.http.impl.ConnPoolSupport;
40 import org.apache.hc.client5.http.io.ConnectionEndpoint;
41 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
42 import org.apache.hc.client5.http.io.LeaseRequest;
43 import org.apache.hc.client5.http.protocol.HttpClientContext;
44 import org.apache.hc.core5.concurrent.Cancellable;
45 import org.apache.hc.core5.concurrent.CancellableDependency;
46 import org.apache.hc.core5.http.ClassicHttpRequest;
47 import org.apache.hc.core5.http.ClassicHttpResponse;
48 import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
49 import org.apache.hc.core5.http.HttpException;
50 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
51 import org.apache.hc.core5.http.io.HttpResponseInformationCallback;
52 import org.apache.hc.core5.io.CloseMode;
53 import org.apache.hc.core5.util.Args;
54 import org.apache.hc.core5.util.TimeValue;
55 import org.apache.hc.core5.util.Timeout;
56 import org.slf4j.Logger;
57
58 class InternalExecRuntime implements ExecRuntime, Cancellable {
59
60 private final Logger log;
61
62 private final HttpClientConnectionManager manager;
63 private final HttpRequestExecutor requestExecutor;
64 private final CancellableDependency cancellableDependency;
65 private final AtomicReference<ConnectionEndpoint> endpointRef;
66
67 private volatile boolean reusable;
68 private volatile Object state;
69 private volatile TimeValue validDuration;
70
71 InternalExecRuntime(
72 final Logger log,
73 final HttpClientConnectionManager manager,
74 final HttpRequestExecutor requestExecutor,
75 final CancellableDependency cancellableDependency) {
76 super();
77 this.log = log;
78 this.manager = manager;
79 this.requestExecutor = requestExecutor;
80 this.cancellableDependency = cancellableDependency;
81 this.endpointRef = new AtomicReference<>();
82 this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
83 }
84
85 @Override
86 public boolean isExecutionAborted() {
87 return cancellableDependency != null && cancellableDependency.isCancelled();
88 }
89
90 @Override
91 public boolean isEndpointAcquired() {
92 return endpointRef.get() != null;
93 }
94
95 @Override
96 public void acquireEndpoint(
97 final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws IOException {
98 Args.notNull(route, "Route");
99 if (endpointRef.get() == null) {
100 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
101 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
102 if (log.isDebugEnabled()) {
103 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
104 }
105 final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
106 state = object;
107 if (cancellableDependency != null) {
108 cancellableDependency.setDependency(connRequest);
109 }
110 try {
111 final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
112 endpointRef.set(connectionEndpoint);
113 reusable = connectionEndpoint.isConnected();
114 if (cancellableDependency != null) {
115 cancellableDependency.setDependency(this);
116 }
117 if (log.isDebugEnabled()) {
118 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
119 }
120 } catch(final TimeoutException ex) {
121 connRequest.cancel();
122 throw new ConnectionRequestTimeoutException(ex.getMessage());
123 } catch(final InterruptedException interrupted) {
124 connRequest.cancel();
125 Thread.currentThread().interrupt();
126 throw new RequestFailedException("Request aborted", interrupted);
127 } catch(final ExecutionException ex) {
128 connRequest.cancel();
129 Throwable cause = ex.getCause();
130 if (cause == null) {
131 cause = ex;
132 }
133 throw new RequestFailedException("Request execution failed", cause);
134 }
135 } else {
136 throw new IllegalStateException("Endpoint already acquired");
137 }
138 }
139
140 ConnectionEndpoint ensureValid() {
141 final ConnectionEndpoint endpoint = endpointRef.get();
142 if (endpoint == null) {
143 throw new IllegalStateException("Endpoint not acquired / already released");
144 }
145 return endpoint;
146 }
147
148 @Override
149 public boolean isEndpointConnected() {
150 final ConnectionEndpoint endpoint = endpointRef.get();
151 return endpoint != null && endpoint.isConnected();
152 }
153
154 private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
155 if (isExecutionAborted()) {
156 throw new RequestFailedException("Request aborted");
157 }
158 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
159 @SuppressWarnings("deprecation")
160 final Timeout connectTimeout = requestConfig.getConnectTimeout();
161 if (log.isDebugEnabled()) {
162 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
163 }
164 manager.connect(endpoint, connectTimeout, context);
165 if (log.isDebugEnabled()) {
166 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
167 }
168 }
169
170 @Override
171 public void connectEndpoint(final HttpClientContext context) throws IOException {
172 final ConnectionEndpoint endpoint = ensureValid();
173 if (!endpoint.isConnected()) {
174 connectEndpoint(endpoint, context);
175 }
176 }
177
178 @Override
179 public void disconnectEndpoint() throws IOException {
180 final ConnectionEndpoint endpoint = endpointRef.get();
181 if (endpoint != null) {
182 endpoint.close();
183 if (log.isDebugEnabled()) {
184 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
185 }
186 }
187 }
188
189 @Override
190 public void upgradeTls(final HttpClientContext context) throws IOException {
191 final ConnectionEndpoint endpoint = ensureValid();
192 if (log.isDebugEnabled()) {
193 log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
194 }
195 manager.upgrade(endpoint, context);
196 }
197
198 @Override
199 public EndpointInfo getEndpointInfo() {
200 final ConnectionEndpoint endpoint = endpointRef.get();
201 return endpoint != null ? endpoint.getInfo() : null;
202 }
203
204 @Override
205 public ClassicHttpResponse execute(
206 final String id,
207 final ClassicHttpRequest request,
208 final HttpClientContext context) throws IOException, HttpException {
209 return execute(id, request, null, context);
210 }
211
212 @Override
213 public ClassicHttpResponse execute(
214 final String id,
215 final ClassicHttpRequest request,
216 final HttpResponseInformationCallback informationCallback,
217 final HttpClientContext context) throws IOException, HttpException {
218 final ConnectionEndpoint endpoint = ensureValid();
219 if (!endpoint.isConnected()) {
220 connectEndpoint(endpoint, context);
221 }
222 if (isExecutionAborted()) {
223 throw new RequestFailedException("Request aborted");
224 }
225 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
226 final Timeout responseTimeout = requestConfig.getResponseTimeout();
227 if (responseTimeout != null) {
228 endpoint.setSocketTimeout(responseTimeout);
229 }
230 if (log.isDebugEnabled()) {
231 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
232 }
233 return endpoint.execute(
234 id,
235 request,
236 (r, conn, c) -> requestExecutor.execute(r, conn, informationCallback, c),
237 context);
238 }
239
240 @Override
241 public boolean isConnectionReusable() {
242 return reusable;
243 }
244
245 @Override
246 public void markConnectionReusable(final Object state, final TimeValue validDuration) {
247 this.reusable = true;
248 this.state = state;
249 this.validDuration = validDuration;
250 }
251
252 @Override
253 public void markConnectionNonReusable() {
254 reusable = false;
255 }
256
257 private void discardEndpoint(final ConnectionEndpoint endpoint) {
258 try {
259 endpoint.close(CloseMode.IMMEDIATE);
260 if (log.isDebugEnabled()) {
261 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
262 }
263 } finally {
264 if (log.isDebugEnabled()) {
265 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
266 }
267 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
268 }
269 }
270
271 @Override
272 public void releaseEndpoint() {
273 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
274 if (endpoint != null) {
275 if (reusable) {
276 if (log.isDebugEnabled()) {
277 log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
278 }
279 manager.release(endpoint, state, validDuration);
280 } else {
281 discardEndpoint(endpoint);
282 }
283 }
284 }
285
286 @Override
287 public void discardEndpoint() {
288 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
289 if (endpoint != null) {
290 discardEndpoint(endpoint);
291 }
292 }
293
294 @Override
295 public boolean cancel() {
296 final boolean alreadyReleased = endpointRef.get() == null;
297 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
298 if (endpoint != null) {
299 if (log.isDebugEnabled()) {
300 log.debug("{} cancel", ConnPoolSupport.getId(endpoint));
301 }
302 discardEndpoint(endpoint);
303 }
304 return !alreadyReleased;
305 }
306
307 @Override
308 public ExecRuntime fork(final CancellableDependency cancellableDependency) {
309 return new InternalExecRuntime(log, manager, requestExecutor, cancellableDependency);
310 }
311
312 }