View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.hc.core5.annotation.Contract;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.BasicFuture;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.function.Callback;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.ModalCloseable;
53  import org.apache.hc.core5.util.Args;
54  import org.apache.hc.core5.util.Asserts;
55  import org.apache.hc.core5.util.Deadline;
56  import org.apache.hc.core5.util.DeadlineTimeoutException;
57  import org.apache.hc.core5.util.TimeValue;
58  import org.apache.hc.core5.util.Timeout;
59  
60  /**
61   * Connection pool with strict connection limit guarantees.
62   *
63   * @param <T> route
64   * @param <C> connection object
65   *
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE)
69  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70  
71      private final TimeValue timeToLive;
72      private final PoolReusePolicy policy;
73      private final DisposalCallback<C> disposalCallback;
74      private final ConnPoolListener<T> connPoolListener;
75      private final Map<T, PerRoutePool<T, C>> routeToPool;
76      private final LinkedList<LeaseRequest<T, C>> pendingRequests;
77      private final Set<PoolEntry<T, C>> leased;
78      private final LinkedList<PoolEntry<T, C>> available;
79      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80      private final Map<T, Integer> maxPerRoute;
81      private final Lock lock;
82      private final AtomicBoolean isShutDown;
83  
84      private volatile int defaultMaxPerRoute;
85      private volatile int maxTotal;
86  
/**
 * Creates a pool with the given limits, entry time-to-live, reuse policy,
 * disposal callback and listener.
 *
 * @param defaultMaxPerRoute per-route limit applied to routes without an explicit limit; must be positive
 * @param maxTotal limit on the total number of connections; must be positive
 * @param timeToLive time-to-live passed to newly created pool entries
 * @param policy reuse policy; {@code null} selects {@link PoolReusePolicy#LIFO}
 * @param disposalCallback callback handed to per-route pools and new pool entries; may be {@code null}
 * @param connPoolListener listener notified on lease and release; may be {@code null}
 *
 * @since 5.0
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    super();
    // Validate the limits before any state is initialized.
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");

    // Limits (mutable at runtime through the setters).
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;
    this.maxPerRoute = new HashMap<>();

    // Configuration.
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy == null) {
        this.policy = PoolReusePolicy.LIFO;
    } else {
        this.policy = policy;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;

    // Bookkeeping structures.
    this.routeToPool = new HashMap<>();
    this.leased = new HashSet<>();
    this.available = new LinkedList<>();
    this.pendingRequests = new LinkedList<>();
    this.completedRequests = new ConcurrentLinkedQueue<>();

    // Synchronization state.
    this.lock = new ReentrantLock();
    this.isShutDown = new AtomicBoolean(false);
}
115 
/**
 * Creates a pool without a disposal callback; delegates to the
 * six-argument constructor passing {@code null} for the callback.
 *
 * @param defaultMaxPerRoute per-route limit applied to routes without an explicit limit
 * @param maxTotal limit on the total number of connections
 * @param timeToLive time-to-live passed to newly created pool entries
 * @param policy reuse policy
 * @param connPoolListener listener notified on lease and release
 *
 * @since 5.0
 */
