1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.io;
28
29 import java.io.IOException;
30 import java.util.Set;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.config.ConnectionConfig;
41 import org.apache.hc.client5.http.config.TlsConfig;
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
44 import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
45 import org.apache.hc.client5.http.io.ConnectionEndpoint;
46 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
47 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
48 import org.apache.hc.client5.http.io.LeaseRequest;
49 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
50 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
51 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
52 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
53 import org.apache.hc.core5.annotation.Contract;
54 import org.apache.hc.core5.annotation.Internal;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.function.Resolver;
57 import org.apache.hc.core5.http.ClassicHttpRequest;
58 import org.apache.hc.core5.http.ClassicHttpResponse;
59 import org.apache.hc.core5.http.HttpException;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.URIScheme;
62 import org.apache.hc.core5.http.config.Registry;
63 import org.apache.hc.core5.http.config.RegistryBuilder;
64 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
65 import org.apache.hc.core5.http.io.HttpConnectionFactory;
66 import org.apache.hc.core5.http.io.SocketConfig;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.io.CloseMode;
69 import org.apache.hc.core5.pool.ConnPoolControl;
70 import org.apache.hc.core5.pool.LaxConnPool;
71 import org.apache.hc.core5.pool.ManagedConnPool;
72 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
73 import org.apache.hc.core5.pool.PoolEntry;
74 import org.apache.hc.core5.pool.PoolReusePolicy;
75 import org.apache.hc.core5.pool.PoolStats;
76 import org.apache.hc.core5.pool.StrictConnPool;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.Deadline;
79 import org.apache.hc.core5.util.Identifiable;
80 import org.apache.hc.core5.util.TimeValue;
81 import org.apache.hc.core5.util.Timeout;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingHttpClientConnectionManager
105 implements HttpClientConnectionManager, ConnPoolControl<HttpRoute> {
106
/** Logger shared by the manager and its nested endpoint class. */
107 private static final Logger LOG = LoggerFactory.getLogger(PoolingHttpClientConnectionManager.class);
108
/** Total connection limit handed to the {@code StrictConnPool} created by the policy-based constructor. */
109 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
/** Per-route connection limit handed to the pools created by the policy-based constructor. */
110 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
111
/** Operator used by {@code connect} and {@code upgrade}; never {@code null}. */
112 private final HttpClientConnectionOperator connectionOperator;
/** Pool the endpoints are leased from and released to. */
113 private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
/** Factory used to create a connection for a pool entry that has none. */
114 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
/** Flipped to {@code true} by the first {@code close(CloseMode)} call so the pool is closed only once. */
115 private final AtomicBoolean closed;
116
/** Optional per-route socket config resolver; when {@code null} {@code SocketConfig.DEFAULT} is used. */
117 private volatile Resolver<HttpRoute, SocketConfig> socketConfigResolver;
/** Optional per-route connection config resolver; when {@code null} {@code ConnectionConfig.DEFAULT} is used. */
118 private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
/** Optional per-host TLS config resolver; when {@code null} {@code TlsConfig.DEFAULT} is used. */
119 private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
120
121 public PoolingHttpClientConnectionManager() {
122 this(RegistryBuilder.<ConnectionSocketFactory>create()
123 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
124 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
125 .build());
126 }
127
/**
 * Creates a connection manager using the given socket factory registry and
 * no explicit connection factory ({@code null} is passed on).
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 */
128 public PoolingHttpClientConnectionManager(
129 final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
130 this(socketFactoryRegistry, null);
131 }
132
/**
 * Creates a connection manager with {@code PoolConcurrencyPolicy.STRICT} and a
 * time to live of {@code TimeValue.NEG_ONE_MILLISECOND}.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param connFactory connection factory; may be {@code null}.
 */
133 public PoolingHttpClientConnectionManager(
134 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
135 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
136 this(socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND, connFactory);
137 }
138
/**
 * Creates a connection manager with the given concurrency policy and time to live,
 * using {@code PoolReusePolicy.LIFO} as the reuse policy.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param timeToLive time to live passed to the connection pool.
 * @param connFactory connection factory; may be {@code null}.
 */
139 public PoolingHttpClientConnectionManager(
140 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
141 final PoolConcurrencyPolicy poolConcurrencyPolicy,
142 final TimeValue timeToLive,
143 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
144 this(socketFactoryRegistry, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive, connFactory);
145 }
146
/**
 * Creates a connection manager with the given pool policies and time to live and
 * no explicit connection factory ({@code null} is passed on).
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy pool reuse policy.
 * @param timeToLive time to live passed to the connection pool.
 */
147 public PoolingHttpClientConnectionManager(
148 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
149 final PoolConcurrencyPolicy poolConcurrencyPolicy,
150 final PoolReusePolicy poolReusePolicy,
151 final TimeValue timeToLive) {
152 this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null);
153 }
154
/**
 * Creates a connection manager with the given pool policies, time to live and
 * connection factory; {@code null} is passed on for both the scheme port resolver
 * and the DNS resolver.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy pool reuse policy.
 * @param timeToLive time to live passed to the connection pool.
 * @param connFactory connection factory; may be {@code null}.
 */
