1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Objects;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import org.apache.hc.core5.annotation.Contract;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.function.Callback;
51 import org.apache.hc.core5.io.CloseMode;
52 import org.apache.hc.core5.io.ModalCloseable;
53 import org.apache.hc.core5.util.Args;
54 import org.apache.hc.core5.util.Asserts;
55 import org.apache.hc.core5.util.Deadline;
56 import org.apache.hc.core5.util.DeadlineTimeoutException;
57 import org.apache.hc.core5.util.TimeValue;
58 import org.apache.hc.core5.util.Timeout;
59
60
61
62
63
64
65
66
67
68 @Contract(threading = ThreadingBehavior.SAFE)
69 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70
71 private final TimeValue timeToLive;
72 private final PoolReusePolicy policy;
73 private final DisposalCallback<C> disposalCallback;
74 private final ConnPoolListener<T> connPoolListener;
75 private final Map<T, PerRoutePool<T, C>> routeToPool;
76 private final LinkedList<LeaseRequest<T, C>> pendingRequests;
77 private final Set<PoolEntry<T, C>> leased;
78 private final LinkedList<PoolEntry<T, C>> available;
79 private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80 private final Map<T, Integer> maxPerRoute;
81 private final Lock lock;
82 private final AtomicBoolean isShutDown;
83
84 private volatile int defaultMaxPerRoute;
85 private volatile int maxTotal;
86
87
88
89
/**
 * Creates a pool with explicit limits, entry time-to-live, reuse policy and callbacks.
 *
 * @param defaultMaxPerRoute per-route limit used for routes without an explicit limit;
 *                           checked with {@code Args.positive}.
 * @param maxTotal           limit on the total number of entries; checked with
 *                           {@code Args.positive}.
 * @param timeToLive         time-to-live handed to newly created entries, normalized through
 *                           {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy             order in which released entries are queued for reuse;
 *                           {@code null} selects {@code PoolReusePolicy.LIFO}.
 * @param disposalCallback   callback passed on to every per-route pool; may be {@code null}.
 * @param connPoolListener   listener notified on lease and release; may be {@code null}.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");

    // Configuration.
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy != null) {
        this.policy = policy;
    } else {
        this.policy = PoolReusePolicy.LIFO;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;

    // Mutable pool state; everything except completedRequests is guarded by the lock.
    this.lock = new ReentrantLock();
    this.isShutDown = new AtomicBoolean(false);
    this.routeToPool = new HashMap<>();
    this.maxPerRoute = new HashMap<>();
    this.leased = new HashSet<>();
    this.available = new LinkedList<>();
    this.pendingRequests = new LinkedList<>();
    this.completedRequests = new ConcurrentLinkedQueue<>();
}
115
116
117
118
/**
 * Creates a pool without a disposal callback.
 * Delegates to the six-argument constructor, passing {@code null} as the disposal callback.
 *
 * @param defaultMaxPerRoute per-route limit used for routes without an explicit limit.
 * @param maxTotal           limit on the total number of entries.
 * @param timeToLive         time-to-live handed to newly created entries.
 * @param policy             reuse policy for released entries.
 * @param connPoolListener   listener notified on lease and release; may be {@code null}.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            timeToLive,
            policy,
            null, // no disposal callback
            connPoolListener);
}
127
/**
 * Creates a pool with the given limits, a time-to-live of
 * {@code TimeValue.NEG_ONE_MILLISECOND}, the {@code LIFO} reuse policy,
 * and neither a disposal callback nor a pool listener.
 *
 * @param defaultMaxPerRoute per-route limit used for routes without an explicit limit.
 * @param maxTotal           limit on the total number of entries.
 */