public StrictConnPool(final int defaultMaxPerRoute,
                      final int maxTotal,
                      final TimeValue timeToLive,
                      final PoolReusePolicy policy,
                      final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            timeToLive,
            policy,
            null,
            connPoolListener);
}
127 
/**
 * Creates a pool with the given limits, a time-to-live of
 * {@link TimeValue#NEG_ONE_MILLISECOND}, the {@link PoolReusePolicy#LIFO}
 * reuse policy and no listener.
 *
 * @param defaultMaxPerRoute per-route limit applied to routes without an explicit limit
 * @param maxTotal limit on the total number of connections
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null);
}
131 
132     public boolean isShutdown() {
133         return this.isShutDown.get();
134     }
135 
136     @Override
137     public void close(final CloseMode closeMode) {
138         if (this.isShutDown.compareAndSet(false, true)) {
139             fireCallbacks();
140             this.lock.lock();
141             try {
142                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143                     pool.shutdown(closeMode);
144                 }
145                 this.routeToPool.clear();
146                 this.leased.clear();
147                 this.available.clear();
148                 this.pendingRequests.clear();
149             } finally {
150                 this.lock.unlock();
151             }
152         }
153     }
154 
155     @Override
156     public void close() {
157         close(CloseMode.GRACEFUL);
158     }
159 
160     private PerRoutePool<T, C> getPool(final T route) {
161         PerRoutePool<T, C> pool = this.routeToPool.get(route);
162         if (pool == null) {
163             pool = new PerRoutePool<>(route, this.disposalCallback);
164             this.routeToPool.put(route, pool);
165         }
166         return pool;
167     }
168 
169     @Override
170     public Future<PoolEntry<T, C>> lease(
171             final T route, final Object state,
172             final Timeout requestTimeout,
173             final FutureCallback<PoolEntry<T, C>> callback) {
174         Args.notNull(route, "Route");
175         Args.notNull(requestTimeout, "Request timeout");
176         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177         final Deadline deadline = Deadline.calculate(requestTimeout);
178         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179 
180             @Override
181             public synchronized PoolEntry<T, C> get(
182                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183                 try {
184                     return super.get(timeout, unit);
185                 } catch (final TimeoutException ex) {
186                     cancel();
187                     throw ex;
188                 }
189             }
190 
191         };
192         final boolean acquiredLock;
193 
194         try {
195             acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196         } catch (final InterruptedException interruptedException) {
197             Thread.currentThread().interrupt();
198             future.cancel();
199             return future;
200         }
201 
202         if (acquiredLock) {
203             try {
204                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
205                 final boolean completed = processPendingRequest(request);
206                 if (!request.isDone() && !completed) {
207                     this.pendingRequests.add(request);
208                 }
209                 if (request.isDone()) {
210                     this.completedRequests.add(request);
211                 }
212             } finally {
213                 this.lock.unlock();
214             }
215             fireCallbacks();
216         } else {
217             future.failed(DeadlineTimeoutException.from(deadline));
218         }
219 
220         return future;
221     }
222 
223     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
224         return lease(route, state, Timeout.DISABLED, null);
225     }
226 
227     @Override
228     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
229         if (entry == null) {
230             return;
231         }
232         if (this.isShutDown.get()) {
233             return;
234         }
235         if (!reusable) {
236             entry.discardConnection(CloseMode.GRACEFUL);
237         }
238         this.lock.lock();
239         try {
240             if (this.leased.remove(entry)) {
241                 if (this.connPoolListener != null) {
242                     this.connPoolListener.onRelease(entry.getRoute(), this);
243                 }
244                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
245                 final boolean keepAlive = entry.hasConnection() && reusable;
246                 pool.free(entry, keepAlive);
247                 if (keepAlive) {
248                     switch (policy) {
249                         case LIFO:
250                             this.available.addFirst(entry);
251                             break;
252                         case FIFO:
253                             this.available.addLast(entry);
254                             break;
255                         default:
256                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
257                     }
258                 } else {
259                     entry.discardConnection(CloseMode.GRACEFUL);
260                 }
261                 processNextPendingRequest();
262             } else {
263                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
264             }
265         } finally {
266             this.lock.unlock();
267         }
268         fireCallbacks();
269     }
270 
271     private void processPendingRequests() {
272         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
273         while (it.hasNext()) {
274             final LeaseRequest<T, C> request = it.next();
275             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
276             if (future.isCancelled()) {
277                 it.remove();
278                 continue;
279             }
280             final boolean completed = processPendingRequest(request);
281             if (request.isDone() || completed) {
282                 it.remove();
283             }
284             if (request.isDone()) {
285                 this.completedRequests.add(request);
286             }
287         }
288     }
289 
290     private void processNextPendingRequest() {
291         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
292         while (it.hasNext()) {
293             final LeaseRequest<T, C> request = it.next();
294             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
295             if (future.isCancelled()) {
296                 it.remove();
297                 continue;
298             }
299             final boolean completed = processPendingRequest(request);
300             if (request.isDone() || completed) {
301                 it.remove();
302             }
303             if (request.isDone()) {
304                 this.completedRequests.add(request);
305             }
306             if (completed) {
307                 return;
308             }
309         }
310     }
311 
312     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
313         final T route = request.getRoute();
314         final Object state = request.getState();
315         final Deadline deadline = request.getDeadline();
316 
317         if (deadline.isExpired()) {
318             request.failed(DeadlineTimeoutException.from(deadline));
319             return false;
320         }
321 
322         final PerRoutePool<T, C> pool = getPool(route);
323         PoolEntry<T, C> entry;
324         for (;;) {
325             entry = pool.getFree(state);
326             if (entry == null) {
327                 break;
328             }
329             if (entry.getExpiryDeadline().isExpired()) {
330                 entry.discardConnection(CloseMode.GRACEFUL);
331                 this.available.remove(entry);
332                 pool.free(entry, false);
333             } else {
334                 break;
335             }
336         }
337         if (entry != null) {
338             this.available.remove(entry);
339             this.leased.add(entry);
340             request.completed(entry);
341             if (this.connPoolListener != null) {
342                 this.connPoolListener.onLease(entry.getRoute(), this);
343             }
344             return true;
345         }
346 
347         // New connection is needed
348         final int maxPerRoute = getMax(route);
349         // Shrink the pool prior to allocating a new connection
350         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
351         if (excess > 0) {
352             for (int i = 0; i < excess; i++) {
353                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
354                 if (lastUsed == null) {
355                     break;
356                 }
357                 lastUsed.discardConnection(CloseMode.GRACEFUL);
358                 this.available.remove(lastUsed);
359                 pool.remove(lastUsed);
360             }
361         }
362 
363         if (pool.getAllocatedCount() < maxPerRoute) {
364             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
365             if (freeCapacity == 0) {
366                 return false;
367             }
368             final int totalAvailable = this.available.size();
369             if (totalAvailable > freeCapacity - 1) {
370                 final PoolEntry<T, C> lastUsed = this.available.removeLast();
371                 lastUsed.discardConnection(CloseMode.GRACEFUL);
372                 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
373                 otherpool.remove(lastUsed);
374             }
375 
376             entry = pool.createEntry(this.timeToLive);
377             this.leased.add(entry);
378             request.completed(entry);
379             if (this.connPoolListener != null) {
380                 this.connPoolListener.onLease(entry.getRoute(), this);
381             }
382             return true;
383         }
384         return false;
385     }
386 
387     private void fireCallbacks() {
388         LeaseRequest<T, C> request;
389         while ((request = this.completedRequests.poll()) != null) {
390             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
391             final Exception ex = request.getException();
392             final PoolEntry<T, C> result = request.getResult();
393             boolean successfullyCompleted = false;
394             if (ex != null) {
395                 future.failed(ex);
396             } else if (result != null) {
397                 if (future.completed(result)) {
398                     successfullyCompleted = true;
399                 }
400             } else {
401                 future.cancel();
402             }
403             if (!successfullyCompleted) {
404                 release(result, true);
405             }
406         }
407     }
408 
409     public void validatePendingRequests() {
410         this.lock.lock();
411         try {
412             final long now = System.currentTimeMillis();
413             final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
414             while (it.hasNext()) {
415                 final LeaseRequest<T, C> request = it.next();
416                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
417                 if (future.isCancelled() && !request.isDone()) {
418                     it.remove();
419                 } else {
420                     final Deadline deadline = request.getDeadline();
421                     if (deadline.isBefore(now)) {
422                         request.failed(DeadlineTimeoutException.from(deadline));
423                     }
424                     if (request.isDone()) {
425                         it.remove();
426                         this.completedRequests.add(request);
427                     }
428                 }
429             }
430         } finally {
431             this.lock.unlock();
432         }
433         fireCallbacks();
434     }
435 
436     private int getMax(final T route) {
437         final Integer v = this.maxPerRoute.get(route);
438         if (v != null) {
439             return v;
440         }
441         return this.defaultMaxPerRoute;
442     }
443 
444     @Override
445     public void setMaxTotal(final int max) {
446         Args.positive(max, "Max value");
447         this.lock.lock();
448         try {
449             this.maxTotal = max;
450         } finally {
451             this.lock.unlock();
452         }
453     }
454 
455     @Override
456     public int getMaxTotal() {
457         this.lock.lock();
458         try {
459             return this.maxTotal;
460         } finally {
461             this.lock.unlock();
462         }
463     }
464 
465     @Override
466     public void setDefaultMaxPerRoute(final int max) {
467         Args.positive(max, "Max value");
468         this.lock.lock();
469         try {
470             this.defaultMaxPerRoute = max;
471         } finally {
472             this.lock.unlock();
473         }
474     }
475 
476     @Override
477     public int getDefaultMaxPerRoute() {
478         this.lock.lock();
479         try {
480             return this.defaultMaxPerRoute;
481         } finally {
482             this.lock.unlock();
483         }
484     }
485 
486     @Override
487     public void setMaxPerRoute(final T route, final int max) {
488         Args.notNull(route, "Route");
489         this.lock.lock();
490         try {
491             if (max > -1) {
492                 this.maxPerRoute.put(route, max);
493             } else {
494                 this.maxPerRoute.remove(route);
495             }
496         } finally {
497             this.lock.unlock();
498         }
499     }
500 
501     @Override
502     public int getMaxPerRoute(final T route) {
503         Args.notNull(route, "Route");
504         this.lock.lock();
505         try {
506             return getMax(route);
507         } finally {
508             this.lock.unlock();
509         }
510     }
511 
512     @Override
513     public PoolStats getTotalStats() {
514         this.lock.lock();
515         try {
516             return new PoolStats(
517                     this.leased.size(),
518                     this.pendingRequests.size(),
519                     this.available.size(),
520                     this.maxTotal);
521         } finally {
522             this.lock.unlock();
523         }
524     }
525 
526     @Override
527     public PoolStats getStats(final T route) {
528         Args.notNull(route, "Route");
529         this.lock.lock();
530         try {
531             final PerRoutePool<T, C> pool = getPool(route);
532             int pendingCount = 0;
533             for (final LeaseRequest<T, C> request: pendingRequests) {
534                 if (Objects.equals(route, request.getRoute())) {
535                     pendingCount++;
536                 }
537             }
538             return new PoolStats(
539                     pool.getLeasedCount(),
540                     pendingCount,
541                     pool.getAvailableCount(),
542                     getMax(route));
543         } finally {
544             this.lock.unlock();
545         }
546     }
547 
548     /**
549      * Returns snapshot of all knows routes
550      *
551      * @since 4.4
552      */
553     @Override
554     public Set<T> getRoutes() {
555         this.lock.lock();
556         try {
557             return new HashSet<>(routeToPool.keySet());
558         } finally {
559             this.lock.unlock();
560         }
561     }
562 
563     /**
564      * Enumerates all available connections.
565      *
566      * @since 4.3
567      */
568     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
569         this.lock.lock();
570         try {
571             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
572             while (it.hasNext()) {
573                 final PoolEntry<T, C> entry = it.next();
574                 callback.execute(entry);
575                 if (!entry.hasConnection()) {
576                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
577                     pool.remove(entry);
578                     it.remove();
579                 }
580             }
581             processPendingRequests();
582             purgePoolMap();
583         } finally {
584             this.lock.unlock();
585         }
586     }
587 
588     /**
589      * Enumerates all leased connections.
590      *
591      * @since 4.3
592      */
593     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
594         this.lock.lock();
595         try {
596             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
597             while (it.hasNext()) {
598                 final PoolEntry<T, C> entry = it.next();
599                 callback.execute(entry);
600             }
601             processPendingRequests();
602         } finally {
603             this.lock.unlock();
604         }
605     }
606 
607     private void purgePoolMap() {
608         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
609         while (it.hasNext()) {
610             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
611             final PerRoutePool<T, C> pool = entry.getValue();
612             if (pool.getAllocatedCount() == 0) {
613                 it.remove();
614             }
615         }
616     }
617 
618     @Override
619     public void closeIdle(final TimeValue idleTime) {
620         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
621         enumAvailable(entry -> {
622             if (entry.getUpdated() <= deadline) {
623                 entry.discardConnection(CloseMode.GRACEFUL);
624             }
625         });
626     }
627 
628     @Override
629     public void closeExpired() {
630         final long now = System.currentTimeMillis();
631         enumAvailable(entry -> {
632             if (entry.getExpiryDeadline().isBefore(now)) {
633                 entry.discardConnection(CloseMode.GRACEFUL);
634             }
635         });
636     }
637 
638     @Override
639     public String toString() {
640         final StringBuilder buffer = new StringBuilder();
641         buffer.append("[leased: ");
642         buffer.append(this.leased.size());
643         buffer.append("][available: ");
644         buffer.append(this.available.size());
645         buffer.append("][pending: ");
646         buffer.append(this.pendingRequests.size());
647         buffer.append("]");
648         return buffer.toString();
649     }
650 
651 
652     static class LeaseRequest<T, C extends ModalCloseable> {
653 
654         private final T route;
655         private final Object state;
656         private final Deadline deadline;
657         private final BasicFuture<PoolEntry<T, C>> future;
658         // 'completed' is used internally to guard setting
659         // 'result' and 'ex', but mustn't be used by 'isDone()'.
660         private final AtomicBoolean completed;
661         private volatile PoolEntry<T, C> result;
662         private volatile Exception ex;
663 
664         /**
665          * Constructor
666          *
667          * @param route route
668          * @param state state
669          * @param requestTimeout timeout to wait in a request queue until kicked off
670          * @param future future callback
671          */
672         public LeaseRequest(
673                 final T route,
674                 final Object state,
675                 final Timeout requestTimeout,
676                 final BasicFuture<PoolEntry<T, C>> future) {
677             super();
678             this.route = route;
679             this.state = state;
680             this.deadline = Deadline.calculate(requestTimeout);
681             this.future = future;
682             this.completed = new AtomicBoolean(false);
683         }
684 
685         public T getRoute() {
686             return this.route;
687         }
688 
689         public Object getState() {
690             return this.state;
691         }
692 
693         public Deadline getDeadline() {
694             return this.deadline;
695         }
696 
697         public boolean isDone() {
698             // This method must not use 'completed.get()' which would result in a race
699             // where a caller may observe completed=true while neither result nor ex
700             // have been set yet.
701             return ex != null || result != null;
702         }
703 
704         public void failed(final Exception ex) {
705             if (this.completed.compareAndSet(false, true)) {
706                 this.ex = ex;
707             }
708         }
709 
710         public void completed(final PoolEntry<T, C> result) {
711             if (this.completed.compareAndSet(false, true)) {
712                 this.result = result;
713             }
714         }
715 
716         public BasicFuture<PoolEntry<T, C>> getFuture() {
717             return this.future;
718         }
719 
720         public PoolEntry<T, C> getResult() {
721             return this.result;
722         }
723 
724         public Exception getException() {
725             return this.ex;
726         }
727 
728         @Override
729         public String toString() {
730             final StringBuilder buffer = new StringBuilder();
731             buffer.append("[");
732             buffer.append(this.route);
733             buffer.append("][");
734             buffer.append(this.state);
735             buffer.append("]");
736             return buffer.toString();
737         }
738 
739     }
740 
741     static class PerRoutePool<T, C extends ModalCloseable> {
742 
743         private final T route;
744         private final Set<PoolEntry<T, C>> leased;
745         private final LinkedList<PoolEntry<T, C>> available;
746         private final DisposalCallback<C> disposalCallback;
747 
748         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
749             super();
750             this.route = route;
751             this.disposalCallback = disposalCallback;
752             this.leased = new HashSet<>();
753             this.available = new LinkedList<>();
754         }
755 
756         public final T getRoute() {
757             return route;
758         }
759 
760         public int getLeasedCount() {
761             return this.leased.size();
762         }
763 
764         public int getAvailableCount() {
765             return this.available.size();
766         }
767 
768         public int getAllocatedCount() {
769             return this.available.size() + this.leased.size();
770         }
771 
772         public PoolEntry<T, C> getFree(final Object state) {
773             if (!this.available.isEmpty()) {
774                 if (state != null) {
775                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
776                     while (it.hasNext()) {
777                         final PoolEntry<T, C> entry = it.next();
778                         if (state.equals(entry.getState())) {
779                             it.remove();
780                             this.leased.add(entry);
781                             return entry;
782                         }
783                     }
784                 }
785                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
786                 while (it.hasNext()) {
787                     final PoolEntry<T, C> entry = it.next();
788                     if (entry.getState() == null) {
789                         it.remove();
790                         this.leased.add(entry);
791                         return entry;
792                     }
793                 }
794             }
795             return null;
796         }
797 
798         public PoolEntry<T, C> getLastUsed() {
799             return this.available.peekLast();
800         }
801 
802         public boolean remove(final PoolEntry<T, C> entry) {
803             return this.available.remove(entry) || this.leased.remove(entry);
804         }
805 
806         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
807             final boolean found = this.leased.remove(entry);
808             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
809             if (reusable) {
810                 this.available.addFirst(entry);
811             }
812         }
813 
814         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
815             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
816             this.leased.add(entry);
817             return entry;
818         }
819 
820         public void shutdown(final CloseMode closeMode) {
821             PoolEntry<T, C> availableEntry;
822             while ((availableEntry = available.poll()) != null) {
823                 availableEntry.discardConnection(closeMode);
824             }
825             for (final PoolEntry<T, C> entry: this.leased) {
826                 entry.discardConnection(closeMode);
827             }
828             this.leased.clear();
829         }
830 
831         @Override
832         public String toString() {
833             final StringBuilder buffer = new StringBuilder();
834             buffer.append("[route: ");
835             buffer.append(this.route);
836             buffer.append("][leased: ");
837             buffer.append(this.leased.size());
838             buffer.append("][available: ");
839             buffer.append(this.available.size());
840             buffer.append("]");
841             return buffer.toString();
842         }
843 
844     }
845 }