1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.time.Instant;
32 import java.util.Objects;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.EndpointInfo;
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.SchemePortResolver;
44 import org.apache.hc.client5.http.config.ConnectionConfig;
45 import org.apache.hc.client5.http.config.TlsConfig;
46 import org.apache.hc.client5.http.impl.ConnPoolSupport;
47 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
48 import org.apache.hc.client5.http.io.ConnectionEndpoint;
49 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
50 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
51 import org.apache.hc.client5.http.io.LeaseRequest;
52 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
53 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
54 import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
55 import org.apache.hc.core5.annotation.Contract;
56 import org.apache.hc.core5.annotation.ThreadingBehavior;
57 import org.apache.hc.core5.http.ClassicHttpRequest;
58 import org.apache.hc.core5.http.ClassicHttpResponse;
59 import org.apache.hc.core5.http.HttpException;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.URIScheme;
62 import org.apache.hc.core5.http.config.Lookup;
63 import org.apache.hc.core5.http.config.RegistryBuilder;
64 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
65 import org.apache.hc.core5.http.io.HttpConnectionFactory;
66 import org.apache.hc.core5.http.io.SocketConfig;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.io.CloseMode;
69 import org.apache.hc.core5.util.Args;
70 import org.apache.hc.core5.util.Asserts;
71 import org.apache.hc.core5.util.Deadline;
72 import org.apache.hc.core5.util.Identifiable;
73 import org.apache.hc.core5.util.TimeValue;
74 import org.apache.hc.core5.util.Timeout;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 @Contract(threading = ThreadingBehavior.SAFE)
97 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
98
99 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
100
101 private static final AtomicLong COUNT = new AtomicLong(0);
102
103 private final HttpClientConnectionOperator connectionOperator;
104 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
105 private final String id;
106
107 private final ReentrantLock lock;
108
109 private ManagedHttpClientConnection conn;
110 private HttpRoute route;
111 private Object state;
112 private long created;
113 private long updated;
114 private long expiry;
115 private boolean leased;
116 private SocketConfig socketConfig;
117 private ConnectionConfig connectionConfig;
118 private TlsConfig tlsConfig;
119
120 private final AtomicBoolean closed;
121
122
123
124
125 public BasicHttpClientConnectionManager(
126 final HttpClientConnectionOperator httpClientConnectionOperator,
127 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
128 super();
129 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
130 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
131 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
132 this.expiry = Long.MAX_VALUE;
133 this.socketConfig = SocketConfig.DEFAULT;
134 this.connectionConfig = ConnectionConfig.DEFAULT;
135 this.tlsConfig = TlsConfig.DEFAULT;
136 this.closed = new AtomicBoolean(false);
137 this.lock = new ReentrantLock();
138 }
139
140
141
142
143 public static BasicHttpClientConnectionManager create(
144 final SchemePortResolver schemePortResolver,
145 final DnsResolver dnsResolver,
146 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry,
147 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
148 return new BasicHttpClientConnectionManager(
149 new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyRegistry),
150 connFactory);
151 }
152
153
154
155
156 public static BasicHttpClientConnectionManager create(
157 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry,
158 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
159 return new BasicHttpClientConnectionManager(
160 new DefaultHttpClientConnectionOperator(null, null, tlsSocketStrategyRegistry), connFactory);
161 }
162
163
164
165
166 public static BasicHttpClientConnectionManager create(
167 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry) {
168 return new BasicHttpClientConnectionManager(
169 new DefaultHttpClientConnectionOperator(null, null, tlsSocketStrategyRegistry), null);
170 }
171
172
173
174
175 @Deprecated
176 public BasicHttpClientConnectionManager(
177 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry,
178 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
179 final SchemePortResolver schemePortResolver,
180 final DnsResolver dnsResolver) {
181 this(new DefaultHttpClientConnectionOperator(
182 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
183 }
184
185
186
187
188 @Deprecated
189 public BasicHttpClientConnectionManager(
190 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry,
191 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
192 this(socketFactoryRegistry, connFactory, null, null);
193 }
194
195
196
197
198 @Deprecated
199 public BasicHttpClientConnectionManager(
200 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry) {
201 this(socketFactoryRegistry, null, null, null);
202 }
203
204 public BasicHttpClientConnectionManager() {
205 this(new DefaultHttpClientConnectionOperator(null, null,
206 RegistryBuilder.<TlsSocketStrategy>create()
207 .register(URIScheme.HTTPS.id, DefaultClientTlsStrategy.createDefault())
208 .build()),
209 null);
210 }
211
212 @Override
213 public void close() {
214 close(CloseMode.GRACEFUL);
215 }
216
217 @Override
218 public void close(final CloseMode closeMode) {
219 if (this.closed.compareAndSet(false, true)) {
220 closeConnection(closeMode);
221 }
222 }
223
224 HttpRoute getRoute() {
225 return route;
226 }
227
228 Object getState() {
229 return state;
230 }
231
232 public SocketConfig getSocketConfig() {
233 lock.lock();
234 try {
235 return socketConfig;
236 } finally {
237 lock.unlock();
238 }
239 }
240
241 public void setSocketConfig(final SocketConfig socketConfig) {
242 lock.lock();
243 try {
244 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
245 } finally {
246 lock.unlock();
247 }
248 }
249
250
251
252
253 public ConnectionConfig getConnectionConfig() {
254 lock.lock();
255 try {
256 return connectionConfig;
257 } finally {
258 lock.unlock();
259 }
260
261 }
262
263
264
265
266 public void setConnectionConfig(final ConnectionConfig connectionConfig) {
267 lock.lock();
268 try {
269 this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
270 } finally {
271 lock.unlock();
272 }
273 }
274
275
276
277
278 public TlsConfig getTlsConfig() {
279 lock.lock();
280 try {
281 return tlsConfig;
282 } finally {
283 lock.unlock();
284 }
285 }
286
287
288
289
290 public void setTlsConfig(final TlsConfig tlsConfig) {
291 lock.lock();
292 try {
293 this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
294 } finally {
295 lock.unlock();
296 }
297 }
298
299 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
300 return lease(id, route, Timeout.DISABLED, state);
301 }
302
303 @Override
304 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
305 return new LeaseRequest() {
306
307 @Override
308 public ConnectionEndpoint get(
309 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
310 try {
311 return new InternalConnectionEndpoint(route, getConnection(route, state));
312 } catch (final IOException ex) {
313 throw new ExecutionException(ex.getMessage(), ex);
314 }
315 }
316
317 @Override
318 public boolean cancel() {
319 return false;
320 }
321
322 };
323 }
324
325 private void closeConnection(final CloseMode closeMode) {
326 lock.lock();
327 try {
328 if (this.conn != null) {
329 if (LOG.isDebugEnabled()) {
330 LOG.debug("{} Closing connection {}", id, closeMode);
331 }
332 this.conn.close(closeMode);
333 this.conn = null;
334 }
335 } finally {
336 lock.unlock();
337 }
338 }
339
340 private void checkExpiry() {
341 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
344 }
345 closeConnection(CloseMode.GRACEFUL);
346 }
347 }
348
349 private void validate() {
350 if (this.conn != null) {
351 final TimeValue timeToLive = connectionConfig.getTimeToLive();
352 if (TimeValue.isNonNegative(timeToLive)) {
353 final Deadline deadline = Deadline.calculate(created, timeToLive);
354 if (deadline.isExpired()) {
355 closeConnection(CloseMode.GRACEFUL);
356 }
357 }
358 }
359 if (this.conn != null) {
360 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
361 connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
362 if (TimeValue.isNonNegative(timeValue)) {
363 final Deadline deadline = Deadline.calculate(updated, timeValue);
364 if (deadline.isExpired()) {
365 boolean stale;
366 try {
367 stale = conn.isStale();
368 } catch (final IOException ignore) {
369 stale = true;
370 }
371 if (stale) {
372 if (LOG.isDebugEnabled()) {
373 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
374 }
375 closeConnection(CloseMode.GRACEFUL);
376 }
377 }
378 }
379 }
380 }
381
382 ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
383 lock.lock();
384 try {
385 Asserts.check(!isClosed(), "Connection manager has been shut down");
386 if (LOG.isDebugEnabled()) {
387 LOG.debug("{} Get connection for route {}", id, route);
388 }
389 Asserts.check(!this.leased, "Connection %s is still allocated", conn);
390 if (!Objects.equals(this.route, route) || !Objects.equals(this.state, state)) {
391 closeConnection(CloseMode.GRACEFUL);
392 }
393 this.route = route;
394 this.state = state;
395 checkExpiry();
396 validate();
397 if (this.conn == null) {
398 this.conn = this.connFactory.createConnection(null);
399 this.created = System.currentTimeMillis();
400 } else {
401 this.conn.activate();
402 }
403 this.leased = true;
404 if (LOG.isDebugEnabled()) {
405 LOG.debug("{} Using connection {}", id, conn);
406 }
407 return this.conn;
408 } finally {
409 lock.unlock();
410 }
411 }
412
413
414 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
415 if (endpoint instanceof InternalConnectionEndpoint) {
416 return (InternalConnectionEndpoint) endpoint;
417 }
418 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
419 }
420
421 @Override
422 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
423 lock.lock();
424 try {
425 Args.notNull(endpoint, "Managed endpoint");
426 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
427 final ManagedHttpClientConnection conn = internalEndpoint.detach();
428 if (LOG.isDebugEnabled()) {
429 LOG.debug("{} Releasing connection {}", id, conn);
430 }
431 if (isClosed()) {
432 return;
433 }
434 try {
435 if (keepAlive == null && conn != null) {
436 conn.close(CloseMode.GRACEFUL);
437 }
438 this.updated = System.currentTimeMillis();
439 if (conn != null && conn.isOpen() && conn.isConsistent()) {
440 this.state = state;
441 conn.passivate();
442 if (TimeValue.isPositive(keepAlive)) {
443 if (LOG.isDebugEnabled()) {
444 LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
445 }
446 this.expiry = this.updated + keepAlive.toMilliseconds();
447 } else {
448 if (LOG.isDebugEnabled()) {
449 LOG.debug("{} Connection can be kept alive indefinitely", id);
450 }
451 this.expiry = Long.MAX_VALUE;
452 }
453 } else {
454 this.route = null;
455 this.conn = null;
456 this.expiry = Long.MAX_VALUE;
457 if (LOG.isDebugEnabled()) {
458 LOG.debug("{} Connection is not kept alive", id);
459 }
460 }
461 } finally {
462 this.leased = false;
463 }
464 } finally {
465 lock.unlock();
466 }
467 }
468
469 @Override
470 public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
471 lock.lock();
472 try {
473 Args.notNull(endpoint, "Endpoint");
474
475 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
476 if (internalEndpoint.isConnected()) {
477 return;
478 }
479 final HttpRoute route = internalEndpoint.getRoute();
480 final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
481 final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
482 final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
483 if (LOG.isDebugEnabled()) {
484 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
485 }
486 this.connectionOperator.connect(
487 connection,
488 firstHop,
489 route.getTargetName(),
490 route.getLocalSocketAddress(),
491 connectTimeout,
492 socketConfig,
493 route.isTunnelled() ? null : tlsConfig,
494 context);
495 if (LOG.isDebugEnabled()) {
496 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
497 }
498 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
499 if (socketTimeout != null) {
500 connection.setSocketTimeout(socketTimeout);
501 }
502 } finally {
503 lock.unlock();
504 }
505 }
506
507 @Override
508 public void upgrade(
509 final ConnectionEndpoint endpoint,
510 final HttpContext context) throws IOException {
511 lock.lock();
512 try {
513 Args.notNull(endpoint, "Endpoint");
514 Args.notNull(route, "HTTP route");
515 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
516 final HttpRoute route = internalEndpoint.getRoute();
517 this.connectionOperator.upgrade(
518 internalEndpoint.getConnection(),
519 route.getTargetHost(),
520 route.getTargetName(),
521 tlsConfig,
522 context);
523 } finally {
524 lock.unlock();
525 }
526 }
527
528 public void closeExpired() {
529 lock.lock();
530 try {
531 if (isClosed()) {
532 return;
533 }
534 if (!this.leased) {
535 checkExpiry();
536 }
537 } finally {
538 lock.unlock();
539 }
540 }
541
542 public void closeIdle(final TimeValue idleTime) {
543 lock.lock();
544 try {
545 Args.notNull(idleTime, "Idle time");
546 if (isClosed()) {
547 return;
548 }
549 if (!this.leased) {
550 long time = idleTime.toMilliseconds();
551 if (time < 0) {
552 time = 0;
553 }
554 final long deadline = System.currentTimeMillis() - time;
555 if (this.updated <= deadline) {
556 closeConnection(CloseMode.GRACEFUL);
557 }
558 }
559 } finally {
560 lock.unlock();
561 }
562 }
563
564
565
566
567
568
569
570
571 @Deprecated
572 public TimeValue getValidateAfterInactivity() {
573 return connectionConfig.getValidateAfterInactivity();
574 }
575
576
577
578
579
580
581
582
583
584
585
586 @Deprecated
587 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
588 this.connectionConfig = ConnectionConfig.custom()
589 .setValidateAfterInactivity(validateAfterInactivity)
590 .build();
591 }
592
593 class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
594
595 private final HttpRoute route;
596 private final AtomicReference<ManagedHttpClientConnection> connRef;
597
598 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
599 this.route = route;
600 this.connRef = new AtomicReference<>(conn);
601 }
602
603 @Override
604 public String getId() {
605 return id;
606 }
607
608 HttpRoute getRoute() {
609 return route;
610 }
611
612 ManagedHttpClientConnection getConnection() {
613 final ManagedHttpClientConnection conn = this.connRef.get();
614 if (conn == null) {
615 throw new ConnectionShutdownException();
616 }
617 return conn;
618 }
619
620 ManagedHttpClientConnection getValidatedConnection() {
621 final ManagedHttpClientConnection conn = this.connRef.get();
622 if (conn == null || !conn.isOpen()) {
623 throw new ConnectionShutdownException();
624 }
625 return conn;
626 }
627
628 ManagedHttpClientConnection detach() {
629 return this.connRef.getAndSet(null);
630 }
631
632 @Override
633 public boolean isConnected() {
634 final ManagedHttpClientConnection conn = this.connRef.get();
635 return conn != null && conn.isOpen();
636 }
637
638 @Override
639 public void close(final CloseMode closeMode) {
640 final ManagedHttpClientConnection conn = this.connRef.get();
641 if (conn != null) {
642 conn.close(closeMode);
643 }
644 }
645
646 @Override
647 public void close() throws IOException {
648 final ManagedHttpClientConnection conn = this.connRef.get();
649 if (conn != null) {
650 conn.close();
651 }
652 }
653
654 @Override
655 public void setSocketTimeout(final Timeout timeout) {
656 getValidatedConnection().setSocketTimeout(timeout);
657 }
658
659
660
661
662 @Deprecated
663 @Override
664 public ClassicHttpResponse execute(
665 final String exchangeId,
666 final ClassicHttpRequest request,
667 final HttpRequestExecutor requestExecutor,
668 final HttpContext context) throws IOException, HttpException {
669 Args.notNull(request, "HTTP request");
670 Args.notNull(requestExecutor, "Request executor");
671 if (LOG.isDebugEnabled()) {
672 LOG.debug("{} Executing exchange {}", id, exchangeId);
673 }
674 return requestExecutor.execute(request, getValidatedConnection(), context);
675 }
676
677
678
679
680 @Override
681 public ClassicHttpResponse execute(
682 final String exchangeId,
683 final ClassicHttpRequest request,
684 final RequestExecutor requestExecutor,
685 final HttpContext context) throws IOException, HttpException {
686 Args.notNull(request, "HTTP request");
687 Args.notNull(requestExecutor, "Request executor");
688 if (LOG.isDebugEnabled()) {
689 LOG.debug("{} Executing exchange {}", id, exchangeId);
690 }
691 return requestExecutor.execute(request, getValidatedConnection(), context);
692 }
693
694
695
696
697 @Override
698 public EndpointInfo getInfo() {
699 final ManagedHttpClientConnection connection = this.connRef.get();
700 if (connection != null && connection.isOpen()) {
701 return new EndpointInfo(connection.getProtocolVersion(), connection.getSSLSession());
702 }
703 return null;
704 }
705
706 }
707
708
709
710
711
712
713
714
715
716 public boolean isClosed() {
717 return this.closed.get();
718 }
719
720 }