1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.time.Instant;
32 import java.util.Objects;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.HttpRoute;
42 import org.apache.hc.client5.http.SchemePortResolver;
43 import org.apache.hc.client5.http.config.ConnectionConfig;
44 import org.apache.hc.client5.http.config.TlsConfig;
45 import org.apache.hc.client5.http.impl.ConnPoolSupport;
46 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
47 import org.apache.hc.client5.http.io.ConnectionEndpoint;
48 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
49 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
50 import org.apache.hc.client5.http.io.LeaseRequest;
51 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
52 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
53 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
54 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
55 import org.apache.hc.core5.annotation.Contract;
56 import org.apache.hc.core5.annotation.ThreadingBehavior;
57 import org.apache.hc.core5.http.ClassicHttpRequest;
58 import org.apache.hc.core5.http.ClassicHttpResponse;
59 import org.apache.hc.core5.http.HttpException;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.URIScheme;
62 import org.apache.hc.core5.http.config.Lookup;
63 import org.apache.hc.core5.http.config.Registry;
64 import org.apache.hc.core5.http.config.RegistryBuilder;
65 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
66 import org.apache.hc.core5.http.io.HttpConnectionFactory;
67 import org.apache.hc.core5.http.io.SocketConfig;
68 import org.apache.hc.core5.http.protocol.HttpContext;
69 import org.apache.hc.core5.io.CloseMode;
70 import org.apache.hc.core5.util.Args;
71 import org.apache.hc.core5.util.Asserts;
72 import org.apache.hc.core5.util.Deadline;
73 import org.apache.hc.core5.util.TimeValue;
74 import org.apache.hc.core5.util.Timeout;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 @Contract(threading = ThreadingBehavior.SAFE)
97 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
98
99 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
100
101 private static final AtomicLong COUNT = new AtomicLong(0);
102
103 private final HttpClientConnectionOperator connectionOperator;
104 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
105 private final String id;
106
107 private final ReentrantLock lock;
108
109 private ManagedHttpClientConnection conn;
110 private HttpRoute route;
111 private Object state;
112 private long created;
113 private long updated;
114 private long expiry;
115 private boolean leased;
116 private SocketConfig socketConfig;
117 private ConnectionConfig connectionConfig;
118 private TlsConfig tlsConfig;
119
120 private final AtomicBoolean closed;
121
122 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
123 return RegistryBuilder.<ConnectionSocketFactory>create()
124 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
125 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
126 .build();
127 }
128
129 public BasicHttpClientConnectionManager(
130 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
131 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
132 final SchemePortResolver schemePortResolver,
133 final DnsResolver dnsResolver) {
134 this(new DefaultHttpClientConnectionOperator(
135 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
136 }
137
138
139
140
141 public BasicHttpClientConnectionManager(
142 final HttpClientConnectionOperator httpClientConnectionOperator,
143 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
144 super();
145 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
146 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
147 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
148 this.expiry = Long.MAX_VALUE;
149 this.socketConfig = SocketConfig.DEFAULT;
150 this.connectionConfig = ConnectionConfig.DEFAULT;
151 this.tlsConfig = TlsConfig.DEFAULT;
152 this.closed = new AtomicBoolean(false);
153 this.lock = new ReentrantLock();
154 }
155
156 public BasicHttpClientConnectionManager(
157 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
158 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
159 this(socketFactoryRegistry, connFactory, null, null);
160 }
161
162 public BasicHttpClientConnectionManager(
163 final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
164 this(socketFactoryRegistry, null, null, null);
165 }
166
167 public BasicHttpClientConnectionManager() {
168 this(getDefaultRegistry(), null, null, null);
169 }
170
171 @Override
172 public void close() {
173 close(CloseMode.GRACEFUL);
174 }
175
176 @Override
177 public void close(final CloseMode closeMode) {
178 if (this.closed.compareAndSet(false, true)) {
179 closeConnection(closeMode);
180 }
181 }
182
183 HttpRoute getRoute() {
184 return route;
185 }
186
187 Object getState() {
188 return state;
189 }
190
191 public SocketConfig getSocketConfig() {
192 lock.lock();
193 try {
194 return socketConfig;
195 } finally {
196 lock.unlock();
197 }
198 }
199
200 public void setSocketConfig(final SocketConfig socketConfig) {
201 lock.lock();
202 try {
203 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
204 } finally {
205 lock.unlock();
206 }
207 }
208
209
210
211
212 public ConnectionConfig getConnectionConfig() {
213 lock.lock();
214 try {
215 return connectionConfig;
216 } finally {
217 lock.unlock();
218 }
219
220 }
221
222
223
224
225 public void setConnectionConfig(final ConnectionConfig connectionConfig) {
226 lock.lock();
227 try {
228 this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
229 } finally {
230 lock.unlock();
231 }
232 }
233
234
235
236
237 public TlsConfig getTlsConfig() {
238 lock.lock();
239 try {
240 return tlsConfig;
241 } finally {
242 lock.unlock();
243 }
244 }
245
246
247
248
249 public void setTlsConfig(final TlsConfig tlsConfig) {
250 lock.lock();
251 try {
252 this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
253 } finally {
254 lock.unlock();
255 }
256 }
257
258 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
259 return lease(id, route, Timeout.DISABLED, state);
260 }
261
262 @Override
263 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
264 return new LeaseRequest() {
265
266 @Override
267 public ConnectionEndpoint get(
268 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
269 try {
270 return new InternalConnectionEndpoint(route, getConnection(route, state));
271 } catch (final IOException ex) {
272 throw new ExecutionException(ex.getMessage(), ex);
273 }
274 }
275
276 @Override
277 public boolean cancel() {
278 return false;
279 }
280
281 };
282 }
283
284 private void closeConnection(final CloseMode closeMode) {
285 lock.lock();
286 try {
287 if (this.conn != null) {
288 if (LOG.isDebugEnabled()) {
289 LOG.debug("{} Closing connection {}", id, closeMode);
290 }
291 this.conn.close(closeMode);
292 this.conn = null;
293 }
294 } finally {
295 lock.unlock();
296 }
297 }
298
299 private void checkExpiry() {
300 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
301 if (LOG.isDebugEnabled()) {
302 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
303 }
304 closeConnection(CloseMode.GRACEFUL);
305 }
306 }
307
308 private void validate() {
309 if (this.conn != null) {
310 final TimeValue timeToLive = connectionConfig.getTimeToLive();
311 if (TimeValue.isNonNegative(timeToLive)) {
312 final Deadline deadline = Deadline.calculate(created, timeToLive);
313 if (deadline.isExpired()) {
314 closeConnection(CloseMode.GRACEFUL);
315 }
316 }
317 }
318 if (this.conn != null) {
319 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
320 connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
321 if (TimeValue.isNonNegative(timeValue)) {
322 final Deadline deadline = Deadline.calculate(updated, timeValue);
323 if (deadline.isExpired()) {
324 boolean stale;
325 try {
326 stale = conn.isStale();
327 } catch (final IOException ignore) {
328 stale = true;
329 }
330 if (stale) {
331 if (LOG.isDebugEnabled()) {
332 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
333 }
334 closeConnection(CloseMode.GRACEFUL);
335 }
336 }
337 }
338 }
339 }
340
341 ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
342 lock.lock();
343 try {
344 Asserts.check(!isClosed(), "Connection manager has been shut down");
345 if (LOG.isDebugEnabled()) {
346 LOG.debug("{} Get connection for route {}", id, route);
347 }
348 Asserts.check(!this.leased, "Connection %s is still allocated", conn);
349 if (!Objects.equals(this.route, route) || !Objects.equals(this.state, state)) {
350 closeConnection(CloseMode.GRACEFUL);
351 }
352 this.route = route;
353 this.state = state;
354 checkExpiry();
355 validate();
356 if (this.conn == null) {
357 this.conn = this.connFactory.createConnection(null);
358 this.created = System.currentTimeMillis();
359 } else {
360 this.conn.activate();
361 }
362 this.leased = true;
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("{} Using connection {}", id, conn);
365 }
366 return this.conn;
367 } finally {
368 lock.unlock();
369 }
370 }
371
372
373 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
374 if (endpoint instanceof InternalConnectionEndpoint) {
375 return (InternalConnectionEndpoint) endpoint;
376 }
377 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
378 }
379
380 @Override
381 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
382 lock.lock();
383 try {
384 Args.notNull(endpoint, "Managed endpoint");
385 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
386 final ManagedHttpClientConnection conn = internalEndpoint.detach();
387 if (LOG.isDebugEnabled()) {
388 LOG.debug("{} Releasing connection {}", id, conn);
389 }
390 if (isClosed()) {
391 return;
392 }
393 try {
394 if (keepAlive == null && conn != null) {
395 conn.close(CloseMode.GRACEFUL);
396 }
397 this.updated = System.currentTimeMillis();
398 if (conn != null && conn.isOpen() && conn.isConsistent()) {
399 this.state = state;
400 conn.passivate();
401 if (TimeValue.isPositive(keepAlive)) {
402 if (LOG.isDebugEnabled()) {
403 LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
404 }
405 this.expiry = this.updated + keepAlive.toMilliseconds();
406 } else {
407 if (LOG.isDebugEnabled()) {
408 LOG.debug("{} Connection can be kept alive indefinitely", id);
409 }
410 this.expiry = Long.MAX_VALUE;
411 }
412 } else {
413 this.route = null;
414 this.conn = null;
415 this.expiry = Long.MAX_VALUE;
416 if (LOG.isDebugEnabled()) {
417 LOG.debug("{} Connection is not kept alive", id);
418 }
419 }
420 } finally {
421 this.leased = false;
422 }
423 } finally {
424 lock.unlock();
425 }
426 }
427
428 @Override
429 public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
430 lock.lock();
431 try {
432 Args.notNull(endpoint, "Endpoint");
433
434 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
435 if (internalEndpoint.isConnected()) {
436 return;
437 }
438 final HttpRoute route = internalEndpoint.getRoute();
439 final HttpHost host;
440 if (route.getProxyHost() != null) {
441 host = route.getProxyHost();
442 } else {
443 host = route.getTargetHost();
444 }
445 final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
446 final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
447 if (LOG.isDebugEnabled()) {
448 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
449 }
450 this.connectionOperator.connect(
451 connection,
452 host,
453 route.getLocalSocketAddress(),
454 connectTimeout,
455 socketConfig,
456 tlsConfig,
457 context);
458 if (LOG.isDebugEnabled()) {
459 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
460 }
461 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
462 if (socketTimeout != null) {
463 connection.setSocketTimeout(socketTimeout);
464 }
465 } finally {
466 lock.unlock();
467 }
468 }
469
470 @Override
471 public void upgrade(
472 final ConnectionEndpoint endpoint,
473 final HttpContext context) throws IOException {
474 lock.lock();
475 try {
476 Args.notNull(endpoint, "Endpoint");
477 Args.notNull(route, "HTTP route");
478 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
479 this.connectionOperator.upgrade(
480 internalEndpoint.getConnection(),
481 internalEndpoint.getRoute().getTargetHost(),
482 tlsConfig,
483 context);
484 } finally {
485 lock.unlock();
486 }
487 }
488
489 public void closeExpired() {
490 lock.lock();
491 try {
492 if (isClosed()) {
493 return;
494 }
495 if (!this.leased) {
496 checkExpiry();
497 }
498 } finally {
499 lock.unlock();
500 }
501 }
502
503 public void closeIdle(final TimeValue idleTime) {
504 lock.lock();
505 try {
506 Args.notNull(idleTime, "Idle time");
507 if (isClosed()) {
508 return;
509 }
510 if (!this.leased) {
511 long time = idleTime.toMilliseconds();
512 if (time < 0) {
513 time = 0;
514 }
515 final long deadline = System.currentTimeMillis() - time;
516 if (this.updated <= deadline) {
517 closeConnection(CloseMode.GRACEFUL);
518 }
519 }
520 } finally {
521 lock.unlock();
522 }
523 }
524
525
526
527
528
529
530
531
532 @Deprecated
533 public TimeValue getValidateAfterInactivity() {
534 return connectionConfig.getValidateAfterInactivity();
535 }
536
537
538
539
540
541
542
543
544
545
546
547 @Deprecated
548 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
549 this.connectionConfig = ConnectionConfig.custom()
550 .setValidateAfterInactivity(validateAfterInactivity)
551 .build();
552 }
553
554 class InternalConnectionEndpoint extends ConnectionEndpoint {
555
556 private final HttpRoute route;
557 private final AtomicReference<ManagedHttpClientConnection> connRef;
558
559 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
560 this.route = route;
561 this.connRef = new AtomicReference<>(conn);
562 }
563
564 HttpRoute getRoute() {
565 return route;
566 }
567
568 ManagedHttpClientConnection getConnection() {
569 final ManagedHttpClientConnection conn = this.connRef.get();
570 if (conn == null) {
571 throw new ConnectionShutdownException();
572 }
573 return conn;
574 }
575
576 ManagedHttpClientConnection getValidatedConnection() {
577 final ManagedHttpClientConnection conn = getConnection();
578 Asserts.check(conn.isOpen(), "Endpoint is not connected");
579 return conn;
580 }
581
582 ManagedHttpClientConnection detach() {
583 return this.connRef.getAndSet(null);
584 }
585
586 @Override
587 public boolean isConnected() {
588 final ManagedHttpClientConnection conn = getConnection();
589 return conn != null && conn.isOpen();
590 }
591
592 @Override
593 public void close(final CloseMode closeMode) {
594 final ManagedHttpClientConnection conn = detach();
595 if (conn != null) {
596 conn.close(closeMode);
597 }
598 }
599
600 @Override
601 public void close() throws IOException {
602 final ManagedHttpClientConnection conn = detach();
603 if (conn != null) {
604 conn.close();
605 }
606 }
607
608 @Override
609 public void setSocketTimeout(final Timeout timeout) {
610 getValidatedConnection().setSocketTimeout(timeout);
611 }
612
613 @Override
614 public ClassicHttpResponse execute(
615 final String exchangeId,
616 final ClassicHttpRequest request,
617 final HttpRequestExecutor requestExecutor,
618 final HttpContext context) throws IOException, HttpException {
619 Args.notNull(request, "HTTP request");
620 Args.notNull(requestExecutor, "Request executor");
621 if (LOG.isDebugEnabled()) {
622 LOG.debug("{} Executing exchange {}", id, exchangeId);
623 }
624 return requestExecutor.execute(request, getValidatedConnection(), context);
625 }
626
627 }
628
629
630
631
632
633
634
635
636 boolean isClosed() {
637 return this.closed.get();
638 }
639
640 }