155 public PoolingHttpClientConnectionManager(
156 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
157 final PoolConcurrencyPolicy poolConcurrencyPolicy,
158 final PoolReusePolicy poolReusePolicy,
159 final TimeValue timeToLive,
160 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
161 this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null, connFactory);
162 }
163
/**
 * Creates a connection manager whose connection operator is a
 * {@code DefaultHttpClientConnectionOperator} built from the given socket factory
 * registry, scheme port resolver and DNS resolver.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy pool reuse policy.
 * @param timeToLive time to live passed to the connection pool.
 * @param schemePortResolver scheme port resolver handed to the operator.
 * @param dnsResolver DNS resolver handed to the operator.
 * @param connFactory connection factory; may be {@code null}.
 */
164 public PoolingHttpClientConnectionManager(
165 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
166 final PoolConcurrencyPolicy poolConcurrencyPolicy,
167 final PoolReusePolicy poolReusePolicy,
168 final TimeValue timeToLive,
169 final SchemePortResolver schemePortResolver,
170 final DnsResolver dnsResolver,
171 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
172 this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
173 poolConcurrencyPolicy,
174 poolReusePolicy,
175 timeToLive,
176 connFactory);
177 }
178
/**
 * Creates a connection manager around the given connection operator and builds
 * the connection pool matching the requested concurrency policy.
 * <p>
 * A {@code null} policy is treated as {@code PoolConcurrencyPolicy.STRICT}. Both
 * pool flavours override {@code closeExpired()} so that every available entry is
 * passed to {@link #closeIfExpired(PoolEntry)}.
 *
 * @param httpClientConnectionOperator connection operator; must not be {@code null}.
 * @param poolConcurrencyPolicy pool concurrency policy; may be {@code null}.
 * @param poolReusePolicy pool reuse policy handed to the pool.
 * @param timeToLive time to live handed to the pool.
 * @param connFactory connection factory; when {@code null}
 *                    {@code ManagedHttpClientConnectionFactory.INSTANCE} is used.
 * @throws IllegalArgumentException if the policy is neither STRICT nor LAX.
 */
@Internal
protected PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    super();
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
    final PoolConcurrencyPolicy effectivePolicy;
    if (poolConcurrencyPolicy == null) {
        effectivePolicy = PoolConcurrencyPolicy.STRICT;
    } else {
        effectivePolicy = poolConcurrencyPolicy;
    }
    if (effectivePolicy == PoolConcurrencyPolicy.STRICT) {
        this.pool = new StrictConnPool<HttpRoute, ManagedHttpClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                DEFAULT_MAX_TOTAL_CONNECTIONS,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                // Apply the manager's own expiry rules to every idle entry.
                enumAvailable(entry -> closeIfExpired(entry));
            }

        };
    } else if (effectivePolicy == PoolConcurrencyPolicy.LAX) {
        this.pool = new LaxConnPool<HttpRoute, ManagedHttpClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                // Apply the manager's own expiry rules to every idle entry.
                enumAvailable(entry -> closeIfExpired(entry));
            }

        };
    } else {
        throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
    }
    if (connFactory != null) {
        this.connFactory = connFactory;
    } else {
        this.connFactory = ManagedHttpClientConnectionFactory.INSTANCE;
    }
    this.closed = new AtomicBoolean(false);
}
224
/**
 * Creates a connection manager around an externally supplied connection operator
 * and connection pool.
 *
 * @param httpClientConnectionOperator connection operator; must not be {@code null}.
 * @param pool connection pool; must not be {@code null}.
 * @param connFactory connection factory; when {@code null}
 *                    {@code ManagedHttpClientConnectionFactory.INSTANCE} is used.
 */