public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null,  // disposal callback
            null); // pool listener
}
131
132 public boolean isShutdown() {
133 return this.isShutDown.get();
134 }
135
136 @Override
137 public void close(final CloseMode closeMode) {
138 if (this.isShutDown.compareAndSet(false, true)) {
139 fireCallbacks();
140 this.lock.lock();
141 try {
142 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143 pool.shutdown(closeMode);
144 }
145 this.routeToPool.clear();
146 this.leased.clear();
147 this.available.clear();
148 this.pendingRequests.clear();
149 } finally {
150 this.lock.unlock();
151 }
152 }
153 }
154
155 @Override
156 public void close() {
157 close(CloseMode.GRACEFUL);
158 }
159
160 private PerRoutePool<T, C> getPool(final T route) {
161 PerRoutePool<T, C> pool = this.routeToPool.get(route);
162 if (pool == null) {
163 pool = new PerRoutePool<>(route, this.disposalCallback);
164 this.routeToPool.put(route, pool);
165 }
166 return pool;
167 }
168
169 @Override
170 public Future<PoolEntry<T, C>> lease(
171 final T route, final Object state,
172 final Timeout requestTimeout,
173 final FutureCallback<PoolEntry<T, C>> callback) {
174 Args.notNull(route, "Route");
175 Args.notNull(requestTimeout, "Request timeout");
176 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177 final Deadline deadline = Deadline.calculate(requestTimeout);
178 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179
180 @Override
181 public synchronized PoolEntry<T, C> get(
182 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183 try {
184 return super.get(timeout, unit);
185 } catch (final TimeoutException ex) {
186 cancel();
187 throw ex;
188 }
189 }
190
191 };
192 final boolean acquiredLock;
193
194 try {
195 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196 } catch (final InterruptedException interruptedException) {
197 Thread.currentThread().interrupt();
198 future.cancel();
199 return future;
200 }
201
202 if (acquiredLock) {
203 try {
204 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
205 final boolean completed = processPendingRequest(request);
206 if (!request.isDone() && !completed) {
207 this.pendingRequests.add(request);
208 }
209 if (request.isDone()) {
210 this.completedRequests.add(request);
211 }
212 } finally {
213 this.lock.unlock();
214 }
215 fireCallbacks();
216 } else {
217 future.failed(DeadlineTimeoutException.from(deadline));
218 }
219
220 return future;
221 }
222
223 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
224 return lease(route, state, Timeout.DISABLED, null);
225 }
226
227 @Override
228 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
229 if (entry == null) {
230 return;
231 }
232 if (this.isShutDown.get()) {
233 return;
234 }
235 if (!reusable) {
236 entry.discardConnection(CloseMode.GRACEFUL);
237 }
238 this.lock.lock();
239 try {
240 if (this.leased.remove(entry)) {
241 if (this.connPoolListener != null) {
242 this.connPoolListener.onRelease(entry.getRoute(), this);
243 }
244 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
245 final boolean keepAlive = entry.hasConnection() && reusable;
246 pool.free(entry, keepAlive);
247 if (keepAlive) {
248 switch (policy) {
249 case LIFO:
250 this.available.addFirst(entry);
251 break;
252 case FIFO:
253 this.available.addLast(entry);
254 break;
255 default:
256 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
257 }
258 } else {
259 entry.discardConnection(CloseMode.GRACEFUL);
260 }
261 processNextPendingRequest();
262 } else {
263 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
264 }
265 } finally {
266 this.lock.unlock();
267 }
268 fireCallbacks();
269 }
270
271 private void processPendingRequests() {
272 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
273 while (it.hasNext()) {
274 final LeaseRequest<T, C> request = it.next();
275 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
276 if (future.isCancelled()) {
277 it.remove();
278 continue;
279 }
280 final boolean completed = processPendingRequest(request);
281 if (request.isDone() || completed) {
282 it.remove();
283 }
284 if (request.isDone()) {
285 this.completedRequests.add(request);
286 }
287 }
288 }
289
290 private void processNextPendingRequest() {
291 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
292 while (it.hasNext()) {
293 final LeaseRequest<T, C> request = it.next();
294 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
295 if (future.isCancelled()) {
296 it.remove();
297 continue;
298 }
299 final boolean completed = processPendingRequest(request);
300 if (request.isDone() || completed) {
301 it.remove();
302 }
303 if (request.isDone()) {
304 this.completedRequests.add(request);
305 }
306 if (completed) {
307 return;
308 }
309 }
310 }
311
312 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
313 final T route = request.getRoute();
314 final Object state = request.getState();
315 final Deadline deadline = request.getDeadline();
316
317 if (deadline.isExpired()) {
318 request.failed(DeadlineTimeoutException.from(deadline));
319 return false;
320 }
321
322 final PerRoutePool<T, C> pool = getPool(route);
323 PoolEntry<T, C> entry;
324 for (;;) {
325 entry = pool.getFree(state);
326 if (entry == null) {
327 break;
328 }
329 if (entry.getExpiryDeadline().isExpired()) {
330 entry.discardConnection(CloseMode.GRACEFUL);
331 this.available.remove(entry);
332 pool.free(entry, false);
333 } else {
334 break;
335 }
336 }
337 if (entry != null) {
338 this.available.remove(entry);
339 this.leased.add(entry);
340 request.completed(entry);
341 if (this.connPoolListener != null) {
342 this.connPoolListener.onLease(entry.getRoute(), this);
343 }
344 return true;
345 }
346
347
348 final int maxPerRoute = getMax(route);
349
350 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
351 if (excess > 0) {
352 for (int i = 0; i < excess; i++) {
353 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
354 if (lastUsed == null) {
355 break;
356 }
357 lastUsed.discardConnection(CloseMode.GRACEFUL);
358 this.available.remove(lastUsed);
359 pool.remove(lastUsed);
360 }
361 }
362
363 if (pool.getAllocatedCount() < maxPerRoute) {
364 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
365 if (freeCapacity == 0) {
366 return false;
367 }
368 final int totalAvailable = this.available.size();
369 if (totalAvailable > freeCapacity - 1) {
370 final PoolEntry<T, C> lastUsed = this.available.removeLast();
371 lastUsed.discardConnection(CloseMode.GRACEFUL);
372 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
373 otherpool.remove(lastUsed);
374 }
375
376 entry = pool.createEntry(this.timeToLive);
377 this.leased.add(entry);
378 request.completed(entry);
379 if (this.connPoolListener != null) {
380 this.connPoolListener.onLease(entry.getRoute(), this);
381 }
382 return true;
383 }
384 return false;
385 }
386
387 private void fireCallbacks() {
388 LeaseRequest<T, C> request;
389 while ((request = this.completedRequests.poll()) != null) {
390 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
391 final Exception ex = request.getException();
392 final PoolEntry<T, C> result = request.getResult();
393 boolean successfullyCompleted = false;
394 if (ex != null) {
395 future.failed(ex);
396 } else if (result != null) {
397 if (future.completed(result)) {
398 successfullyCompleted = true;
399 }
400 } else {
401 future.cancel();
402 }
403 if (!successfullyCompleted) {
404 release(result, true);
405 }
406 }
407 }
408
409 public void validatePendingRequests() {
410 this.lock.lock();
411 try {
412 final long now = System.currentTimeMillis();
413 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
414 while (it.hasNext()) {
415 final LeaseRequest<T, C> request = it.next();
416 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
417 if (future.isCancelled() && !request.isDone()) {
418 it.remove();
419 } else {
420 final Deadline deadline = request.getDeadline();
421 if (deadline.isBefore(now)) {
422 request.failed(DeadlineTimeoutException.from(deadline));
423 }
424 if (request.isDone()) {
425 it.remove();
426 this.completedRequests.add(request);
427 }
428 }
429 }
430 } finally {
431 this.lock.unlock();
432 }
433 fireCallbacks();
434 }
435
436 private int getMax(final T route) {
437 final Integer v = this.maxPerRoute.get(route);
438 if (v != null) {
439 return v;
440 }
441 return this.defaultMaxPerRoute;
442 }
443
444 @Override
445 public void setMaxTotal(final int max) {
446 Args.positive(max, "Max value");
447 this.lock.lock();
448 try {
449 this.maxTotal = max;
450 } finally {
451 this.lock.unlock();
452 }
453 }
454
455 @Override
456 public int getMaxTotal() {
457 this.lock.lock();
458 try {
459 return this.maxTotal;
460 } finally {
461 this.lock.unlock();
462 }
463 }
464
465 @Override
466 public void setDefaultMaxPerRoute(final int max) {
467 Args.positive(max, "Max value");
468 this.lock.lock();
469 try {
470 this.defaultMaxPerRoute = max;
471 } finally {
472 this.lock.unlock();
473 }
474 }
475
476 @Override
477 public int getDefaultMaxPerRoute() {
478 this.lock.lock();
479 try {
480 return this.defaultMaxPerRoute;
481 } finally {
482 this.lock.unlock();
483 }
484 }
485
486 @Override
487 public void setMaxPerRoute(final T route, final int max) {
488 Args.notNull(route, "Route");
489 this.lock.lock();
490 try {
491 if (max > -1) {
492 this.maxPerRoute.put(route, max);
493 } else {
494 this.maxPerRoute.remove(route);
495 }
496 } finally {
497 this.lock.unlock();
498 }
499 }
500
501 @Override
502 public int getMaxPerRoute(final T route) {
503 Args.notNull(route, "Route");
504 this.lock.lock();
505 try {
506 return getMax(route);
507 } finally {
508 this.lock.unlock();
509 }
510 }
511
512 @Override
513 public PoolStats getTotalStats() {
514 this.lock.lock();
515 try {
516 return new PoolStats(
517 this.leased.size(),
518 this.pendingRequests.size(),
519 this.available.size(),
520 this.maxTotal);
521 } finally {
522 this.lock.unlock();
523 }
524 }
525
526 @Override
527 public PoolStats getStats(final T route) {
528 Args.notNull(route, "Route");
529 this.lock.lock();
530 try {
531 final PerRoutePool<T, C> pool = getPool(route);
532 int pendingCount = 0;
533 for (final LeaseRequest<T, C> request: pendingRequests) {
534 if (Objects.equals(route, request.getRoute())) {
535 pendingCount++;
536 }
537 }
538 return new PoolStats(
539 pool.getLeasedCount(),
540 pendingCount,
541 pool.getAvailableCount(),
542 getMax(route));
543 } finally {
544 this.lock.unlock();
545 }
546 }
547
548
549
550
551
552
553 @Override
554 public Set<T> getRoutes() {
555 this.lock.lock();
556 try {
557 return new HashSet<>(routeToPool.keySet());
558 } finally {
559 this.lock.unlock();
560 }
561 }
562
563
564
565
566
567
568 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
569 this.lock.lock();
570 try {
571 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
572 while (it.hasNext()) {
573 final PoolEntry<T, C> entry = it.next();
574 callback.execute(entry);
575 if (!entry.hasConnection()) {
576 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
577 pool.remove(entry);
578 it.remove();
579 }
580 }
581 processPendingRequests();
582 purgePoolMap();
583 } finally {
584 this.lock.unlock();
585 }
586 }
587
588
589
590
591
592
593 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
594 this.lock.lock();
595 try {
596 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
597 while (it.hasNext()) {
598 final PoolEntry<T, C> entry = it.next();
599 callback.execute(entry);
600 }
601 processPendingRequests();
602 } finally {
603 this.lock.unlock();
604 }
605 }
606
607 private void purgePoolMap() {
608 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
609 while (it.hasNext()) {
610 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
611 final PerRoutePool<T, C> pool = entry.getValue();
612 if (pool.getAllocatedCount() == 0) {
613 it.remove();
614 }
615 }
616 }
617
618 @Override
619 public void closeIdle(final TimeValue idleTime) {
620 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
621 enumAvailable(entry -> {
622 if (entry.getUpdated() <= deadline) {
623 entry.discardConnection(CloseMode.GRACEFUL);
624 }
625 });
626 }
627
628 @Override
629 public void closeExpired() {
630 final long now = System.currentTimeMillis();
631 enumAvailable(entry -> {
632 if (entry.getExpiryDeadline().isBefore(now)) {
633 entry.discardConnection(CloseMode.GRACEFUL);
634 }
635 });
636 }
637
638 @Override
639 public String toString() {
640 final StringBuilder buffer = new StringBuilder();
641 buffer.append("[leased: ");
642 buffer.append(this.leased.size());
643 buffer.append("][available: ");
644 buffer.append(this.available.size());
645 buffer.append("][pending: ");
646 buffer.append(this.pendingRequests.size());
647 buffer.append("]");
648 return buffer.toString();
649 }
650
651
652 static class LeaseRequest<T, C extends ModalCloseable> {
653
654 private final T route;
655 private final Object state;
656 private final Deadline deadline;
657 private final BasicFuture<PoolEntry<T, C>> future;
658
659
660 private final AtomicBoolean completed;
661 private volatile PoolEntry<T, C> result;
662 private volatile Exception ex;
663
664
665
666
667
668
669
670
671
672 public LeaseRequest(
673 final T route,
674 final Object state,
675 final Timeout requestTimeout,
676 final BasicFuture<PoolEntry<T, C>> future) {
677 super();
678 this.route = route;
679 this.state = state;
680 this.deadline = Deadline.calculate(requestTimeout);
681 this.future = future;
682 this.completed = new AtomicBoolean(false);
683 }
684
685 public T getRoute() {
686 return this.route;
687 }
688
689 public Object getState() {
690 return this.state;
691 }
692
693 public Deadline getDeadline() {
694 return this.deadline;
695 }
696
697 public boolean isDone() {
698
699
700
701 return ex != null || result != null;
702 }
703
704 public void failed(final Exception ex) {
705 if (this.completed.compareAndSet(false, true)) {
706 this.ex = ex;
707 }
708 }
709
710 public void completed(final PoolEntry<T, C> result) {
711 if (this.completed.compareAndSet(false, true)) {
712 this.result = result;
713 }
714 }
715
716 public BasicFuture<PoolEntry<T, C>> getFuture() {
717 return this.future;
718 }
719
720 public PoolEntry<T, C> getResult() {
721 return this.result;
722 }
723
724 public Exception getException() {
725 return this.ex;
726 }
727
728 @Override
729 public String toString() {
730 final StringBuilder buffer = new StringBuilder();
731 buffer.append("[");
732 buffer.append(this.route);
733 buffer.append("][");
734 buffer.append(this.state);
735 buffer.append("]");
736 return buffer.toString();
737 }
738
739 }
740
741 static class PerRoutePool<T, C extends ModalCloseable> {
742
743 private final T route;
744 private final Set<PoolEntry<T, C>> leased;
745 private final LinkedList<PoolEntry<T, C>> available;
746 private final DisposalCallback<C> disposalCallback;
747
748 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
749 super();
750 this.route = route;
751 this.disposalCallback = disposalCallback;
752 this.leased = new HashSet<>();
753 this.available = new LinkedList<>();
754 }
755
756 public final T getRoute() {
757 return route;
758 }
759
760 public int getLeasedCount() {
761 return this.leased.size();
762 }
763
764 public int getAvailableCount() {
765 return this.available.size();
766 }
767
768 public int getAllocatedCount() {
769 return this.available.size() + this.leased.size();
770 }
771
772 public PoolEntry<T, C> getFree(final Object state) {
773 if (!this.available.isEmpty()) {
774 if (state != null) {
775 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
776 while (it.hasNext()) {
777 final PoolEntry<T, C> entry = it.next();
778 if (state.equals(entry.getState())) {
779 it.remove();
780 this.leased.add(entry);
781 return entry;
782 }
783 }
784 }
785 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
786 while (it.hasNext()) {
787 final PoolEntry<T, C> entry = it.next();
788 if (entry.getState() == null) {
789 it.remove();
790 this.leased.add(entry);
791 return entry;
792 }
793 }
794 }
795 return null;
796 }
797
798 public PoolEntry<T, C> getLastUsed() {
799 return this.available.peekLast();
800 }
801
802 public boolean remove(final PoolEntry<T, C> entry) {
803 return this.available.remove(entry) || this.leased.remove(entry);
804 }
805
806 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
807 final boolean found = this.leased.remove(entry);
808 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
809 if (reusable) {
810 this.available.addFirst(entry);
811 }
812 }
813
814 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
815 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
816 this.leased.add(entry);
817 return entry;
818 }
819
820 public void shutdown(final CloseMode closeMode) {
821 PoolEntry<T, C> availableEntry;
822 while ((availableEntry = available.poll()) != null) {
823 availableEntry.discardConnection(closeMode);
824 }
825 for (final PoolEntry<T, C> entry: this.leased) {
826 entry.discardConnection(closeMode);
827 }
828 this.leased.clear();
829 }
830
831 @Override
832 public String toString() {
833 final StringBuilder buffer = new StringBuilder();
834 buffer.append("[route: ");
835 buffer.append(this.route);
836 buffer.append("][leased: ");
837 buffer.append(this.leased.size());
838 buffer.append("][available: ");
839 buffer.append(this.available.size());
840 buffer.append("]");
841 return buffer.toString();
842 }
843
844 }
845 }