/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.locks.Lock;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import org.apache.hc.core5.annotation.Contract;
43  import org.apache.hc.core5.annotation.ThreadingBehavior;
44  import org.apache.hc.core5.concurrent.BasicFuture;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.io.CloseMode;
48  import org.apache.hc.core5.io.ModalCloseable;
49  import org.apache.hc.core5.util.Args;
50  import org.apache.hc.core5.util.Asserts;
51  import org.apache.hc.core5.util.Deadline;
52  import org.apache.hc.core5.util.DeadlineTimeoutException;
53  import org.apache.hc.core5.util.LangUtils;
54  import org.apache.hc.core5.util.TimeValue;
55  import org.apache.hc.core5.util.Timeout;
56  
57  /**
58   * Connection pool with strict connection limit guarantees.
59   *
60   * @param <T> route
61   * @param <C> connection object
62   *
63   * @since 4.2
64   */
65  @Contract(threading = ThreadingBehavior.SAFE)
66  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
67  
68      private final TimeValue timeToLive;
69      private final PoolReusePolicy policy;
70      private final DisposalCallback<C> disposalCallback;
71      private final ConnPoolListener<T> connPoolListener;
72      private final Map<T, PerRoutePool<T, C>> routeToPool;
73      private final LinkedList<LeaseRequest<T, C>> leasingRequests;
74      private final Set<PoolEntry<T, C>> leased;
75      private final LinkedList<PoolEntry<T, C>> available;
76      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
77      private final Map<T, Integer> maxPerRoute;
78      private final Lock lock;
79      private final AtomicBoolean isShutDown;
80  
81      private volatile int defaultMaxPerRoute;
82      private volatile int maxTotal;
83  
84      /**
85       * @since 5.0
86       */
87      public StrictConnPool(
88              final int defaultMaxPerRoute,
89              final int maxTotal,
90              final TimeValue timeToLive,
91              final PoolReusePolicy policy,
92              final DisposalCallback<C> disposalCallback,
93              final ConnPoolListener<T> connPoolListener) {
94          super();
95          Args.positive(defaultMaxPerRoute, "Max per route value");
96          Args.positive(maxTotal, "Max total value");
97          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
98          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
99          this.disposalCallback = disposalCallback;
100         this.connPoolListener = connPoolListener;
101         this.routeToPool = new HashMap<>();
102         this.leasingRequests = new LinkedList<>();
103         this.leased = new HashSet<>();
104         this.available = new LinkedList<>();
105         this.completedRequests = new ConcurrentLinkedQueue<>();
106         this.maxPerRoute = new HashMap<>();
107         this.lock = new ReentrantLock();
108         this.isShutDown = new AtomicBoolean(false);
109         this.defaultMaxPerRoute = defaultMaxPerRoute;
110         this.maxTotal = maxTotal;
111     }
112 
113     /**
114      * @since 5.0
115      */
116     public StrictConnPool(
117             final int defaultMaxPerRoute,
118             final int maxTotal,
119             final TimeValue timeToLive,
120             final PoolReusePolicy policy,
121             final ConnPoolListener<T> connPoolListener) {
122         this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
123     }
124 
125     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
126         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
127     }
128 
129     public boolean isShutdown() {
130         return this.isShutDown.get();
131     }
132 
133     @Override
134     public void close(final CloseMode closeMode) {
135         if (this.isShutDown.compareAndSet(false, true)) {
136             fireCallbacks();
137             this.lock.lock();
138             try {
139                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
140                     pool.shutdown(closeMode);
141                 }
142                 this.routeToPool.clear();
143                 this.leased.clear();
144                 this.available.clear();
145                 this.leasingRequests.clear();
146             } finally {
147                 this.lock.unlock();
148             }
149         }
150     }
151 
152     @Override
153     public void close() {
154         close(CloseMode.GRACEFUL);
155     }
156 
157     private PerRoutePool<T, C> getPool(final T route) {
158         PerRoutePool<T, C> pool = this.routeToPool.get(route);
159         if (pool == null) {
160             pool = new PerRoutePool<>(route, this.disposalCallback);
161             this.routeToPool.put(route, pool);
162         }
163         return pool;
164     }
165 
166     @Override
167     public Future<PoolEntry<T, C>> lease(
168             final T route, final Object state,
169             final Timeout requestTimeout,
170             final FutureCallback<PoolEntry<T, C>> callback) {
171         Args.notNull(route, "Route");
172         Args.notNull(requestTimeout, "Request timeout");
173         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
174         final Deadline deadline = Deadline.calculate(requestTimeout);
175         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
176         final boolean acquiredLock;
177 
178         try {
179             acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
180         } catch (final InterruptedException interruptedException) {
181             Thread.currentThread().interrupt();
182             future.cancel();
183             return future;
184         }
185 
186         if (acquiredLock) {
187             try {
188                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
189                 final boolean completed = processPendingRequest(request);
190                 if (!request.isDone() && !completed) {
191                     this.leasingRequests.add(request);
192                 }
193                 if (request.isDone()) {
194                     this.completedRequests.add(request);
195                 }
196             } finally {
197                 this.lock.unlock();
198             }
199             fireCallbacks();
200         } else {
201             future.failed(DeadlineTimeoutException.from(deadline));
202         }
203 
204         return future;
205     }
206 
207     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
208         return lease(route, state, Timeout.DISABLED, null);
209     }
210 
211     @Override
212     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
213         if (entry == null) {
214             return;
215         }
216         if (this.isShutDown.get()) {
217             return;
218         }
219         if (!reusable) {
220             entry.discardConnection(CloseMode.GRACEFUL);
221         }
222         this.lock.lock();
223         try {
224             if (this.leased.remove(entry)) {
225                 if (this.connPoolListener != null) {
226                     this.connPoolListener.onRelease(entry.getRoute(), this);
227                 }
228                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
229                 final boolean keepAlive = entry.hasConnection() && reusable;
230                 pool.free(entry, keepAlive);
231                 if (keepAlive) {
232                     switch (policy) {
233                         case LIFO:
234                             this.available.addFirst(entry);
235                             break;
236                         case FIFO:
237                             this.available.addLast(entry);
238                             break;
239                         default:
240                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
241                     }
242                 } else {
243                     entry.discardConnection(CloseMode.GRACEFUL);
244                 }
245                 processNextPendingRequest();
246             } else {
247                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
248             }
249         } finally {
250             this.lock.unlock();
251         }
252         fireCallbacks();
253     }
254 
255     private void processPendingRequests() {
256         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
257         while (it.hasNext()) {
258             final LeaseRequest<T, C> request = it.next();
259             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
260             if (future.isCancelled()) {
261                 it.remove();
262                 continue;
263             }
264             final boolean completed = processPendingRequest(request);
265             if (request.isDone() || completed) {
266                 it.remove();
267             }
268             if (request.isDone()) {
269                 this.completedRequests.add(request);
270             }
271         }
272     }
273 
274     private void processNextPendingRequest() {
275         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
276         while (it.hasNext()) {
277             final LeaseRequest<T, C> request = it.next();
278             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
279             if (future.isCancelled()) {
280                 it.remove();
281                 continue;
282             }
283             final boolean completed = processPendingRequest(request);
284             if (request.isDone() || completed) {
285                 it.remove();
286             }
287             if (request.isDone()) {
288                 this.completedRequests.add(request);
289             }
290             if (completed) {
291                 return;
292             }
293         }
294     }
295 
296     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
297         final T route = request.getRoute();
298         final Object state = request.getState();
299         final Deadline deadline = request.getDeadline();
300 
301         if (deadline.isExpired()) {
302             request.failed(DeadlineTimeoutException.from(deadline));
303             return false;
304         }
305 
306         final PerRoutePool<T, C> pool = getPool(route);
307         PoolEntry<T, C> entry;
308         for (;;) {
309             entry = pool.getFree(state);
310             if (entry == null) {
311                 break;
312             }
313             if (entry.getExpiryDeadline().isExpired()) {
314                 entry.discardConnection(CloseMode.GRACEFUL);
315                 this.available.remove(entry);
316                 pool.free(entry, false);
317             } else {
318                 break;
319             }
320         }
321         if (entry != null) {
322             this.available.remove(entry);
323             this.leased.add(entry);
324             request.completed(entry);
325             if (this.connPoolListener != null) {
326                 this.connPoolListener.onLease(entry.getRoute(), this);
327             }
328             return true;
329         }
330 
331         // New connection is needed
332         final int maxPerRoute = getMax(route);
333         // Shrink the pool prior to allocating a new connection
334         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
335         if (excess > 0) {
336             for (int i = 0; i < excess; i++) {
337                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
338                 if (lastUsed == null) {
339                     break;
340                 }
341                 lastUsed.discardConnection(CloseMode.GRACEFUL);
342                 this.available.remove(lastUsed);
343                 pool.remove(lastUsed);
344             }
345         }
346 
347         if (pool.getAllocatedCount() < maxPerRoute) {
348             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
349             if (freeCapacity == 0) {
350                 return false;
351             }
352             final int totalAvailable = this.available.size();
353             if (totalAvailable > freeCapacity - 1) {
354                 if (!this.available.isEmpty()) {
355                     final PoolEntry<T, C> lastUsed = this.available.removeLast();
356                     lastUsed.discardConnection(CloseMode.GRACEFUL);
357                     final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
358                     otherpool.remove(lastUsed);
359                 }
360             }
361 
362             entry = pool.createEntry(this.timeToLive);
363             this.leased.add(entry);
364             request.completed(entry);
365             if (this.connPoolListener != null) {
366                 this.connPoolListener.onLease(entry.getRoute(), this);
367             }
368             return true;
369         }
370         return false;
371     }
372 
373     private void fireCallbacks() {
374         LeaseRequest<T, C> request;
375         while ((request = this.completedRequests.poll()) != null) {
376             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
377             final Exception ex = request.getException();
378             final PoolEntry<T, C> result = request.getResult();
379             boolean successfullyCompleted = false;
380             if (ex != null) {
381                 future.failed(ex);
382             } else if (result != null) {
383                 if (future.completed(result)) {
384                     successfullyCompleted = true;
385                 }
386             } else {
387                 future.cancel();
388             }
389             if (!successfullyCompleted) {
390                 release(result, true);
391             }
392         }
393     }
394 
395     public void validatePendingRequests() {
396         this.lock.lock();
397         try {
398             final long now = System.currentTimeMillis();
399             final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
400             while (it.hasNext()) {
401                 final LeaseRequest<T, C> request = it.next();
402                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
403                 if (future.isCancelled() && !request.isDone()) {
404                     it.remove();
405                 } else {
406                     final Deadline deadline = request.getDeadline();
407                     if (deadline.isBefore(now)) {
408                         request.failed(DeadlineTimeoutException.from(deadline));
409                     }
410                     if (request.isDone()) {
411                         it.remove();
412                         this.completedRequests.add(request);
413                     }
414                 }
415             }
416         } finally {
417             this.lock.unlock();
418         }
419         fireCallbacks();
420     }
421 
422     private int getMax(final T route) {
423         final Integer v = this.maxPerRoute.get(route);
424         if (v != null) {
425             return v;
426         }
427         return this.defaultMaxPerRoute;
428     }
429 
430     @Override
431     public void setMaxTotal(final int max) {
432         Args.positive(max, "Max value");
433         this.lock.lock();
434         try {
435             this.maxTotal = max;
436         } finally {
437             this.lock.unlock();
438         }
439     }
440 
441     @Override
442     public int getMaxTotal() {
443         this.lock.lock();
444         try {
445             return this.maxTotal;
446         } finally {
447             this.lock.unlock();
448         }
449     }
450 
451     @Override
452     public void setDefaultMaxPerRoute(final int max) {
453         Args.positive(max, "Max value");
454         this.lock.lock();
455         try {
456             this.defaultMaxPerRoute = max;
457         } finally {
458             this.lock.unlock();
459         }
460     }
461 
462     @Override
463     public int getDefaultMaxPerRoute() {
464         this.lock.lock();
465         try {
466             return this.defaultMaxPerRoute;
467         } finally {
468             this.lock.unlock();
469         }
470     }
471 
472     @Override
473     public void setMaxPerRoute(final T route, final int max) {
474         Args.notNull(route, "Route");
475         this.lock.lock();
476         try {
477             if (max > -1) {
478                 this.maxPerRoute.put(route, max);
479             } else {
480                 this.maxPerRoute.remove(route);
481             }
482         } finally {
483             this.lock.unlock();
484         }
485     }
486 
487     @Override
488     public int getMaxPerRoute(final T route) {
489         Args.notNull(route, "Route");
490         this.lock.lock();
491         try {
492             return getMax(route);
493         } finally {
494             this.lock.unlock();
495         }
496     }
497 
498     @Override
499     public PoolStats getTotalStats() {
500         this.lock.lock();
501         try {
502             return new PoolStats(
503                     this.leased.size(),
504                     this.leasingRequests.size(),
505                     this.available.size(),
506                     this.maxTotal);
507         } finally {
508             this.lock.unlock();
509         }
510     }
511 
512     @Override
513     public PoolStats getStats(final T route) {
514         Args.notNull(route, "Route");
515         this.lock.lock();
516         try {
517             final PerRoutePool<T, C> pool = getPool(route);
518             int pendingCount = 0;
519             for (final LeaseRequest<T, C> request: leasingRequests) {
520                 if (LangUtils.equals(route, request.getRoute())) {
521                     pendingCount++;
522                 }
523             }
524             return new PoolStats(
525                     pool.getLeasedCount(),
526                     pendingCount,
527                     pool.getAvailableCount(),
528                     getMax(route));
529         } finally {
530             this.lock.unlock();
531         }
532     }
533 
534     /**
535      * Returns snapshot of all knows routes
536      *
537      * @since 4.4
538      */
539     @Override
540     public Set<T> getRoutes() {
541         this.lock.lock();
542         try {
543             return new HashSet<>(routeToPool.keySet());
544         } finally {
545             this.lock.unlock();
546         }
547     }
548 
549     /**
550      * Enumerates all available connections.
551      *
552      * @since 4.3
553      */
554     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
555         this.lock.lock();
556         try {
557             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
558             while (it.hasNext()) {
559                 final PoolEntry<T, C> entry = it.next();
560                 callback.execute(entry);
561                 if (!entry.hasConnection()) {
562                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
563                     pool.remove(entry);
564                     it.remove();
565                 }
566             }
567             processPendingRequests();
568             purgePoolMap();
569         } finally {
570             this.lock.unlock();
571         }
572     }
573 
574     /**
575      * Enumerates all leased connections.
576      *
577      * @since 4.3
578      */
579     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
580         this.lock.lock();
581         try {
582             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
583             while (it.hasNext()) {
584                 final PoolEntry<T, C> entry = it.next();
585                 callback.execute(entry);
586             }
587             processPendingRequests();
588         } finally {
589             this.lock.unlock();
590         }
591     }
592 
593     private void purgePoolMap() {
594         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
595         while (it.hasNext()) {
596             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
597             final PerRoutePool<T, C> pool = entry.getValue();
598             if (pool.getAllocatedCount() == 0) {
599                 it.remove();
600             }
601         }
602     }
603 
604     @Override
605     public void closeIdle(final TimeValue idleTime) {
606         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
607         enumAvailable(new Callback<PoolEntry<T, C>>() {
608 
609             @Override
610             public void execute(final PoolEntry<T, C> entry) {
611                 if (entry.getUpdated() <= deadline) {
612                     entry.discardConnection(CloseMode.GRACEFUL);
613                 }
614             }
615 
616         });
617     }
618 
619     @Override
620     public void closeExpired() {
621         final long now = System.currentTimeMillis();
622         enumAvailable(new Callback<PoolEntry<T, C>>() {
623 
624             @Override
625             public void execute(final PoolEntry<T, C> entry) {
626                 if (entry.getExpiryDeadline().isBefore(now)) {
627                     entry.discardConnection(CloseMode.GRACEFUL);
628                 }
629             }
630 
631         });
632     }
633 
634     @Override
635     public String toString() {
636         final StringBuilder buffer = new StringBuilder();
637         buffer.append("[leased: ");
638         buffer.append(this.leased.size());
639         buffer.append("][available: ");
640         buffer.append(this.available.size());
641         buffer.append("][pending: ");
642         buffer.append(this.leasingRequests.size());
643         buffer.append("]");
644         return buffer.toString();
645     }
646 
647 
648     static class LeaseRequest<T, C extends ModalCloseable> {
649 
650         private final T route;
651         private final Object state;
652         private final Deadline deadline;
653         private final BasicFuture<PoolEntry<T, C>> future;
654         private final AtomicBoolean completed;
655         private volatile PoolEntry<T, C> result;
656         private volatile Exception ex;
657 
658         /**
659          * Constructor
660          *
661          * @param route route
662          * @param state state
663          * @param requestTimeout timeout to wait in a request queue until kicked off
664          * @param future future callback
665          */
666         public LeaseRequest(
667                 final T route,
668                 final Object state,
669                 final Timeout requestTimeout,
670                 final BasicFuture<PoolEntry<T, C>> future) {
671             super();
672             this.route = route;
673             this.state = state;
674             this.deadline = Deadline.calculate(requestTimeout);
675             this.future = future;
676             this.completed = new AtomicBoolean(false);
677         }
678 
679         public T getRoute() {
680             return this.route;
681         }
682 
683         public Object getState() {
684             return this.state;
685         }
686 
687         public Deadline getDeadline() {
688             return this.deadline;
689         }
690 
691         public boolean isDone() {
692             return this.completed.get();
693         }
694 
695         public void failed(final Exception ex) {
696             if (this.completed.compareAndSet(false, true)) {
697                 this.ex = ex;
698             }
699         }
700 
701         public void completed(final PoolEntry<T, C> result) {
702             if (this.completed.compareAndSet(false, true)) {
703                 this.result = result;
704             }
705         }
706 
707         public BasicFuture<PoolEntry<T, C>> getFuture() {
708             return this.future;
709         }
710 
711         public PoolEntry<T, C> getResult() {
712             return this.result;
713         }
714 
715         public Exception getException() {
716             return this.ex;
717         }
718 
719         @Override
720         public String toString() {
721             final StringBuilder buffer = new StringBuilder();
722             buffer.append("[");
723             buffer.append(this.route);
724             buffer.append("][");
725             buffer.append(this.state);
726             buffer.append("]");
727             return buffer.toString();
728         }
729 
730     }
731 
732     static class PerRoutePool<T, C extends ModalCloseable> {
733 
734         private final T route;
735         private final Set<PoolEntry<T, C>> leased;
736         private final LinkedList<PoolEntry<T, C>> available;
737         private final DisposalCallback<C> disposalCallback;
738 
739         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
740             super();
741             this.route = route;
742             this.disposalCallback = disposalCallback;
743             this.leased = new HashSet<>();
744             this.available = new LinkedList<>();
745         }
746 
747         public final T getRoute() {
748             return route;
749         }
750 
751         public int getLeasedCount() {
752             return this.leased.size();
753         }
754 
755         public int getAvailableCount() {
756             return this.available.size();
757         }
758 
759         public int getAllocatedCount() {
760             return this.available.size() + this.leased.size();
761         }
762 
763         public PoolEntry<T, C> getFree(final Object state) {
764             if (!this.available.isEmpty()) {
765                 if (state != null) {
766                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
767                     while (it.hasNext()) {
768                         final PoolEntry<T, C> entry = it.next();
769                         if (state.equals(entry.getState())) {
770                             it.remove();
771                             this.leased.add(entry);
772                             return entry;
773                         }
774                     }
775                 }
776                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
777                 while (it.hasNext()) {
778                     final PoolEntry<T, C> entry = it.next();
779                     if (entry.getState() == null) {
780                         it.remove();
781                         this.leased.add(entry);
782                         return entry;
783                     }
784                 }
785             }
786             return null;
787         }
788 
789         public PoolEntry<T, C> getLastUsed() {
790             return this.available.peekLast();
791         }
792 
793         public boolean remove(final PoolEntry<T, C> entry) {
794             return this.available.remove(entry) || this.leased.remove(entry);
795         }
796 
797         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
798             final boolean found = this.leased.remove(entry);
799             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
800             if (reusable) {
801                 this.available.addFirst(entry);
802             }
803         }
804 
805         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
806             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
807             this.leased.add(entry);
808             return entry;
809         }
810 
811         public void shutdown(final CloseMode closeMode) {
812             PoolEntry<T, C> availableEntry;
813             while ((availableEntry = available.poll()) != null) {
814                 availableEntry.discardConnection(closeMode);
815             }
816             for (final PoolEntry<T, C> entry: this.leased) {
817                 entry.discardConnection(closeMode);
818             }
819             this.leased.clear();
820         }
821 
822         @Override
823         public String toString() {
824             final StringBuilder buffer = new StringBuilder();
825             buffer.append("[route: ");
826             buffer.append(this.route);
827             buffer.append("][leased: ");
828             buffer.append(this.leased.size());
829             buffer.append("][available: ");
830             buffer.append(this.available.size());
831             buffer.append("]");
832             return buffer.toString();
833         }
834 
835     }
836 }