View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.hc.core5.annotation.Contract;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.BasicFuture;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.function.Callback;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.ModalCloseable;
53  import org.apache.hc.core5.util.Args;
54  import org.apache.hc.core5.util.Asserts;
55  import org.apache.hc.core5.util.Deadline;
56  import org.apache.hc.core5.util.DeadlineTimeoutException;
57  import org.apache.hc.core5.util.TimeValue;
58  import org.apache.hc.core5.util.Timeout;
59  
60  /**
61   * Connection pool with strict connection limit guarantees.
62   *
63   * @param <T> route
64   * @param <C> connection object
65   *
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE)
69  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70  
71      private final TimeValue timeToLive;
72      private final PoolReusePolicy policy;
73      private final DisposalCallback<C> disposalCallback;
74      private final ConnPoolListener<T> connPoolListener;
75      private final Map<T, PerRoutePool<T, C>> routeToPool;
76      private final LinkedList<LeaseRequest<T, C>> pendingRequests;
77      private final Set<PoolEntry<T, C>> leased;
78      private final LinkedList<PoolEntry<T, C>> available;
79      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80      private final Map<T, Integer> maxPerRoute;
81      private final Lock lock;
82      private final AtomicBoolean isShutDown;
83  
84      private volatile int defaultMaxPerRoute;
85      private volatile int maxTotal;
86  
87      /**
88       * @since 5.0
89       */
90      public StrictConnPool(
91              final int defaultMaxPerRoute,
92              final int maxTotal,
93              final TimeValue timeToLive,
94              final PoolReusePolicy policy,
95              final DisposalCallback<C> disposalCallback,
96              final ConnPoolListener<T> connPoolListener) {
97          super();
98          Args.positive(defaultMaxPerRoute, "Max per route value");
99          Args.positive(maxTotal, "Max total value");
100         this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
101         this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
102         this.disposalCallback = disposalCallback;
103         this.connPoolListener = connPoolListener;
104         this.routeToPool = new HashMap<>();
105         this.pendingRequests = new LinkedList<>();
106         this.leased = new HashSet<>();
107         this.available = new LinkedList<>();
108         this.completedRequests = new ConcurrentLinkedQueue<>();
109         this.maxPerRoute = new HashMap<>();
110         this.lock = new ReentrantLock();
111         this.isShutDown = new AtomicBoolean(false);
112         this.defaultMaxPerRoute = defaultMaxPerRoute;
113         this.maxTotal = maxTotal;
114     }
115 
116     /**
117      * @since 5.0
118      */
119     public StrictConnPool(
120             final int defaultMaxPerRoute,
121             final int maxTotal,
122             final TimeValue timeToLive,
123             final PoolReusePolicy policy,
124             final ConnPoolListener<T> connPoolListener) {
125         this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
126     }
127 
128     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
129         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
130     }
131 
132     public boolean isShutdown() {
133         return this.isShutDown.get();
134     }
135 
136     @Override
137     public void close(final CloseMode closeMode) {
138         if (this.isShutDown.compareAndSet(false, true)) {
139             fireCallbacks();
140             this.lock.lock();
141             try {
142                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143                     pool.shutdown(closeMode);
144                 }
145                 this.routeToPool.clear();
146                 this.leased.clear();
147                 this.available.clear();
148                 this.pendingRequests.clear();
149             } finally {
150                 this.lock.unlock();
151             }
152         }
153     }
154 
155     @Override
156     public void close() {
157         close(CloseMode.GRACEFUL);
158     }
159 
160     private PerRoutePool<T, C> getPool(final T route) {
161         PerRoutePool<T, C> pool = this.routeToPool.get(route);
162         if (pool == null) {
163             pool = new PerRoutePool<>(route, this.disposalCallback);
164             this.routeToPool.put(route, pool);
165         }
166         return pool;
167     }
168 
169     @Override
170     public Future<PoolEntry<T, C>> lease(
171             final T route, final Object state,
172             final Timeout requestTimeout,
173             final FutureCallback<PoolEntry<T, C>> callback) {
174         Args.notNull(route, "Route");
175         Args.notNull(requestTimeout, "Request timeout");
176         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177         final Deadline deadline = Deadline.calculate(requestTimeout);
178         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179 
180             @Override
181             public synchronized PoolEntry<T, C> get(
182                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183                 try {
184                     return super.get(timeout, unit);
185                 } catch (final TimeoutException ex) {
186                     cancel();
187                     throw ex;
188                 }
189             }
190 
191         };
192         final boolean acquiredLock;
193 
194         try {
195             if (Timeout.isPositive(requestTimeout)) {
196                 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
197             } else {
198                 this.lock.lockInterruptibly();
199                 acquiredLock = true;
200             }
201         } catch (final InterruptedException interruptedException) {
202             Thread.currentThread().interrupt();
203             future.cancel();
204             return future;
205         }
206 
207         if (acquiredLock) {
208             try {
209                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
210                 final boolean completed = processPendingRequest(request);
211                 if (!request.isDone() && !completed) {
212                     this.pendingRequests.add(request);
213                 }
214                 if (request.isDone()) {
215                     this.completedRequests.add(request);
216                 }
217             } finally {
218                 this.lock.unlock();
219             }
220             fireCallbacks();
221         } else {
222             future.failed(DeadlineTimeoutException.from(deadline));
223         }
224 
225         return future;
226     }
227 
228     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
229         return lease(route, state, Timeout.DISABLED, null);
230     }
231 
232     @Override
233     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
234         if (entry == null) {
235             return;
236         }
237         if (this.isShutDown.get()) {
238             return;
239         }
240         if (!reusable) {
241             entry.discardConnection(CloseMode.GRACEFUL);
242         }
243         this.lock.lock();
244         try {
245             if (this.leased.remove(entry)) {
246                 if (this.connPoolListener != null) {
247                     this.connPoolListener.onRelease(entry.getRoute(), this);
248                 }
249                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
250                 final boolean keepAlive = entry.hasConnection() && reusable;
251                 pool.free(entry, keepAlive);
252                 if (keepAlive) {
253                     switch (policy) {
254                         case LIFO:
255                             this.available.addFirst(entry);
256                             break;
257                         case FIFO:
258                             this.available.addLast(entry);
259                             break;
260                         default:
261                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
262                     }
263                 } else {
264                     entry.discardConnection(CloseMode.GRACEFUL);
265                 }
266                 processNextPendingRequest();
267             } else {
268                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
269             }
270         } finally {
271             this.lock.unlock();
272         }
273         fireCallbacks();
274     }
275 
276     private void processPendingRequests() {
277         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
278         while (it.hasNext()) {
279             final LeaseRequest<T, C> request = it.next();
280             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
281             if (future.isCancelled()) {
282                 it.remove();
283                 continue;
284             }
285             final boolean completed = processPendingRequest(request);
286             if (request.isDone() || completed) {
287                 it.remove();
288             }
289             if (request.isDone()) {
290                 this.completedRequests.add(request);
291             }
292         }
293     }
294 
295     private void processNextPendingRequest() {
296         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
297         while (it.hasNext()) {
298             final LeaseRequest<T, C> request = it.next();
299             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
300             if (future.isCancelled()) {
301                 it.remove();
302                 continue;
303             }
304             final boolean completed = processPendingRequest(request);
305             if (request.isDone() || completed) {
306                 it.remove();
307             }
308             if (request.isDone()) {
309                 this.completedRequests.add(request);
310             }
311             if (completed) {
312                 return;
313             }
314         }
315     }
316 
317     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
318         final T route = request.getRoute();
319         final Object state = request.getState();
320         final Deadline deadline = request.getDeadline();
321 
322         if (deadline.isExpired()) {
323             request.failed(DeadlineTimeoutException.from(deadline));
324             return false;
325         }
326 
327         final PerRoutePool<T, C> pool = getPool(route);
328         PoolEntry<T, C> entry;
329         for (;;) {
330             entry = pool.getFree(state);
331             if (entry == null) {
332                 break;
333             }
334             if (entry.getExpiryDeadline().isExpired()) {
335                 entry.discardConnection(CloseMode.GRACEFUL);
336                 this.available.remove(entry);
337                 pool.free(entry, false);
338             } else {
339                 break;
340             }
341         }
342         if (entry != null) {
343             this.available.remove(entry);
344             this.leased.add(entry);
345             request.completed(entry);
346             if (this.connPoolListener != null) {
347                 this.connPoolListener.onLease(entry.getRoute(), this);
348             }
349             return true;
350         }
351 
352         // New connection is needed
353         final int maxPerRoute = getMax(route);
354         // Shrink the pool prior to allocating a new connection
355         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
356         if (excess > 0) {
357             for (int i = 0; i < excess; i++) {
358                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
359                 if (lastUsed == null) {
360                     break;
361                 }
362                 lastUsed.discardConnection(CloseMode.GRACEFUL);
363                 this.available.remove(lastUsed);
364                 pool.remove(lastUsed);
365             }
366         }
367 
368         if (pool.getAllocatedCount() < maxPerRoute) {
369             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
370             if (freeCapacity == 0) {
371                 return false;
372             }
373             final int totalAvailable = this.available.size();
374             if (totalAvailable > freeCapacity - 1) {
375                 final PoolEntry<T, C> lastUsed = this.available.removeLast();
376                 lastUsed.discardConnection(CloseMode.GRACEFUL);
377                 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
378                 otherpool.remove(lastUsed);
379             }
380 
381             entry = pool.createEntry(this.timeToLive);
382             this.leased.add(entry);
383             request.completed(entry);
384             if (this.connPoolListener != null) {
385                 this.connPoolListener.onLease(entry.getRoute(), this);
386             }
387             return true;
388         }
389         return false;
390     }
391 
392     private void fireCallbacks() {
393         LeaseRequest<T, C> request;
394         while ((request = this.completedRequests.poll()) != null) {
395             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
396             final Exception ex = request.getException();
397             final PoolEntry<T, C> result = request.getResult();
398             boolean successfullyCompleted = false;
399             if (ex != null) {
400                 future.failed(ex);
401             } else if (result != null) {
402                 if (future.completed(result)) {
403                     successfullyCompleted = true;
404                 }
405             } else {
406                 future.cancel();
407             }
408             if (!successfullyCompleted) {
409                 release(result, true);
410             }
411         }
412     }
413 
414     public void validatePendingRequests() {
415         this.lock.lock();
416         try {
417             final long now = System.currentTimeMillis();
418             final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
419             while (it.hasNext()) {
420                 final LeaseRequest<T, C> request = it.next();
421                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
422                 if (future.isCancelled() && !request.isDone()) {
423                     it.remove();
424                 } else {
425                     final Deadline deadline = request.getDeadline();
426                     if (deadline.isBefore(now)) {
427                         request.failed(DeadlineTimeoutException.from(deadline));
428                     }
429                     if (request.isDone()) {
430                         it.remove();
431                         this.completedRequests.add(request);
432                     }
433                 }
434             }
435         } finally {
436             this.lock.unlock();
437         }
438         fireCallbacks();
439     }
440 
441     private int getMax(final T route) {
442         final Integer v = this.maxPerRoute.get(route);
443         if (v != null) {
444             return v;
445         }
446         return this.defaultMaxPerRoute;
447     }
448 
449     @Override
450     public void setMaxTotal(final int max) {
451         Args.positive(max, "Max value");
452         this.lock.lock();
453         try {
454             this.maxTotal = max;
455         } finally {
456             this.lock.unlock();
457         }
458     }
459 
460     @Override
461     public int getMaxTotal() {
462         this.lock.lock();
463         try {
464             return this.maxTotal;
465         } finally {
466             this.lock.unlock();
467         }
468     }
469 
470     @Override
471     public void setDefaultMaxPerRoute(final int max) {
472         Args.positive(max, "Max value");
473         this.lock.lock();
474         try {
475             this.defaultMaxPerRoute = max;
476         } finally {
477             this.lock.unlock();
478         }
479     }
480 
481     @Override
482     public int getDefaultMaxPerRoute() {
483         this.lock.lock();
484         try {
485             return this.defaultMaxPerRoute;
486         } finally {
487             this.lock.unlock();
488         }
489     }
490 
491     @Override
492     public void setMaxPerRoute(final T route, final int max) {
493         Args.notNull(route, "Route");
494         this.lock.lock();
495         try {
496             if (max > -1) {
497                 this.maxPerRoute.put(route, max);
498             } else {
499                 this.maxPerRoute.remove(route);
500             }
501         } finally {
502             this.lock.unlock();
503         }
504     }
505 
506     @Override
507     public int getMaxPerRoute(final T route) {
508         Args.notNull(route, "Route");
509         this.lock.lock();
510         try {
511             return getMax(route);
512         } finally {
513             this.lock.unlock();
514         }
515     }
516 
517     @Override
518     public PoolStats getTotalStats() {
519         this.lock.lock();
520         try {
521             return new PoolStats(
522                     this.leased.size(),
523                     this.pendingRequests.size(),
524                     this.available.size(),
525                     this.maxTotal);
526         } finally {
527             this.lock.unlock();
528         }
529     }
530 
531     @Override
532     public PoolStats getStats(final T route) {
533         Args.notNull(route, "Route");
534         this.lock.lock();
535         try {
536             final PerRoutePool<T, C> pool = getPool(route);
537             int pendingCount = 0;
538             for (final LeaseRequest<T, C> request: pendingRequests) {
539                 if (Objects.equals(route, request.getRoute())) {
540                     pendingCount++;
541                 }
542             }
543             return new PoolStats(
544                     pool.getLeasedCount(),
545                     pendingCount,
546                     pool.getAvailableCount(),
547                     getMax(route));
548         } finally {
549             this.lock.unlock();
550         }
551     }
552 
553     /**
554      * Returns snapshot of all knows routes
555      *
556      * @since 4.4
557      */
558     @Override
559     public Set<T> getRoutes() {
560         this.lock.lock();
561         try {
562             return new HashSet<>(routeToPool.keySet());
563         } finally {
564             this.lock.unlock();
565         }
566     }
567 
568     /**
569      * Enumerates all available connections.
570      *
571      * @since 4.3
572      */
573     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
574         this.lock.lock();
575         try {
576             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
577             while (it.hasNext()) {
578                 final PoolEntry<T, C> entry = it.next();
579                 callback.execute(entry);
580                 if (!entry.hasConnection()) {
581                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
582                     pool.remove(entry);
583                     it.remove();
584                 }
585             }
586             processPendingRequests();
587             purgePoolMap();
588         } finally {
589             this.lock.unlock();
590         }
591     }
592 
593     /**
594      * Enumerates all leased connections.
595      *
596      * @since 4.3
597      */
598     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
599         this.lock.lock();
600         try {
601             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
602             while (it.hasNext()) {
603                 final PoolEntry<T, C> entry = it.next();
604                 callback.execute(entry);
605             }
606             processPendingRequests();
607         } finally {
608             this.lock.unlock();
609         }
610     }
611 
612     private void purgePoolMap() {
613         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
614         while (it.hasNext()) {
615             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
616             final PerRoutePool<T, C> pool = entry.getValue();
617             if (pool.getAllocatedCount() == 0) {
618                 it.remove();
619             }
620         }
621     }
622 
623     @Override
624     public void closeIdle(final TimeValue idleTime) {
625         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
626         enumAvailable(entry -> {
627             if (entry.getUpdated() <= deadline) {
628                 entry.discardConnection(CloseMode.GRACEFUL);
629             }
630         });
631     }
632 
633     @Override
634     public void closeExpired() {
635         final long now = System.currentTimeMillis();
636         enumAvailable(entry -> {
637             if (entry.getExpiryDeadline().isBefore(now)) {
638                 entry.discardConnection(CloseMode.GRACEFUL);
639             }
640         });
641     }
642 
643     @Override
644     public String toString() {
645         final StringBuilder buffer = new StringBuilder();
646         buffer.append("[leased: ");
647         buffer.append(this.leased.size());
648         buffer.append("][available: ");
649         buffer.append(this.available.size());
650         buffer.append("][pending: ");
651         buffer.append(this.pendingRequests.size());
652         buffer.append("]");
653         return buffer.toString();
654     }
655 
656 
657     static class LeaseRequest<T, C extends ModalCloseable> {
658 
659         private final T route;
660         private final Object state;
661         private final Deadline deadline;
662         private final BasicFuture<PoolEntry<T, C>> future;
663         // 'completed' is used internally to guard setting
664         // 'result' and 'ex', but mustn't be used by 'isDone()'.
665         private final AtomicBoolean completed;
666         private volatile PoolEntry<T, C> result;
667         private volatile Exception ex;
668 
669         /**
670          * Constructor
671          *
672          * @param route route
673          * @param state state
674          * @param requestTimeout timeout to wait in a request queue until kicked off
675          * @param future future callback
676          */
677         public LeaseRequest(
678                 final T route,
679                 final Object state,
680                 final Timeout requestTimeout,
681                 final BasicFuture<PoolEntry<T, C>> future) {
682             super();
683             this.route = route;
684             this.state = state;
685             this.deadline = Deadline.calculate(requestTimeout);
686             this.future = future;
687             this.completed = new AtomicBoolean(false);
688         }
689 
690         public T getRoute() {
691             return this.route;
692         }
693 
694         public Object getState() {
695             return this.state;
696         }
697 
698         public Deadline getDeadline() {
699             return this.deadline;
700         }
701 
702         public boolean isDone() {
703             // This method must not use 'completed.get()' which would result in a race
704             // where a caller may observe completed=true while neither result nor ex
705             // have been set yet.
706             return ex != null || result != null;
707         }
708 
709         public void failed(final Exception ex) {
710             if (this.completed.compareAndSet(false, true)) {
711                 this.ex = ex;
712             }
713         }
714 
715         public void completed(final PoolEntry<T, C> result) {
716             if (this.completed.compareAndSet(false, true)) {
717                 this.result = result;
718             }
719         }
720 
721         public BasicFuture<PoolEntry<T, C>> getFuture() {
722             return this.future;
723         }
724 
725         public PoolEntry<T, C> getResult() {
726             return this.result;
727         }
728 
729         public Exception getException() {
730             return this.ex;
731         }
732 
733         @Override
734         public String toString() {
735             final StringBuilder buffer = new StringBuilder();
736             buffer.append("[");
737             buffer.append(this.route);
738             buffer.append("][");
739             buffer.append(this.state);
740             buffer.append("]");
741             return buffer.toString();
742         }
743 
744     }
745 
746     static class PerRoutePool<T, C extends ModalCloseable> {
747 
748         private final T route;
749         private final Set<PoolEntry<T, C>> leased;
750         private final LinkedList<PoolEntry<T, C>> available;
751         private final DisposalCallback<C> disposalCallback;
752 
753         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
754             super();
755             this.route = route;
756             this.disposalCallback = disposalCallback;
757             this.leased = new HashSet<>();
758             this.available = new LinkedList<>();
759         }
760 
761         public final T getRoute() {
762             return route;
763         }
764 
765         public int getLeasedCount() {
766             return this.leased.size();
767         }
768 
769         public int getAvailableCount() {
770             return this.available.size();
771         }
772 
773         public int getAllocatedCount() {
774             return this.available.size() + this.leased.size();
775         }
776 
777         public PoolEntry<T, C> getFree(final Object state) {
778             if (!this.available.isEmpty()) {
779                 if (state != null) {
780                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
781                     while (it.hasNext()) {
782                         final PoolEntry<T, C> entry = it.next();
783                         if (state.equals(entry.getState())) {
784                             it.remove();
785                             this.leased.add(entry);
786                             return entry;
787                         }
788                     }
789                 }
790                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
791                 while (it.hasNext()) {
792                     final PoolEntry<T, C> entry = it.next();
793                     if (entry.getState() == null) {
794                         it.remove();
795                         this.leased.add(entry);
796                         return entry;
797                     }
798                 }
799             }
800             return null;
801         }
802 
803         public PoolEntry<T, C> getLastUsed() {
804             return this.available.peekLast();
805         }
806 
807         public boolean remove(final PoolEntry<T, C> entry) {
808             return this.available.remove(entry) || this.leased.remove(entry);
809         }
810 
811         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
812             final boolean found = this.leased.remove(entry);
813             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
814             if (reusable) {
815                 this.available.addFirst(entry);
816             }
817         }
818 
819         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
820             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
821             this.leased.add(entry);
822             return entry;
823         }
824 
825         public void shutdown(final CloseMode closeMode) {
826             PoolEntry<T, C> availableEntry;
827             while ((availableEntry = available.poll()) != null) {
828                 availableEntry.discardConnection(closeMode);
829             }
830             for (final PoolEntry<T, C> entry: this.leased) {
831                 entry.discardConnection(closeMode);
832             }
833             this.leased.clear();
834         }
835 
836         @Override
837         public String toString() {
838             final StringBuilder buffer = new StringBuilder();
839             buffer.append("[route: ");
840             buffer.append(this.route);
841             buffer.append("][leased: ");
842             buffer.append(this.leased.size());
843             buffer.append("][available: ");
844             buffer.append(this.available.size());
845             buffer.append("]");
846             return buffer.toString();
847         }
848 
849     }
850 }