@Internal
protected PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    super();
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
    this.pool = Args.notNull(pool, "Connection pool");
    if (connFactory == null) {
        this.connFactory = ManagedHttpClientConnectionFactory.INSTANCE;
    } else {
        this.connFactory = connFactory;
    }
    this.closed = new AtomicBoolean(false);
}
236
/**
 * Closes this connection manager using {@code CloseMode.GRACEFUL}.
 */
237 @Override
238 public void close() {
239 close(CloseMode.GRACEFUL);
240 }
241
242 @Override
243 public void close(final CloseMode closeMode) {
244 if (this.closed.compareAndSet(false, true)) {
245 if (LOG.isDebugEnabled()) {
246 LOG.debug("Shutdown connection pool {}", closeMode);
247 }
248 this.pool.close(closeMode);
249 LOG.debug("Connection pool shut down");
250 }
251 }
252
253 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
254 if (endpoint instanceof InternalConnectionEndpoint) {
255 return (InternalConnectionEndpoint) endpoint;
256 }
257 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
258 }
259
260 private SocketConfig resolveSocketConfig(final HttpRoute route) {
261 final Resolver<HttpRoute, SocketConfig> resolver = this.socketConfigResolver;
262 final SocketConfig socketConfig = resolver != null ? resolver.resolve(route) : null;
263 return socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
264 }
265
266 private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
267 final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
268 final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
269 return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
270 }
271
272 private TlsConfig resolveTlsConfig(final HttpHost host) {
273 final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
274 final TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
275 return tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
276 }
277
278 private TimeValue resolveValidateAfterInactivity(final ConnectionConfig connectionConfig) {
279 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
280 return timeValue != null ? timeValue : TimeValue.ofSeconds(2);
281 }
282
/**
 * Requests an endpoint for the given route with the lease request timeout set to
 * {@code Timeout.DISABLED}.
 *
 * @param id identifier used in log messages.
 * @param route the route to lease an endpoint for.
 * @param state the state passed to the pool when leasing.
 * @return the lease request.
 */
