1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Objects;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import org.apache.hc.core5.annotation.Contract;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.io.CloseMode;
52 import org.apache.hc.core5.io.ModalCloseable;
53 import org.apache.hc.core5.util.Args;
54 import org.apache.hc.core5.util.Asserts;
55 import org.apache.hc.core5.util.Deadline;
56 import org.apache.hc.core5.util.DeadlineTimeoutException;
57 import org.apache.hc.core5.util.TimeValue;
58 import org.apache.hc.core5.util.Timeout;
59
60
61
62
63
64
65
66
67
68 @Contract(threading = ThreadingBehavior.SAFE)
69 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70
71 private final TimeValue timeToLive;
72 private final PoolReusePolicy policy;
73 private final DisposalCallback<C> disposalCallback;
74 private final ConnPoolListener<T> connPoolListener;
75 private final Map<T, PerRoutePool<T, C>> routeToPool;
76 private final LinkedList<LeaseRequest<T, C>> pendingRequests;
77 private final Set<PoolEntry<T, C>> leased;
78 private final LinkedList<PoolEntry<T, C>> available;
79 private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80 private final Map<T, Integer> maxPerRoute;
81 private final Lock lock;
82 private final AtomicBoolean isShutDown;
83
84 private volatile int defaultMaxPerRoute;
85 private volatile int maxTotal;
86
87
88
89
90 public StrictConnPool(
91 final int defaultMaxPerRoute,
92 final int maxTotal,
93 final TimeValue timeToLive,
94 final PoolReusePolicy policy,
95 final DisposalCallback<C> disposalCallback,
96 final ConnPoolListener<T> connPoolListener) {
97 super();
98 Args.positive(defaultMaxPerRoute, "Max per route value");
99 Args.positive(maxTotal, "Max total value");
100 this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
101 this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
102 this.disposalCallback = disposalCallback;
103 this.connPoolListener = connPoolListener;
104 this.routeToPool = new HashMap<>();
105 this.pendingRequests = new LinkedList<>();
106 this.leased = new HashSet<>();
107 this.available = new LinkedList<>();
108 this.completedRequests = new ConcurrentLinkedQueue<>();
109 this.maxPerRoute = new HashMap<>();
110 this.lock = new ReentrantLock();
111 this.isShutDown = new AtomicBoolean(false);
112 this.defaultMaxPerRoute = defaultMaxPerRoute;
113 this.maxTotal = maxTotal;
114 }
115
116
117
118
119 public StrictConnPool(
120 final int defaultMaxPerRoute,
121 final int maxTotal,
122 final TimeValue timeToLive,
123 final PoolReusePolicy policy,
124 final ConnPoolListener<T> connPoolListener) {
125 this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
126 }
127
128 public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
129 this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
130 }
131
132 public boolean isShutdown() {
133 return this.isShutDown.get();
134 }
135
136 @Override
137 public void close(final CloseMode closeMode) {
138 if (this.isShutDown.compareAndSet(false, true)) {
139 fireCallbacks();
140 this.lock.lock();
141 try {
142 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143 pool.shutdown(closeMode);
144 }
145 this.routeToPool.clear();
146 this.leased.clear();
147 this.available.clear();
148 this.pendingRequests.clear();
149 } finally {
150 this.lock.unlock();
151 }
152 }
153 }
154
155 @Override
156 public void close() {
157 close(CloseMode.GRACEFUL);
158 }
159
160 private PerRoutePool<T, C> getPool(final T route) {
161 PerRoutePool<T, C> pool = this.routeToPool.get(route);
162 if (pool == null) {
163 pool = new PerRoutePool<>(route, this.disposalCallback);
164 this.routeToPool.put(route, pool);
165 }
166 return pool;
167 }
168
169 @Override
170 public Future<PoolEntry<T, C>> lease(
171 final T route, final Object state,
172 final Timeout requestTimeout,
173 final FutureCallback<PoolEntry<T, C>> callback) {
174 Args.notNull(route, "Route");
175 Args.notNull(requestTimeout, "Request timeout");
176 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177 final Deadline deadline = Deadline.calculate(requestTimeout);
178 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179
180 @Override
181 public synchronized PoolEntry<T, C> get(
182 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183 try {
184 return super.get(timeout, unit);
185 } catch (final TimeoutException ex) {
186 cancel();
187 throw ex;
188 }
189 }
190
191 };
192 final boolean acquiredLock;
193
194 try {
195 if (Timeout.isPositive(requestTimeout)) {
196 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
197 } else {
198 this.lock.lockInterruptibly();
199 acquiredLock = true;
200 }
201 } catch (final InterruptedException interruptedException) {
202 Thread.currentThread().interrupt();
203 future.cancel();
204 return future;
205 }
206
207 if (acquiredLock) {
208 try {
209 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
210 final boolean completed = processPendingRequest(request);
211 if (!request.isDone() && !completed) {
212 this.pendingRequests.add(request);
213 }
214 if (request.isDone()) {
215 this.completedRequests.add(request);
216 }
217 } finally {
218 this.lock.unlock();
219 }
220 fireCallbacks();
221 } else {
222 future.failed(DeadlineTimeoutException.from(deadline));
223 }
224
225 return future;
226 }
227
228 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
229 return lease(route, state, Timeout.DISABLED, null);
230 }
231
232 @Override
233 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
234 if (entry == null) {
235 return;
236 }
237 if (this.isShutDown.get()) {
238 return;
239 }
240 if (!reusable) {
241 entry.discardConnection(CloseMode.GRACEFUL);
242 }
243 this.lock.lock();
244 try {
245 if (this.leased.remove(entry)) {
246 if (this.connPoolListener != null) {
247 this.connPoolListener.onRelease(entry.getRoute(), this);
248 }
249 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
250 final boolean keepAlive = entry.hasConnection() && reusable;
251 pool.free(entry, keepAlive);
252 if (keepAlive) {
253 switch (policy) {
254 case LIFO:
255 this.available.addFirst(entry);
256 break;
257 case FIFO:
258 this.available.addLast(entry);
259 break;
260 default:
261 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
262 }
263 } else {
264 entry.discardConnection(CloseMode.GRACEFUL);
265 }
266 processNextPendingRequest();
267 } else {
268 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
269 }
270 } finally {
271 this.lock.unlock();
272 }
273 fireCallbacks();
274 }
275
276 private void processPendingRequests() {
277 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
278 while (it.hasNext()) {
279 final LeaseRequest<T, C> request = it.next();
280 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
281 if (future.isCancelled()) {
282 it.remove();
283 continue;
284 }
285 final boolean completed = processPendingRequest(request);
286 if (request.isDone() || completed) {
287 it.remove();
288 }
289 if (request.isDone()) {
290 this.completedRequests.add(request);
291 }
292 }
293 }
294
295 private void processNextPendingRequest() {
296 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
297 while (it.hasNext()) {
298 final LeaseRequest<T, C> request = it.next();
299 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
300 if (future.isCancelled()) {
301 it.remove();
302 continue;
303 }
304 final boolean completed = processPendingRequest(request);
305 if (request.isDone() || completed) {
306 it.remove();
307 }
308 if (request.isDone()) {
309 this.completedRequests.add(request);
310 }
311 if (completed) {
312 return;
313 }
314 }
315 }
316
317 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
318 final T route = request.getRoute();
319 final Object state = request.getState();
320 final Deadline deadline = request.getDeadline();
321
322 if (deadline.isExpired()) {
323 request.failed(DeadlineTimeoutException.from(deadline));
324 return false;
325 }
326
327 final PerRoutePool<T, C> pool = getPool(route);
328 PoolEntry<T, C> entry;
329 for (;;) {
330 entry = pool.getFree(state);
331 if (entry == null) {
332 break;
333 }
334 if (entry.getExpiryDeadline().isExpired()) {
335 entry.discardConnection(CloseMode.GRACEFUL);
336 this.available.remove(entry);
337 pool.free(entry, false);
338 } else {
339 break;
340 }
341 }
342 if (entry != null) {
343 this.available.remove(entry);
344 this.leased.add(entry);
345 request.completed(entry);
346 if (this.connPoolListener != null) {
347 this.connPoolListener.onLease(entry.getRoute(), this);
348 }
349 return true;
350 }
351
352
353 final int maxPerRoute = getMax(route);
354
355 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
356 if (excess > 0) {
357 for (int i = 0; i < excess; i++) {
358 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
359 if (lastUsed == null) {
360 break;
361 }
362 lastUsed.discardConnection(CloseMode.GRACEFUL);
363 this.available.remove(lastUsed);
364 pool.remove(lastUsed);
365 }
366 }
367
368 if (pool.getAllocatedCount() < maxPerRoute) {
369 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
370 if (freeCapacity == 0) {
371 return false;
372 }
373 final int totalAvailable = this.available.size();
374 if (totalAvailable > freeCapacity - 1) {
375 final PoolEntry<T, C> lastUsed = this.available.removeLast();
376 lastUsed.discardConnection(CloseMode.GRACEFUL);
377 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
378 otherpool.remove(lastUsed);
379 }
380
381 entry = pool.createEntry(this.timeToLive);
382 this.leased.add(entry);
383 request.completed(entry);
384 if (this.connPoolListener != null) {
385 this.connPoolListener.onLease(entry.getRoute(), this);
386 }
387 return true;
388 }
389 return false;
390 }
391
392 private void fireCallbacks() {
393 LeaseRequest<T, C> request;
394 while ((request = this.completedRequests.poll()) != null) {
395 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
396 final Exception ex = request.getException();
397 final PoolEntry<T, C> result = request.getResult();
398 boolean successfullyCompleted = false;
399 if (ex != null) {
400 future.failed(ex);
401 } else if (result != null) {
402 if (future.completed(result)) {
403 successfullyCompleted = true;
404 }
405 } else {
406 future.cancel();
407 }
408 if (!successfullyCompleted) {
409 release(result, true);
410 }
411 }
412 }
413
414 public void validatePendingRequests() {
415 this.lock.lock();
416 try {
417 final long now = System.currentTimeMillis();
418 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
419 while (it.hasNext()) {
420 final LeaseRequest<T, C> request = it.next();
421 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
422 if (future.isCancelled() && !request.isDone()) {
423 it.remove();
424 } else {
425 final Deadline deadline = request.getDeadline();
426 if (deadline.isBefore(now)) {
427 request.failed(DeadlineTimeoutException.from(deadline));
428 }
429 if (request.isDone()) {
430 it.remove();
431 this.completedRequests.add(request);
432 }
433 }
434 }
435 } finally {
436 this.lock.unlock();
437 }
438 fireCallbacks();
439 }
440
441 private int getMax(final T route) {
442 final Integer v = this.maxPerRoute.get(route);
443 if (v != null) {
444 return v;
445 }
446 return this.defaultMaxPerRoute;
447 }
448
449 @Override
450 public void setMaxTotal(final int max) {
451 Args.positive(max, "Max value");
452 this.lock.lock();
453 try {
454 this.maxTotal = max;
455 } finally {
456 this.lock.unlock();
457 }
458 }
459
460 @Override
461 public int getMaxTotal() {
462 this.lock.lock();
463 try {
464 return this.maxTotal;
465 } finally {
466 this.lock.unlock();
467 }
468 }
469
470 @Override
471 public void setDefaultMaxPerRoute(final int max) {
472 Args.positive(max, "Max value");
473 this.lock.lock();
474 try {
475 this.defaultMaxPerRoute = max;
476 } finally {
477 this.lock.unlock();
478 }
479 }
480
481 @Override
482 public int getDefaultMaxPerRoute() {
483 this.lock.lock();
484 try {
485 return this.defaultMaxPerRoute;
486 } finally {
487 this.lock.unlock();
488 }
489 }
490
491 @Override
492 public void setMaxPerRoute(final T route, final int max) {
493 Args.notNull(route, "Route");
494 this.lock.lock();
495 try {
496 if (max > -1) {
497 this.maxPerRoute.put(route, max);
498 } else {
499 this.maxPerRoute.remove(route);
500 }
501 } finally {
502 this.lock.unlock();
503 }
504 }
505
506 @Override
507 public int getMaxPerRoute(final T route) {
508 Args.notNull(route, "Route");
509 this.lock.lock();
510 try {
511 return getMax(route);
512 } finally {
513 this.lock.unlock();
514 }
515 }
516
517 @Override
518 public PoolStats getTotalStats() {
519 this.lock.lock();
520 try {
521 int pendingCount = 0;
522 for (final LeaseRequest<T, C> request: pendingRequests) {
523 if (!request.isDone()) {
524 final Deadline deadline = request.getDeadline();
525 if (!deadline.isExpired()) {
526 pendingCount++;
527 }
528 }
529 }
530 return new PoolStats(
531 this.leased.size(),
532 pendingCount,
533 this.available.size(),
534 this.maxTotal);
535 } finally {
536 this.lock.unlock();
537 }
538 }
539
540 @Override
541 public PoolStats getStats(final T route) {
542 Args.notNull(route, "Route");
543 this.lock.lock();
544 try {
545 final PerRoutePool<T, C> pool = getPool(route);
546 int pendingCount = 0;
547 for (final LeaseRequest<T, C> request: pendingRequests) {
548 if (!request.isDone() && Objects.equals(route, request.getRoute())) {
549 final Deadline deadline = request.getDeadline();
550 if (!deadline.isExpired()) {
551 pendingCount++;
552 }
553 }
554 }
555 return new PoolStats(
556 pool.getLeasedCount(),
557 pendingCount,
558 pool.getAvailableCount(),
559 getMax(route));
560 } finally {
561 this.lock.unlock();
562 }
563 }
564
565
566
567
568
569
570 @Override
571 public Set<T> getRoutes() {
572 this.lock.lock();
573 try {
574 return new HashSet<>(routeToPool.keySet());
575 } finally {
576 this.lock.unlock();
577 }
578 }
579
580
581
582
583
584
585 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
586 this.lock.lock();
587 try {
588 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
589 while (it.hasNext()) {
590 final PoolEntry<T, C> entry = it.next();
591 callback.execute(entry);
592 if (!entry.hasConnection()) {
593 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
594 pool.remove(entry);
595 it.remove();
596 }
597 }
598 processPendingRequests();
599 purgePoolMap();
600 } finally {
601 this.lock.unlock();
602 }
603 }
604
605
606
607
608
609
610 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
611 this.lock.lock();
612 try {
613 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
614 while (it.hasNext()) {
615 final PoolEntry<T, C> entry = it.next();
616 callback.execute(entry);
617 }
618 processPendingRequests();
619 } finally {
620 this.lock.unlock();
621 }
622 }
623
624 private void purgePoolMap() {
625 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
626 while (it.hasNext()) {
627 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
628 final PerRoutePool<T, C> pool = entry.getValue();
629 if (pool.getAllocatedCount() == 0) {
630 it.remove();
631 }
632 }
633 }
634
635 @Override
636 public void closeIdle(final TimeValue idleTime) {
637 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
638 enumAvailable(entry -> {
639 if (entry.getUpdated() <= deadline) {
640 entry.discardConnection(CloseMode.GRACEFUL);
641 }
642 });
643 }
644
645 @Override
646 public void closeExpired() {
647 final long now = System.currentTimeMillis();
648 enumAvailable(entry -> {
649 if (entry.getExpiryDeadline().isBefore(now)) {
650 entry.discardConnection(CloseMode.GRACEFUL);
651 }
652 });
653 }
654
655 @Override
656 public String toString() {
657 final StringBuilder buffer = new StringBuilder();
658 buffer.append("[leased: ");
659 buffer.append(this.leased.size());
660 buffer.append("][available: ");
661 buffer.append(this.available.size());
662 buffer.append("][pending: ");
663 buffer.append(this.pendingRequests.size());
664 buffer.append("]");
665 return buffer.toString();
666 }
667
668
669 static class LeaseRequest<T, C extends ModalCloseable> {
670
671 private final T route;
672 private final Object state;
673 private final Deadline deadline;
674 private final BasicFuture<PoolEntry<T, C>> future;
675
676
677 private final AtomicBoolean completed;
678 private volatile PoolEntry<T, C> result;
679 private volatile Exception ex;
680
681
682
683
684
685
686
687
688
689 public LeaseRequest(
690 final T route,
691 final Object state,
692 final Timeout requestTimeout,
693 final BasicFuture<PoolEntry<T, C>> future) {
694 super();
695 this.route = route;
696 this.state = state;
697 this.deadline = Deadline.calculate(requestTimeout);
698 this.future = future;
699 this.completed = new AtomicBoolean(false);
700 }
701
702 public T getRoute() {
703 return this.route;
704 }
705
706 public Object getState() {
707 return this.state;
708 }
709
710 public Deadline getDeadline() {
711 return this.deadline;
712 }
713
714 public boolean isDone() {
715
716
717
718 return ex != null || result != null;
719 }
720
721 public void failed(final Exception ex) {
722 if (this.completed.compareAndSet(false, true)) {
723 this.ex = ex;
724 }
725 }
726
727 public void completed(final PoolEntry<T, C> result) {
728 if (this.completed.compareAndSet(false, true)) {
729 this.result = result;
730 }
731 }
732
733 public BasicFuture<PoolEntry<T, C>> getFuture() {
734 return this.future;
735 }
736
737 public PoolEntry<T, C> getResult() {
738 return this.result;
739 }
740
741 public Exception getException() {
742 return this.ex;
743 }
744
745 @Override
746 public String toString() {
747 final StringBuilder buffer = new StringBuilder();
748 buffer.append("[");
749 buffer.append(this.route);
750 buffer.append("][");
751 buffer.append(this.state);
752 buffer.append("]");
753 return buffer.toString();
754 }
755
756 }
757
758 static class PerRoutePool<T, C extends ModalCloseable> {
759
760 private final T route;
761 private final Set<PoolEntry<T, C>> leased;
762 private final LinkedList<PoolEntry<T, C>> available;
763 private final DisposalCallback<C> disposalCallback;
764
765 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
766 super();
767 this.route = route;
768 this.disposalCallback = disposalCallback;
769 this.leased = new HashSet<>();
770 this.available = new LinkedList<>();
771 }
772
773 public final T getRoute() {
774 return route;
775 }
776
777 public int getLeasedCount() {
778 return this.leased.size();
779 }
780
781 public int getAvailableCount() {
782 return this.available.size();
783 }
784
785 public int getAllocatedCount() {
786 return this.available.size() + this.leased.size();
787 }
788
789 public PoolEntry<T, C> getFree(final Object state) {
790 if (!this.available.isEmpty()) {
791 if (state != null) {
792 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
793 while (it.hasNext()) {
794 final PoolEntry<T, C> entry = it.next();
795 if (state.equals(entry.getState())) {
796 it.remove();
797 this.leased.add(entry);
798 return entry;
799 }
800 }
801 }
802 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
803 while (it.hasNext()) {
804 final PoolEntry<T, C> entry = it.next();
805 if (entry.getState() == null) {
806 it.remove();
807 this.leased.add(entry);
808 return entry;
809 }
810 }
811 }
812 return null;
813 }
814
815 public PoolEntry<T, C> getLastUsed() {
816 return this.available.peekLast();
817 }
818
819 public boolean remove(final PoolEntry<T, C> entry) {
820 return this.available.remove(entry) || this.leased.remove(entry);
821 }
822
823 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
824 final boolean found = this.leased.remove(entry);
825 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
826 if (reusable) {
827 this.available.addFirst(entry);
828 }
829 }
830
831 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
832 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
833 this.leased.add(entry);
834 return entry;
835 }
836
837 public void shutdown(final CloseMode closeMode) {
838 PoolEntry<T, C> availableEntry;
839 while ((availableEntry = available.poll()) != null) {
840 availableEntry.discardConnection(closeMode);
841 }
842 for (final PoolEntry<T, C> entry: this.leased) {
843 entry.discardConnection(closeMode);
844 }
845 this.leased.clear();
846 }
847
848 @Override
849 public String toString() {
850 final StringBuilder buffer = new StringBuilder();
851 buffer.append("[route: ");
852 buffer.append(this.route);
853 buffer.append("][leased: ");
854 buffer.append(this.leased.size());
855 buffer.append("][available: ");
856 buffer.append(this.available.size());
857 buffer.append("]");
858 return buffer.toString();
859 }
860
861 }
862 }