1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.classic;
29
30 import java.io.IOException;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.HttpRoute;
36 import org.apache.hc.client5.http.classic.ExecRuntime;
37 import org.apache.hc.client5.http.config.RequestConfig;
38 import org.apache.hc.client5.http.impl.ConnPoolSupport;
39 import org.apache.hc.client5.http.io.ConnectionEndpoint;
40 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
41 import org.apache.hc.client5.http.io.LeaseRequest;
42 import org.apache.hc.client5.http.protocol.HttpClientContext;
43 import org.apache.hc.core5.concurrent.Cancellable;
44 import org.apache.hc.core5.concurrent.CancellableDependency;
45 import org.apache.hc.core5.http.ClassicHttpRequest;
46 import org.apache.hc.core5.http.ClassicHttpResponse;
47 import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.util.Args;
52 import org.apache.hc.core5.util.TimeValue;
53 import org.apache.hc.core5.util.Timeout;
54 import org.slf4j.Logger;
55
56 class InternalExecRuntime implements ExecRuntime, Cancellable {
57
58 private final Logger log;
59
60 private final HttpClientConnectionManager manager;
61 private final HttpRequestExecutor requestExecutor;
62 private final CancellableDependency cancellableDependency;
63 private final AtomicReference<ConnectionEndpoint> endpointRef;
64
65 private volatile boolean reusable;
66 private volatile Object state;
67 private volatile TimeValue validDuration;
68
69 InternalExecRuntime(
70 final Logger log,
71 final HttpClientConnectionManager manager,
72 final HttpRequestExecutor requestExecutor,
73 final CancellableDependency cancellableDependency) {
74 super();
75 this.log = log;
76 this.manager = manager;
77 this.requestExecutor = requestExecutor;
78 this.cancellableDependency = cancellableDependency;
79 this.endpointRef = new AtomicReference<>(null);
80 this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
81 }
82
83 @Override
84 public boolean isExecutionAborted() {
85 return cancellableDependency != null && cancellableDependency.isCancelled();
86 }
87
88 @Override
89 public boolean isEndpointAcquired() {
90 return endpointRef.get() != null;
91 }
92
93 @Override
94 public void acquireEndpoint(
95 final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws IOException {
96 Args.notNull(route, "Route");
97 if (endpointRef.get() == null) {
98 final RequestConfig requestConfig = context.getRequestConfig();
99 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
100 if (log.isDebugEnabled()) {
101 log.debug("{}: acquiring endpoint ({})", id, connectionRequestTimeout);
102 }
103 final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
104 state = object;
105 if (cancellableDependency != null) {
106 if (cancellableDependency.isCancelled()) {
107 connRequest.cancel();
108 throw new RequestFailedException("Request aborted");
109 }
110 cancellableDependency.setDependency(connRequest);
111 }
112 try {
113 final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
114 endpointRef.set(connectionEndpoint);
115 reusable = connectionEndpoint.isConnected();
116 if (cancellableDependency != null) {
117 cancellableDependency.setDependency(this);
118 }
119 if (log.isDebugEnabled()) {
120 log.debug("{}: acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
121 }
122 } catch(final TimeoutException ex) {
123 throw new ConnectionRequestTimeoutException(ex.getMessage());
124 } catch(final InterruptedException interrupted) {
125 Thread.currentThread().interrupt();
126 throw new RequestFailedException("Request aborted", interrupted);
127 } catch(final ExecutionException ex) {
128 Throwable cause = ex.getCause();
129 if (cause == null) {
130 cause = ex;
131 }
132 throw new RequestFailedException("Request execution failed", cause);
133 }
134 } else {
135 throw new IllegalStateException("Endpoint already acquired");
136 }
137 }
138
139 ConnectionEndpoint ensureValid() {
140 final ConnectionEndpoint endpoint = endpointRef.get();
141 if (endpoint == null) {
142 throw new IllegalStateException("Endpoint not acquired / already released");
143 }
144 return endpoint;
145 }
146
147 @Override
148 public boolean isEndpointConnected() {
149 final ConnectionEndpoint endpoint = endpointRef.get();
150 return endpoint != null && endpoint.isConnected();
151 }
152
153 private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException {
154 if (cancellableDependency != null) {
155 if (cancellableDependency.isCancelled()) {
156 throw new RequestFailedException("Request aborted");
157 }
158 }
159 final RequestConfig requestConfig = context.getRequestConfig();
160 final Timeout connectTimeout = requestConfig.getConnectTimeout();
161 if (log.isDebugEnabled()) {
162 log.debug("{}: connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
163 }
164 manager.connect(endpoint, connectTimeout, context);
165 if (log.isDebugEnabled()) {
166 log.debug("{}: endpoint connected", ConnPoolSupport.getId(endpoint));
167 }
168 }
169
170 @Override
171 public void connectEndpoint(final HttpClientContext context) throws IOException {
172 final ConnectionEndpoint endpoint = ensureValid();
173 if (!endpoint.isConnected()) {
174 connectEndpoint(endpoint, context);
175 }
176 }
177
178 @Override
179 public void disconnectEndpoint() throws IOException {
180 final ConnectionEndpoint endpoint = endpointRef.get();
181 if (endpoint != null) {
182 endpoint.close();
183 if (log.isDebugEnabled()) {
184 log.debug("{}: endpoint closed", ConnPoolSupport.getId(endpoint));
185 }
186 }
187 }
188
189 @Override
190 public void upgradeTls(final HttpClientContext context) throws IOException {
191 final ConnectionEndpoint endpoint = ensureValid();
192 if (log.isDebugEnabled()) {
193 log.debug("{}: upgrading endpoint", ConnPoolSupport.getId(endpoint));
194 }
195 manager.upgrade(endpoint, context);
196 }
197
198 @Override
199 public ClassicHttpResponse execute(
200 final String id,
201 final ClassicHttpRequest request,
202 final HttpClientContext context) throws IOException, HttpException {
203 final ConnectionEndpoint endpoint = ensureValid();
204 if (!endpoint.isConnected()) {
205 connectEndpoint(endpoint, context);
206 }
207 final RequestConfig requestConfig = context.getRequestConfig();
208 final Timeout responseTimeout = requestConfig.getResponseTimeout();
209 if (responseTimeout != null) {
210 endpoint.setSocketTimeout(responseTimeout);
211 }
212 if (log.isDebugEnabled()) {
213 log.debug("{}: start execution {}", ConnPoolSupport.getId(endpoint), id);
214 }
215 return endpoint.execute(id, request, requestExecutor, context);
216 }
217
218 @Override
219 public boolean isConnectionReusable() {
220 return reusable;
221 }
222
223 @Override
224 public void markConnectionReusable(final Object state, final TimeValue validDuration) {
225 this.reusable = true;
226 this.state = state;
227 this.validDuration = validDuration;
228 }
229
230 @Override
231 public void markConnectionNonReusable() {
232 reusable = false;
233 }
234
235 private void discardEndpoint(final ConnectionEndpoint endpoint) {
236 try {
237 endpoint.close(CloseMode.IMMEDIATE);
238 if (log.isDebugEnabled()) {
239 log.debug("{}: endpoint closed", ConnPoolSupport.getId(endpoint));
240 }
241 } finally {
242 if (log.isDebugEnabled()) {
243 log.debug("{}: discarding endpoint", ConnPoolSupport.getId(endpoint));
244 }
245 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
246 }
247 }
248
249 @Override
250 public void releaseEndpoint() {
251 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
252 if (endpoint != null) {
253 if (reusable) {
254 if (log.isDebugEnabled()) {
255 log.debug("{}: releasing valid endpoint", ConnPoolSupport.getId(endpoint));
256 }
257 manager.release(endpoint, state, validDuration);
258 } else {
259 discardEndpoint(endpoint);
260 }
261 }
262 }
263
264 @Override
265 public void discardEndpoint() {
266 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
267 if (endpoint != null) {
268 discardEndpoint(endpoint);
269 }
270 }
271
272 @Override
273 public boolean cancel() {
274 final boolean alreadyReleased = endpointRef.get() == null;
275 final ConnectionEndpoint endpoint = endpointRef.getAndSet(null);
276 if (endpoint != null) {
277 if (log.isDebugEnabled()) {
278 log.debug("{}: cancel", ConnPoolSupport.getId(endpoint));
279 }
280 discardEndpoint(endpoint);
281 }
282 return !alreadyReleased;
283 }
284
285 @Override
286 public ExecRuntime fork(final CancellableDependency cancellableDependency) {
287 return new InternalExecRuntime(log, manager, requestExecutor, cancellableDependency);
288 }
289
290 }