283 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
284 return lease(id, route, Timeout.DISABLED, state);
285 }
286
287 @Override
288 public LeaseRequest lease(
289 final String id,
290 final HttpRoute route,
291 final Timeout requestTimeout,
292 final Object state) {
293 Args.notNull(route, "HTTP route");
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
296 }
297 final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
298 return new LeaseRequest() {
299
300 private volatile ConnectionEndpoint endpoint;
301
302 @Override
303 public synchronized ConnectionEndpoint get(
304 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
305 Args.notNull(timeout, "Operation timeout");
306 if (this.endpoint != null) {
307 return this.endpoint;
308 }
309 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
310 try {
311 poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
312 } catch (final TimeoutException ex) {
313 leaseFuture.cancel(true);
314 throw ex;
315 }
316 if (LOG.isDebugEnabled()) {
317 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
318 }
319 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
320 try {
321 if (poolEntry.hasConnection()) {
322 final TimeValue timeToLive = connectionConfig.getTimeToLive();
323 if (TimeValue.isNonNegative(timeToLive)) {
324 if (timeToLive.getDuration() == 0
325 || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
326 poolEntry.discardConnection(CloseMode.GRACEFUL);
327 }
328 }
329 }
330 if (poolEntry.hasConnection()) {
331 final TimeValue timeValue = resolveValidateAfterInactivity(connectionConfig);
332 if (TimeValue.isNonNegative(timeValue)) {
333 if (timeValue.getDuration() == 0
334 || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
335 final ManagedHttpClientConnection conn = poolEntry.getConnection();
336 boolean stale;
337 try {
338 stale = conn.isStale();
339 } catch (final IOException ignore) {
340 stale = true;
341 }
342 if (stale) {
343 if (LOG.isDebugEnabled()) {
344 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
345 }
346 poolEntry.discardConnection(CloseMode.IMMEDIATE);
347 }
348 }
349 }
350 }
351 final ManagedHttpClientConnection conn = poolEntry.getConnection();
352 if (conn != null) {
353 conn.activate();
354 } else {
355 poolEntry.assignConnection(connFactory.createConnection(null));
356 }
357 this.endpoint = new InternalConnectionEndpoint(poolEntry);
358 if (LOG.isDebugEnabled()) {
359 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
360 }
361 return this.endpoint;
362 } catch (final Exception ex) {
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("{} endpoint lease failed", id);
365 }
366 pool.release(poolEntry, false);
367 throw new ExecutionException(ex.getMessage(), ex);
368 }
369 }
370
371 @Override
372 public boolean cancel() {
373 return leaseFuture.cancel(true);
374 }
375
376 };
377
378 }
379
380 @Override
381 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
382 Args.notNull(endpoint, "Managed endpoint");
383 final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry = cast(endpoint).detach();
384 if (entry == null) {
385 return;
386 }
387 if (LOG.isDebugEnabled()) {
388 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
389 }
390 final ManagedHttpClientConnection conn = entry.getConnection();
391 if (conn != null && keepAlive == null) {
392 conn.close(CloseMode.GRACEFUL);
393 }
394 boolean reusable = conn != null && conn.isOpen() && conn.isConsistent();
395 try {
396 if (reusable) {
397 entry.updateState(state);
398 entry.updateExpiry(keepAlive);
399 conn.passivate();
400 if (LOG.isDebugEnabled()) {
401 final String s;
402 if (TimeValue.isPositive(keepAlive)) {
403 s = "for " + keepAlive;
404 } else {
405 s = "indefinitely";
406 }
407 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn), s);
408 }
409 } else {
410 if (LOG.isDebugEnabled()) {
411 if (conn != null && !conn.isConsistent()) {
412 LOG.debug("{} connection is in an inconsistent state and cannot be kept alive)", ConnPoolSupport.getId(endpoint));
413 } else {
414 LOG.debug("{} connection is not kept alive)", ConnPoolSupport.getId(endpoint));
415 }
416 }
417 }
418 } catch (final RuntimeException ex) {
419 reusable = false;
420 throw ex;
421 } finally {
422 this.pool.release(entry, reusable);
423 if (LOG.isDebugEnabled()) {
424 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
425 }
426 }
427 }
428
429 @Override
430 public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
431 Args.notNull(endpoint, "Managed endpoint");
432 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
433 if (internalEndpoint.isConnected()) {
434 return;
435 }
436 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getPoolEntry();
437 if (!poolEntry.hasConnection()) {
438 poolEntry.assignConnection(connFactory.createConnection(null));
439 }
440 final HttpRoute route = poolEntry.getRoute();
441 final HttpHost host = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
442 final SocketConfig socketConfig = resolveSocketConfig(route);
443 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
444 final TlsConfig tlsConfig = resolveTlsConfig(host);
445 final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
446 if (LOG.isDebugEnabled()) {
447 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
448 }
449 final ManagedHttpClientConnection conn = poolEntry.getConnection();
450 this.connectionOperator.connect(
451 conn,
452 host,
453 route.getLocalSocketAddress(),
454 connectTimeout,
455 socketConfig,
456 tlsConfig,
457 context);
458 if (LOG.isDebugEnabled()) {
459 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
460 }
461 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
462 if (socketTimeout != null) {
463 conn.setSocketTimeout(socketTimeout);
464 }
465 }
466
467 @Override
468 public void upgrade(final ConnectionEndpoint endpoint, final HttpContext context) throws IOException {
469 Args.notNull(endpoint, "Managed endpoint");
470 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
471 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
472 final HttpRoute route = poolEntry.getRoute();
473 final HttpHost host = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
474 final TlsConfig tlsConfig = resolveTlsConfig(host);
475 this.connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), tlsConfig, context);
476 }
477
478 @Override
479 public void closeIdle(final TimeValue idleTime) {
480 Args.notNull(idleTime, "Idle time");
481 if (LOG.isDebugEnabled()) {
482 LOG.debug("Closing connections idle longer than {}", idleTime);
483 }
484 this.pool.closeIdle(idleTime);
485 }
486
/**
 * Logs the request at debug level and asks the connection pool to close its
 * expired connections.
 */
