View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.locks.Lock;
43  import java.util.concurrent.locks.ReentrantLock;
44  
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.ThreadingBehavior;
47  import org.apache.hc.core5.concurrent.BasicFuture;
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.io.CloseMode;
51  import org.apache.hc.core5.io.ModalCloseable;
52  import org.apache.hc.core5.util.Args;
53  import org.apache.hc.core5.util.Asserts;
54  import org.apache.hc.core5.util.Deadline;
55  import org.apache.hc.core5.util.DeadlineTimeoutException;
56  import org.apache.hc.core5.util.LangUtils;
57  import org.apache.hc.core5.util.TimeValue;
58  import org.apache.hc.core5.util.Timeout;
59  
60  /**
61   * Connection pool with strict connection limit guarantees.
62   *
63   * @param <T> route
64   * @param <C> connection object
65   *
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE)
69  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70  
71      private final TimeValue timeToLive;
72      private final PoolReusePolicy policy;
73      private final DisposalCallback<C> disposalCallback;
74      private final ConnPoolListener<T> connPoolListener;
75      private final Map<T, PerRoutePool<T, C>> routeToPool;
76      private final LinkedList<LeaseRequest<T, C>> leasingRequests;
77      private final Set<PoolEntry<T, C>> leased;
78      private final LinkedList<PoolEntry<T, C>> available;
79      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80      private final Map<T, Integer> maxPerRoute;
81      private final Lock lock;
82      private final AtomicBoolean isShutDown;
83  
84      private volatile int defaultMaxPerRoute;
85      private volatile int maxTotal;
86  
87      /**
88       * @since 5.0
89       */
90      public StrictConnPool(
91              final int defaultMaxPerRoute,
92              final int maxTotal,
93              final TimeValue timeToLive,
94              final PoolReusePolicy policy,
95              final DisposalCallback<C> disposalCallback,
96              final ConnPoolListener<T> connPoolListener) {
97          super();
98          Args.positive(defaultMaxPerRoute, "Max per route value");
99          Args.positive(maxTotal, "Max total value");
100         this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
101         this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
102         this.disposalCallback = disposalCallback;
103         this.connPoolListener = connPoolListener;
104         this.routeToPool = new HashMap<>();
105         this.leasingRequests = new LinkedList<>();
106         this.leased = new HashSet<>();
107         this.available = new LinkedList<>();
108         this.completedRequests = new ConcurrentLinkedQueue<>();
109         this.maxPerRoute = new HashMap<>();
110         this.lock = new ReentrantLock();
111         this.isShutDown = new AtomicBoolean(false);
112         this.defaultMaxPerRoute = defaultMaxPerRoute;
113         this.maxTotal = maxTotal;
114     }
115 
116     /**
117      * @since 5.0
118      */
119     public StrictConnPool(
120             final int defaultMaxPerRoute,
121             final int maxTotal,
122             final TimeValue timeToLive,
123             final PoolReusePolicy policy,
124             final ConnPoolListener<T> connPoolListener) {
125         this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
126     }
127 
128     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
129         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
130     }
131 
132     public boolean isShutdown() {
133         return this.isShutDown.get();
134     }
135 
136     @Override
137     public void close(final CloseMode closeMode) {
138         if (this.isShutDown.compareAndSet(false, true)) {
139             fireCallbacks();
140             this.lock.lock();
141             try {
142                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143                     pool.shutdown(closeMode);
144                 }
145                 this.routeToPool.clear();
146                 this.leased.clear();
147                 this.available.clear();
148                 this.leasingRequests.clear();
149             } finally {
150                 this.lock.unlock();
151             }
152         }
153     }
154 
155     @Override
156     public void close() {
157         close(CloseMode.GRACEFUL);
158     }
159 
160     private PerRoutePool<T, C> getPool(final T route) {
161         PerRoutePool<T, C> pool = this.routeToPool.get(route);
162         if (pool == null) {
163             pool = new PerRoutePool<>(route, this.disposalCallback);
164             this.routeToPool.put(route, pool);
165         }
166         return pool;
167     }
168 
169     @Override
170     public Future<PoolEntry<T, C>> lease(
171             final T route, final Object state,
172             final Timeout requestTimeout,
173             final FutureCallback<PoolEntry<T, C>> callback) {
174         Args.notNull(route, "Route");
175         Args.notNull(requestTimeout, "Request timeout");
176         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177         final Deadline deadline = Deadline.calculate(requestTimeout);
178         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179 
180             @Override
181             public synchronized PoolEntry<T, C> get(
182                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183                 try {
184                     return super.get(timeout, unit);
185                 } catch (final TimeoutException ex) {
186                     cancel();
187                     throw ex;
188                 }
189             }
190 
191         };
192         final boolean acquiredLock;
193 
194         try {
195             acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196         } catch (final InterruptedException interruptedException) {
197             Thread.currentThread().interrupt();
198             future.cancel();
199             return future;
200         }
201 
202         if (acquiredLock) {
203             try {
204                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
205                 final boolean completed = processPendingRequest(request);
206                 if (!request.isDone() && !completed) {
207                     this.leasingRequests.add(request);
208                 }
209                 if (request.isDone()) {
210                     this.completedRequests.add(request);
211                 }
212             } finally {
213                 this.lock.unlock();
214             }
215             fireCallbacks();
216         } else {
217             future.failed(DeadlineTimeoutException.from(deadline));
218         }
219 
220         return future;
221     }
222 
223     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
224         return lease(route, state, Timeout.DISABLED, null);
225     }
226 
227     @Override
228     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
229         if (entry == null) {
230             return;
231         }
232         if (this.isShutDown.get()) {
233             return;
234         }
235         if (!reusable) {
236             entry.discardConnection(CloseMode.GRACEFUL);
237         }
238         this.lock.lock();
239         try {
240             if (this.leased.remove(entry)) {
241                 if (this.connPoolListener != null) {
242                     this.connPoolListener.onRelease(entry.getRoute(), this);
243                 }
244                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
245                 final boolean keepAlive = entry.hasConnection() && reusable;
246                 pool.free(entry, keepAlive);
247                 if (keepAlive) {
248                     switch (policy) {
249                         case LIFO:
250                             this.available.addFirst(entry);
251                             break;
252                         case FIFO:
253                             this.available.addLast(entry);
254                             break;
255                         default:
256                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
257                     }
258                 } else {
259                     entry.discardConnection(CloseMode.GRACEFUL);
260                 }
261                 processNextPendingRequest();
262             } else {
263                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
264             }
265         } finally {
266             this.lock.unlock();
267         }
268         fireCallbacks();
269     }
270 
271     private void processPendingRequests() {
272         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
273         while (it.hasNext()) {
274             final LeaseRequest<T, C> request = it.next();
275             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
276             if (future.isCancelled()) {
277                 it.remove();
278                 continue;
279             }
280             final boolean completed = processPendingRequest(request);
281             if (request.isDone() || completed) {
282                 it.remove();
283             }
284             if (request.isDone()) {
285                 this.completedRequests.add(request);
286             }
287         }
288     }
289 
290     private void processNextPendingRequest() {
291         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
292         while (it.hasNext()) {
293             final LeaseRequest<T, C> request = it.next();
294             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
295             if (future.isCancelled()) {
296                 it.remove();
297                 continue;
298             }
299             final boolean completed = processPendingRequest(request);
300             if (request.isDone() || completed) {
301                 it.remove();
302             }
303             if (request.isDone()) {
304                 this.completedRequests.add(request);
305             }
306             if (completed) {
307                 return;
308             }
309         }
310     }
311 
312     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
313         final T route = request.getRoute();
314         final Object state = request.getState();
315         final Deadline deadline = request.getDeadline();
316 
317         if (deadline.isExpired()) {
318             request.failed(DeadlineTimeoutException.from(deadline));
319             return false;
320         }
321 
322         final PerRoutePool<T, C> pool = getPool(route);
323         PoolEntry<T, C> entry;
324         for (;;) {
325             entry = pool.getFree(state);
326             if (entry == null) {
327                 break;
328             }
329             if (entry.getExpiryDeadline().isExpired()) {
330                 entry.discardConnection(CloseMode.GRACEFUL);
331                 this.available.remove(entry);
332                 pool.free(entry, false);
333             } else {
334                 break;
335             }
336         }
337         if (entry != null) {
338             this.available.remove(entry);
339             this.leased.add(entry);
340             request.completed(entry);
341             if (this.connPoolListener != null) {
342                 this.connPoolListener.onLease(entry.getRoute(), this);
343             }
344             return true;
345         }
346 
347         // New connection is needed
348         final int maxPerRoute = getMax(route);
349         // Shrink the pool prior to allocating a new connection
350         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
351         if (excess > 0) {
352             for (int i = 0; i < excess; i++) {
353                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
354                 if (lastUsed == null) {
355                     break;
356                 }
357                 lastUsed.discardConnection(CloseMode.GRACEFUL);
358                 this.available.remove(lastUsed);
359                 pool.remove(lastUsed);
360             }
361         }
362 
363         if (pool.getAllocatedCount() < maxPerRoute) {
364             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
365             if (freeCapacity == 0) {
366                 return false;
367             }
368             final int totalAvailable = this.available.size();
369             if (totalAvailable > freeCapacity - 1) {
370                 if (!this.available.isEmpty()) {
371                     final PoolEntry<T, C> lastUsed = this.available.removeLast();
372                     lastUsed.discardConnection(CloseMode.GRACEFUL);
373                     final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
374                     otherpool.remove(lastUsed);
375                 }
376             }
377 
378             entry = pool.createEntry(this.timeToLive);
379             this.leased.add(entry);
380             request.completed(entry);
381             if (this.connPoolListener != null) {
382                 this.connPoolListener.onLease(entry.getRoute(), this);
383             }
384             return true;
385         }
386         return false;
387     }
388 
389     private void fireCallbacks() {
390         LeaseRequest<T, C> request;
391         while ((request = this.completedRequests.poll()) != null) {
392             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
393             final Exception ex = request.getException();
394             final PoolEntry<T, C> result = request.getResult();
395             boolean successfullyCompleted = false;
396             if (ex != null) {
397                 future.failed(ex);
398             } else if (result != null) {
399                 if (future.completed(result)) {
400                     successfullyCompleted = true;
401                 }
402             } else {
403                 future.cancel();
404             }
405             if (!successfullyCompleted) {
406                 release(result, true);
407             }
408         }
409     }
410 
411     public void validatePendingRequests() {
412         this.lock.lock();
413         try {
414             final long now = System.currentTimeMillis();
415             final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
416             while (it.hasNext()) {
417                 final LeaseRequest<T, C> request = it.next();
418                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
419                 if (future.isCancelled() && !request.isDone()) {
420                     it.remove();
421                 } else {
422                     final Deadline deadline = request.getDeadline();
423                     if (deadline.isBefore(now)) {
424                         request.failed(DeadlineTimeoutException.from(deadline));
425                     }
426                     if (request.isDone()) {
427                         it.remove();
428                         this.completedRequests.add(request);
429                     }
430                 }
431             }
432         } finally {
433             this.lock.unlock();
434         }
435         fireCallbacks();
436     }
437 
438     private int getMax(final T route) {
439         final Integer v = this.maxPerRoute.get(route);
440         if (v != null) {
441             return v;
442         }
443         return this.defaultMaxPerRoute;
444     }
445 
446     @Override
447     public void setMaxTotal(final int max) {
448         Args.positive(max, "Max value");
449         this.lock.lock();
450         try {
451             this.maxTotal = max;
452         } finally {
453             this.lock.unlock();
454         }
455     }
456 
457     @Override
458     public int getMaxTotal() {
459         this.lock.lock();
460         try {
461             return this.maxTotal;
462         } finally {
463             this.lock.unlock();
464         }
465     }
466 
467     @Override
468     public void setDefaultMaxPerRoute(final int max) {
469         Args.positive(max, "Max value");
470         this.lock.lock();
471         try {
472             this.defaultMaxPerRoute = max;
473         } finally {
474             this.lock.unlock();
475         }
476     }
477 
478     @Override
479     public int getDefaultMaxPerRoute() {
480         this.lock.lock();
481         try {
482             return this.defaultMaxPerRoute;
483         } finally {
484             this.lock.unlock();
485         }
486     }
487 
488     @Override
489     public void setMaxPerRoute(final T route, final int max) {
490         Args.notNull(route, "Route");
491         this.lock.lock();
492         try {
493             if (max > -1) {
494                 this.maxPerRoute.put(route, max);
495             } else {
496                 this.maxPerRoute.remove(route);
497             }
498         } finally {
499             this.lock.unlock();
500         }
501     }
502 
503     @Override
504     public int getMaxPerRoute(final T route) {
505         Args.notNull(route, "Route");
506         this.lock.lock();
507         try {
508             return getMax(route);
509         } finally {
510             this.lock.unlock();
511         }
512     }
513 
514     @Override
515     public PoolStats getTotalStats() {
516         this.lock.lock();
517         try {
518             return new PoolStats(
519                     this.leased.size(),
520                     this.leasingRequests.size(),
521                     this.available.size(),
522                     this.maxTotal);
523         } finally {
524             this.lock.unlock();
525         }
526     }
527 
528     @Override
529     public PoolStats getStats(final T route) {
530         Args.notNull(route, "Route");
531         this.lock.lock();
532         try {
533             final PerRoutePool<T, C> pool = getPool(route);
534             int pendingCount = 0;
535             for (final LeaseRequest<T, C> request: leasingRequests) {
536                 if (LangUtils.equals(route, request.getRoute())) {
537                     pendingCount++;
538                 }
539             }
540             return new PoolStats(
541                     pool.getLeasedCount(),
542                     pendingCount,
543                     pool.getAvailableCount(),
544                     getMax(route));
545         } finally {
546             this.lock.unlock();
547         }
548     }
549 
550     /**
551      * Returns snapshot of all knows routes
552      *
553      * @since 4.4
554      */
555     @Override
556     public Set<T> getRoutes() {
557         this.lock.lock();
558         try {
559             return new HashSet<>(routeToPool.keySet());
560         } finally {
561             this.lock.unlock();
562         }
563     }
564 
565     /**
566      * Enumerates all available connections.
567      *
568      * @since 4.3
569      */
570     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
571         this.lock.lock();
572         try {
573             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
574             while (it.hasNext()) {
575                 final PoolEntry<T, C> entry = it.next();
576                 callback.execute(entry);
577                 if (!entry.hasConnection()) {
578                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
579                     pool.remove(entry);
580                     it.remove();
581                 }
582             }
583             processPendingRequests();
584             purgePoolMap();
585         } finally {
586             this.lock.unlock();
587         }
588     }
589 
590     /**
591      * Enumerates all leased connections.
592      *
593      * @since 4.3
594      */
595     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
596         this.lock.lock();
597         try {
598             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
599             while (it.hasNext()) {
600                 final PoolEntry<T, C> entry = it.next();
601                 callback.execute(entry);
602             }
603             processPendingRequests();
604         } finally {
605             this.lock.unlock();
606         }
607     }
608 
609     private void purgePoolMap() {
610         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
611         while (it.hasNext()) {
612             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
613             final PerRoutePool<T, C> pool = entry.getValue();
614             if (pool.getAllocatedCount() == 0) {
615                 it.remove();
616             }
617         }
618     }
619 
620     @Override
621     public void closeIdle(final TimeValue idleTime) {
622         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
623         enumAvailable(new Callback<PoolEntry<T, C>>() {
624 
625             @Override
626             public void execute(final PoolEntry<T, C> entry) {
627                 if (entry.getUpdated() <= deadline) {
628                     entry.discardConnection(CloseMode.GRACEFUL);
629                 }
630             }
631 
632         });
633     }
634 
635     @Override
636     public void closeExpired() {
637         final long now = System.currentTimeMillis();
638         enumAvailable(new Callback<PoolEntry<T, C>>() {
639 
640             @Override
641             public void execute(final PoolEntry<T, C> entry) {
642                 if (entry.getExpiryDeadline().isBefore(now)) {
643                     entry.discardConnection(CloseMode.GRACEFUL);
644                 }
645             }
646 
647         });
648     }
649 
650     @Override
651     public String toString() {
652         final StringBuilder buffer = new StringBuilder();
653         buffer.append("[leased: ");
654         buffer.append(this.leased.size());
655         buffer.append("][available: ");
656         buffer.append(this.available.size());
657         buffer.append("][pending: ");
658         buffer.append(this.leasingRequests.size());
659         buffer.append("]");
660         return buffer.toString();
661     }
662 
663 
664     static class LeaseRequest<T, C extends ModalCloseable> {
665 
666         private final T route;
667         private final Object state;
668         private final Deadline deadline;
669         private final BasicFuture<PoolEntry<T, C>> future;
670         private final AtomicBoolean completed;
671         private volatile PoolEntry<T, C> result;
672         private volatile Exception ex;
673 
674         /**
675          * Constructor
676          *
677          * @param route route
678          * @param state state
679          * @param requestTimeout timeout to wait in a request queue until kicked off
680          * @param future future callback
681          */
682         public LeaseRequest(
683                 final T route,
684                 final Object state,
685                 final Timeout requestTimeout,
686                 final BasicFuture<PoolEntry<T, C>> future) {
687             super();
688             this.route = route;
689             this.state = state;
690             this.deadline = Deadline.calculate(requestTimeout);
691             this.future = future;
692             this.completed = new AtomicBoolean(false);
693         }
694 
695         public T getRoute() {
696             return this.route;
697         }
698 
699         public Object getState() {
700             return this.state;
701         }
702 
703         public Deadline getDeadline() {
704             return this.deadline;
705         }
706 
707         public boolean isDone() {
708             return this.completed.get();
709         }
710 
711         public void failed(final Exception ex) {
712             if (this.completed.compareAndSet(false, true)) {
713                 this.ex = ex;
714             }
715         }
716 
717         public void completed(final PoolEntry<T, C> result) {
718             if (this.completed.compareAndSet(false, true)) {
719                 this.result = result;
720             }
721         }
722 
723         public BasicFuture<PoolEntry<T, C>> getFuture() {
724             return this.future;
725         }
726 
727         public PoolEntry<T, C> getResult() {
728             return this.result;
729         }
730 
731         public Exception getException() {
732             return this.ex;
733         }
734 
735         @Override
736         public String toString() {
737             final StringBuilder buffer = new StringBuilder();
738             buffer.append("[");
739             buffer.append(this.route);
740             buffer.append("][");
741             buffer.append(this.state);
742             buffer.append("]");
743             return buffer.toString();
744         }
745 
746     }
747 
748     static class PerRoutePool<T, C extends ModalCloseable> {
749 
750         private final T route;
751         private final Set<PoolEntry<T, C>> leased;
752         private final LinkedList<PoolEntry<T, C>> available;
753         private final DisposalCallback<C> disposalCallback;
754 
755         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
756             super();
757             this.route = route;
758             this.disposalCallback = disposalCallback;
759             this.leased = new HashSet<>();
760             this.available = new LinkedList<>();
761         }
762 
763         public final T getRoute() {
764             return route;
765         }
766 
767         public int getLeasedCount() {
768             return this.leased.size();
769         }
770 
771         public int getAvailableCount() {
772             return this.available.size();
773         }
774 
775         public int getAllocatedCount() {
776             return this.available.size() + this.leased.size();
777         }
778 
779         public PoolEntry<T, C> getFree(final Object state) {
780             if (!this.available.isEmpty()) {
781                 if (state != null) {
782                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
783                     while (it.hasNext()) {
784                         final PoolEntry<T, C> entry = it.next();
785                         if (state.equals(entry.getState())) {
786                             it.remove();
787                             this.leased.add(entry);
788                             return entry;
789                         }
790                     }
791                 }
792                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
793                 while (it.hasNext()) {
794                     final PoolEntry<T, C> entry = it.next();
795                     if (entry.getState() == null) {
796                         it.remove();
797                         this.leased.add(entry);
798                         return entry;
799                     }
800                 }
801             }
802             return null;
803         }
804 
805         public PoolEntry<T, C> getLastUsed() {
806             return this.available.peekLast();
807         }
808 
809         public boolean remove(final PoolEntry<T, C> entry) {
810             return this.available.remove(entry) || this.leased.remove(entry);
811         }
812 
813         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
814             final boolean found = this.leased.remove(entry);
815             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
816             if (reusable) {
817                 this.available.addFirst(entry);
818             }
819         }
820 
821         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
822             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
823             this.leased.add(entry);
824             return entry;
825         }
826 
827         public void shutdown(final CloseMode closeMode) {
828             PoolEntry<T, C> availableEntry;
829             while ((availableEntry = available.poll()) != null) {
830                 availableEntry.discardConnection(closeMode);
831             }
832             for (final PoolEntry<T, C> entry: this.leased) {
833                 entry.discardConnection(closeMode);
834             }
835             this.leased.clear();
836         }
837 
838         @Override
839         public String toString() {
840             final StringBuilder buffer = new StringBuilder();
841             buffer.append("[route: ");
842             buffer.append(this.route);
843             buffer.append("][leased: ");
844             buffer.append(this.leased.size());
845             buffer.append("][available: ");
846             buffer.append(this.available.size());
847             buffer.append("]");
848             return buffer.toString();
849         }
850 
851     }
852 }