1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.classic;
29
30 import java.io.IOException;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.HttpRoute;
36 import org.apache.hc.client5.http.classic.ExecRuntime;
37 import org.apache.hc.client5.http.config.RequestConfig;
38 import org.apache.hc.client5.http.impl.ConnPoolSupport;
39 import org.apache.hc.client5.http.io.ConnectionEndpoint;
40 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
41 import org.apache.hc.client5.http.io.LeaseRequest;
42 import org.apache.hc.client5.http.protocol.HttpClientContext;
43 import org.apache.hc.core5.concurrent.Cancellable;
44 import org.apache.hc.core5.concurrent.CancellableDependency;
45 import org.apache.hc.core5.http.ClassicHttpRequest;
46 import org.apache.hc.core5.http.ClassicHttpResponse;
47 import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.util.Args;
52 import org.apache.hc.core5.util.TimeValue;
53 import org.apache.hc.core5.util.Timeout;
54 import org.slf4j.Logger;
55
56 class InternalExecRuntime implements ExecRuntime, Cancellable {
57
58 private final Logger log;
59
60 private final HttpClientConnectionManager manager;
61 private final HttpRequestExecutor requestExecutor;
62 private final CancellableDependency cancellableDependency;
63 private final AtomicReference<ConnectionEndpoint> endpointRef;
64
65 private volatile boolean reusable;
66 private volatile Object state;
67 private volatile TimeValue validDuration;
68
69 InternalExecRuntime(
70 final Logger log,
71 final HttpClientConnectionManager manager,
72 final HttpRequestExecutor requestExecutor,
73 final CancellableDependency cancellableDependency) {
74 super();
75 this.log = log;
76 this.manager = manager;
77 this.requestExecutor = requestExecutor;
78 this.cancellableDependency = cancellableDependency;
79 this.endpointRef = new AtomicReference<>();
80 this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
81 }
82
83 @Override
84 public boolean isExecutionAborted() {
85 return cancellableDependency != null && cancellableDependency.isCancelled();
86 }
87
88 @Override
89 public boolean isEndpointAcquired() {
90 return endpointRef.get() != null;
91 }
92
93 @Override
94 public void acquireEndpoint(
95 final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws IOException {
96 Args.notNull(route, "Route");
97 if (endpointRef.get() == null) {
98 final RequestConfig requestConfig = context.getRequestConfig();
99 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
100 if (log.isDebugEnabled()) {
101 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
102 }
103 final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
104 state = object;
105 if (cancellableDependency != null) {
106 cancellableDependency.setDependency(connRequest);
107 }
108 try {
109 final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
110 endpointRef.set(connectionEndpoint);
111 reusable = connectionEndpoint.isConnected();
112 if (cancellableDependency != null) {
113 cancellableDependency.setDependency(this);
114 }
115 if (log.isDebugEnabled()) {
116 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
117 }
118 } catch(final TimeoutException ex) {
119 throw new ConnectionRequestTimeoutException(ex.getMessage());
120 } catch(final InterruptedException interrupted) {
121 Thread.currentThread().interrupt();
122 throw new RequestFailedException("Request aborted", interrupted);
123 } catch(final ExecutionException ex) {
124 Throwable cause = ex.getCause();
125 if (cause == null) {
126 cause = ex;
127 }
128 throw new RequestFailedException("Request execution failed", cause);
129 }
130 } else {
131 throw new IllegalStateException("Endpoint already acquired");
132 }
133 }
134
135 ConnectionEndpoint ensureValid() {
136 final ConnectionEndpoint endpoint = endpointRef.get();
137 if (endpoint == null) {
138 throw new IllegalStateException("Endpoint not acquired / already released");
139 }
140 return endpoint;
141 }
142
143 @Override
144 public boolean isEndpointConnected() {
145 final ConnectionEndpoint endpoint = endpointRef.get();
146 return endpoint != null && endpoint.isConnected();
147 }
148
149 private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
150 if (isExecutionAborted()) {
151 throw new RequestFailedException("Request aborted");
152 }
153 final RequestConfig requestConfig = context.getRequestConfig();
154 final Timeout connectTimeout = requestConfig.getConnectTimeout();
155 if (log.isDebugEnabled()) {
156 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
157 }
158 manager.connect(endpoint, connectTimeout, context);
159 if (log.isDebugEnabled()) {
160 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
161 }
162 }
163
164 @Override
165 public void connectEndpoint(final HttpClientContext context) throws IOException {
166 final ConnectionEndpoint endpoint = ensureValid();
167 if (!endpoint.isConnected()) {
168 connectEndpoint(endpoint, context);
169 }
170 }
171
172 @Override
173 public void disconnectEndpoint() throws IOException {
174 final ConnectionEndpoint endpoint = endpointRef.get();
175 if (endpoint != null) {
176 endpoint.close();
177 if (log.isDebugEnabled()) {
178 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
179 }
180 }
181 }
182
183 @Override
184 public void upgradeTls(final HttpClientContext context) throws IOException {
185 final ConnectionEndpoint endpoint = ensureValid();
186 if (log.isDebugEnabled()) {
187 log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
188 }
189 manager.upgrade(endpoint, context);
190 }
191
192 @Override
193 public ClassicHttpResponse execute(
194 final String id,
195 final ClassicHttpRequest request,
196 final HttpClientContext context) throws IOException, HttpException {
197 final ConnectionEndpoint endpoint = ensureValid();
198 if (!endpoint.isConnected()) {
199 connectEndpoint(endpoint, context);
200 }
201 if (isExecutionAborted()) {
202 throw new RequestFailedException("Request aborted");
203 }
204 final RequestConfig requestConfig = context.getRequestConfig();
205 final Timeout responseTimeout = requestConfig.getResponseTimeout();
206 if (responseTimeout != null) {
207 endpoint.setSocketTimeout(responseTimeout);
208 }
209 if (log.isDebugEnabled()) {
210 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
211 }
212 return endpoint.execute(id, request, requestExecutor, context);
213 }
214
215 @Override
216 public boolean isConnectionReusable() {
217 return reusable;
218 }
219
220 @Override
221 public void markConnectionReusable(final Object state, final TimeValue validDuration) {
222 this.reusable = true;
223 this.state = state;
224 this.validDuration = validDuration;
225 }
226
227 @Override
228 public void markConnectionNonReusable() {
229 reusable = false;
230 }
231
232 private void discardEndpoint(final ConnectionEndpoint endpoint) {
233 try {
234 endpoint.close(CloseMode.IMMEDIATE);
235 if (log.isDebugEnabled()) {
236 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
237 }
238 } finally {
239 if (log.isDebugEnabled()) {
240 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
241 }
242 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
243 }
244 }
245
246 @Override
247 public void releaseEndpoint() {
248 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
249 if (endpoint != null) {
250 if (reusable) {
251 if (log.isDebugEnabled()) {
252 log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
253 }
254 manager.release(endpoint, state, validDuration);
255 } else {
256 discardEndpoint(endpoint);
257 }
258 }
259 }
260
261 @Override
262 public void discardEndpoint() {
263 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
264 if (endpoint != null) {
265 discardEndpoint(endpoint);
266 }
267 }
268
269 @Override
270 public boolean cancel() {
271 final boolean alreadyReleased = endpointRef.get() == null;
272 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
273 if (endpoint != null) {
274 if (log.isDebugEnabled()) {
275 log.debug("{} cancel", ConnPoolSupport.getId(endpoint));
276 }
277 discardEndpoint(endpoint);
278 }
279 return !alreadyReleased;
280 }
281
282 @Override
283 public ExecRuntime fork(final CancellableDependency cancellableDependency) {
284 return new InternalExecRuntime(log, manager, requestExecutor, cancellableDependency);
285 }
286
287 }