487 @Override
488 public void closeExpired() {
489 LOG.debug("Closing expired connections");
490 this.pool.closeExpired();
491 }
492
/**
 * Returns the routes reported by the underlying connection pool.
 */
493 @Override
494 public Set<HttpRoute> getRoutes() {
495 return this.pool.getRoutes();
496 }
497
/**
 * Returns the total connection limit reported by the underlying connection pool.
 */
498 @Override
499 public int getMaxTotal() {
500 return this.pool.getMaxTotal();
501 }
502
/**
 * Forwards the total connection limit to the underlying connection pool.
 *
 * @param max the new limit.
 */
503 @Override
504 public void setMaxTotal(final int max) {
505 this.pool.setMaxTotal(max);
506 }
507
/**
 * Returns the default per-route connection limit reported by the underlying connection pool.
 */
508 @Override
509 public int getDefaultMaxPerRoute() {
510 return this.pool.getDefaultMaxPerRoute();
511 }
512
/**
 * Forwards the default per-route connection limit to the underlying connection pool.
 *
 * @param max the new limit.
 */
513 @Override
514 public void setDefaultMaxPerRoute(final int max) {
515 this.pool.setDefaultMaxPerRoute(max);
516 }
517
/**
 * Returns the connection limit the underlying connection pool reports for the given route.
 *
 * @param route the route to query.
 */
518 @Override
519 public int getMaxPerRoute(final HttpRoute route) {
520 return this.pool.getMaxPerRoute(route);
521 }
522
/**
 * Forwards the connection limit for the given route to the underlying connection pool.
 *
 * @param route the route the limit applies to.
 * @param max the new limit.
 */
523 @Override
524 public void setMaxPerRoute(final HttpRoute route, final int max) {
525 this.pool.setMaxPerRoute(route, max);
526 }
527
/**
 * Returns the overall statistics reported by the underlying connection pool.
 */
528 @Override
529 public PoolStats getTotalStats() {
530 return this.pool.getTotalStats();
531 }
532
/**
 * Returns the statistics the underlying connection pool reports for the given route.
 *
 * @param route the route to query.
 */
533 @Override
534 public PoolStats getStats(final HttpRoute route) {
535 return this.pool.getStats(route);
536 }
537
538
539
540
/**
 * Installs a socket config resolver that returns the given configuration for
 * every route, replacing any resolver set before.
 *
 * @param config the socket configuration to use for all routes.
 */
541 public void setDefaultSocketConfig(final SocketConfig config) {
542 this.socketConfigResolver = (route) -> config;
543 }
544
545
546
547
548
549
/**
 * Sets the resolver used to determine the socket configuration per route.
 *
 * @param socketConfigResolver the resolver; when {@code null}
 *                             {@code SocketConfig.DEFAULT} is used for all routes.
 */
550 public void setSocketConfigResolver(final Resolver<HttpRoute, SocketConfig> socketConfigResolver) {
551 this.socketConfigResolver = socketConfigResolver;
552 }
553
554
555
556
557
558
/**
 * Installs a connection config resolver that returns the given configuration for
 * every route, replacing any resolver set before.
 *
 * @param config the connection configuration to use for all routes.
 */
559 public void setDefaultConnectionConfig(final ConnectionConfig config) {
560 this.connectionConfigResolver = (route) -> config;
561 }
562
563
564
565
566
567
/**
 * Sets the resolver used to determine the connection configuration per route.
 *
 * @param connectionConfigResolver the resolver; when {@code null}
 *                                 {@code ConnectionConfig.DEFAULT} is used for all routes.
 */
568 public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
569 this.connectionConfigResolver = connectionConfigResolver;
570 }
571
572
573
574
575
576
/**
 * Installs a TLS config resolver that returns the given configuration for every
 * host, replacing any resolver set before.
 *
 * @param config the TLS configuration to use for all hosts.
 */
577 public void setDefaultTlsConfig(final TlsConfig config) {
578 this.tlsConfigResolver = (host) -> config;
579 }
580
581
582
583
584
585
/**
 * Sets the resolver used to determine the TLS configuration per host.
 *
 * @param tlsConfigResolver the resolver; when {@code null}
 *                          {@code TlsConfig.DEFAULT} is used for all hosts.
 */
586 public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
587 this.tlsConfigResolver = tlsConfigResolver;
588 }
589
590 void closeIfExpired(final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry) {
591 final long now = System.currentTimeMillis();
592 if (entry.getExpiryDeadline().isBefore(now)) {
593 entry.discardConnection(CloseMode.GRACEFUL);
594 } else {
595 final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
596 final TimeValue timeToLive = connectionConfig.getTimeToLive();
597 if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
598 entry.discardConnection(CloseMode.GRACEFUL);
599 }
600 }
601 }
602
603
604
605
/**
 * Always returns {@code SocketConfig.DEFAULT}; the configured socket config
 * resolver is not consulted.
 *
 * @deprecated the returned value does not reflect the socket configuration
 *             installed via the setters of this class.
 */
606 @Deprecated
607 public SocketConfig getDefaultSocketConfig() {
608 return SocketConfig.DEFAULT;
609 }
610
611
612
613
614
615
/**
 * Always returns the validate-after-inactivity value of
 * {@code ConnectionConfig.DEFAULT}; the configured connection config resolver is
 * not consulted.
 *
 * @deprecated the returned value does not reflect the connection configuration
 *             installed via the setters of this class.
 */
616 @Deprecated
617 public TimeValue getValidateAfterInactivity() {
618 return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
619 }
620
621
622
623
624
625
626
627
628
629
630
631 @Deprecated
632 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
633 setDefaultConnectionConfig(ConnectionConfig.custom()
634 .setValidateAfterInactivity(validateAfterInactivity)
635 .build());
636 }
637
/** Source of the ids assigned to endpoints; created with the prefix {@code "ep-"}. */
638 private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
639
640 static class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
641
642 private final AtomicReference<PoolEntry<HttpRoute, ManagedHttpClientConnection>> poolEntryRef;
643 private final String id;
644
645 InternalConnectionEndpoint(
646 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
647 this.poolEntryRef = new AtomicReference<>(poolEntry);
648 this.id = INCREMENTING_ID.getNextId();
649 }
650
651 @Override
652 public String getId() {
653 return id;
654 }
655
656 PoolEntry<HttpRoute, ManagedHttpClientConnection> getPoolEntry() {
657 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
658 if (poolEntry == null) {
659 throw new ConnectionShutdownException();
660 }
661 return poolEntry;
662 }
663
664 PoolEntry<HttpRoute, ManagedHttpClientConnection> getValidatedPoolEntry() {
665 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
666 final ManagedHttpClientConnection connection = poolEntry.getConnection();
667 if (connection == null || !connection.isOpen()) {
668 throw new ConnectionShutdownException();
669 }
670 return poolEntry;
671 }
672
673 PoolEntry<HttpRoute, ManagedHttpClientConnection> detach() {
674 return poolEntryRef.getAndSet(null);
675 }
676
677 @Override
678 public void close(final CloseMode closeMode) {
679 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
680 if (poolEntry != null) {
681 poolEntry.discardConnection(closeMode);
682 }
683 }
684
685 @Override
686 public void close() throws IOException {
687 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = poolEntryRef.get();
688 if (poolEntry != null) {
689 poolEntry.discardConnection(CloseMode.GRACEFUL);
690 }
691 }
692
693 @Override
694 public boolean isConnected() {
695 final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = getPoolEntry();
696 final ManagedHttpClientConnection connection = poolEntry.getConnection();
697 return connection != null && connection.isOpen();
698 }
699
700 @Override
701 public void setSocketTimeout(final Timeout timeout) {
702 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
703 }
704
705 @Override
706 public ClassicHttpResponse execute(
707 final String exchangeId,
708 final ClassicHttpRequest request,
709 final HttpRequestExecutor requestExecutor,
710 final HttpContext context) throws IOException, HttpException {
711 Args.notNull(request, "HTTP request");
712 Args.notNull(requestExecutor, "Request executor");
713 final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
714 if (LOG.isDebugEnabled()) {
715 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
716 }
717 return requestExecutor.execute(request, connection, context);
718 }
719
720 }
721 }