1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.locks.Lock;
40 import java.util.concurrent.locks.ReentrantLock;
41
42 import org.apache.hc.core5.annotation.Contract;
43 import org.apache.hc.core5.annotation.ThreadingBehavior;
44 import org.apache.hc.core5.concurrent.BasicFuture;
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.function.Callback;
47 import org.apache.hc.core5.io.CloseMode;
48 import org.apache.hc.core5.io.ModalCloseable;
49 import org.apache.hc.core5.util.Args;
50 import org.apache.hc.core5.util.Asserts;
51 import org.apache.hc.core5.util.Deadline;
52 import org.apache.hc.core5.util.DeadlineTimeoutException;
53 import org.apache.hc.core5.util.LangUtils;
54 import org.apache.hc.core5.util.TimeValue;
55 import org.apache.hc.core5.util.Timeout;
56
57
58
59
60
61
62
63
64
65 @Contract(threading = ThreadingBehavior.SAFE)
66 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
67
68 private final TimeValue timeToLive;
69 private final PoolReusePolicy policy;
70 private final DisposalCallback<C> disposalCallback;
71 private final ConnPoolListener<T> connPoolListener;
72 private final Map<T, PerRoutePool<T, C>> routeToPool;
73 private final LinkedList<LeaseRequest<T, C>> leasingRequests;
74 private final Set<PoolEntry<T, C>> leased;
75 private final LinkedList<PoolEntry<T, C>> available;
76 private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
77 private final Map<T, Integer> maxPerRoute;
78 private final Lock lock;
79 private final AtomicBoolean isShutDown;
80
81 private volatile int defaultMaxPerRoute;
82 private volatile int maxTotal;
83
84
85
86
/**
 * Creates a pool with explicit limits, entry time-to-live, reuse policy,
 * disposal callback and listener.
 *
 * @param defaultMaxPerRoute limit applied to routes without an individual limit; must be positive.
 * @param maxTotal limit on the total number of entries; must be positive.
 * @param timeToLive time-to-live for new entries, normalised through
 *                   {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy reuse policy; {@code null} selects {@code PoolReusePolicy.LIFO}.
 * @param disposalCallback callback handed to every per-route pool for the entries it creates.
 * @param connPoolListener listener notified on lease and release; it is null-checked
 *                         before each notification.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    // Validate the limits first, in the same order as they are declared.
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;

    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy == null) {
        this.policy = PoolReusePolicy.LIFO;
    } else {
        this.policy = policy;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;

    this.lock = new ReentrantLock();
    this.isShutDown = new AtomicBoolean(false);

    this.routeToPool = new HashMap<>();
    this.maxPerRoute = new HashMap<>();
    this.leased = new HashSet<>();
    this.available = new LinkedList<>();
    this.leasingRequests = new LinkedList<>();
    this.completedRequests = new ConcurrentLinkedQueue<>();
}
112
113
114
115
/**
 * Creates a pool without a disposal callback; delegates to the six-argument
 * constructor passing {@code null} for the callback.
 *
 * @param defaultMaxPerRoute limit applied to routes without an individual limit.
 * @param maxTotal limit on the total number of entries.
 * @param timeToLive time-to-live for new entries.
 * @param policy reuse policy.
 * @param connPoolListener listener notified on lease and release.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            timeToLive,
            policy,
            null, // no disposal callback
            connPoolListener);
}
124
/**
 * Creates a pool with the given limits, a time-to-live of
 * {@code TimeValue.NEG_ONE_MILLISECOND}, the {@code LIFO} reuse policy and
 * no listener.
 *
 * @param defaultMaxPerRoute limit applied to routes without an individual limit.
 * @param maxTotal limit on the total number of entries.
 */
public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null); // no listener
}
128
129 public boolean isShutdown() {
130 return this.isShutDown.get();
131 }
132
133 @Override
134 public void close(final CloseMode closeMode) {
135 if (this.isShutDown.compareAndSet(false, true)) {
136 fireCallbacks();
137 this.lock.lock();
138 try {
139 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
140 pool.shutdown(closeMode);
141 }
142 this.routeToPool.clear();
143 this.leased.clear();
144 this.available.clear();
145 this.leasingRequests.clear();
146 } finally {
147 this.lock.unlock();
148 }
149 }
150 }
151
152 @Override
153 public void close() {
154 close(CloseMode.GRACEFUL);
155 }
156
157 private PerRoutePool<T, C> getPool(final T route) {
158 PerRoutePool<T, C> pool = this.routeToPool.get(route);
159 if (pool == null) {
160 pool = new PerRoutePool<>(route, this.disposalCallback);
161 this.routeToPool.put(route, pool);
162 }
163 return pool;
164 }
165
166 @Override
167 public Future<PoolEntry<T, C>> lease(
168 final T route, final Object state,
169 final Timeout requestTimeout,
170 final FutureCallback<PoolEntry<T, C>> callback) {
171 Args.notNull(route, "Route");
172 Args.notNull(requestTimeout, "Request timeout");
173 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
174 final Deadline deadline = Deadline.calculate(requestTimeout);
175 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
176 final boolean acquiredLock;
177
178 try {
179 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
180 } catch (final InterruptedException interruptedException) {
181 Thread.currentThread().interrupt();
182 future.cancel();
183 return future;
184 }
185
186 if (acquiredLock) {
187 try {
188 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
189 final boolean completed = processPendingRequest(request);
190 if (!request.isDone() && !completed) {
191 this.leasingRequests.add(request);
192 }
193 if (request.isDone()) {
194 this.completedRequests.add(request);
195 }
196 } finally {
197 this.lock.unlock();
198 }
199 fireCallbacks();
200 } else {
201 future.failed(DeadlineTimeoutException.from(deadline));
202 }
203
204 return future;
205 }
206
207 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
208 return lease(route, state, Timeout.DISABLED, null);
209 }
210
211 @Override
212 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
213 if (entry == null) {
214 return;
215 }
216 if (this.isShutDown.get()) {
217 return;
218 }
219 if (!reusable) {
220 entry.discardConnection(CloseMode.GRACEFUL);
221 }
222 this.lock.lock();
223 try {
224 if (this.leased.remove(entry)) {
225 if (this.connPoolListener != null) {
226 this.connPoolListener.onRelease(entry.getRoute(), this);
227 }
228 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
229 final boolean keepAlive = entry.hasConnection() && reusable;
230 pool.free(entry, keepAlive);
231 if (keepAlive) {
232 switch (policy) {
233 case LIFO:
234 this.available.addFirst(entry);
235 break;
236 case FIFO:
237 this.available.addLast(entry);
238 break;
239 default:
240 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
241 }
242 } else {
243 entry.discardConnection(CloseMode.GRACEFUL);
244 }
245 processNextPendingRequest();
246 } else {
247 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
248 }
249 } finally {
250 this.lock.unlock();
251 }
252 fireCallbacks();
253 }
254
255 private void processPendingRequests() {
256 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
257 while (it.hasNext()) {
258 final LeaseRequest<T, C> request = it.next();
259 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
260 if (future.isCancelled()) {
261 it.remove();
262 continue;
263 }
264 final boolean completed = processPendingRequest(request);
265 if (request.isDone() || completed) {
266 it.remove();
267 }
268 if (request.isDone()) {
269 this.completedRequests.add(request);
270 }
271 }
272 }
273
274 private void processNextPendingRequest() {
275 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
276 while (it.hasNext()) {
277 final LeaseRequest<T, C> request = it.next();
278 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
279 if (future.isCancelled()) {
280 it.remove();
281 continue;
282 }
283 final boolean completed = processPendingRequest(request);
284 if (request.isDone() || completed) {
285 it.remove();
286 }
287 if (request.isDone()) {
288 this.completedRequests.add(request);
289 }
290 if (completed) {
291 return;
292 }
293 }
294 }
295
296 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
297 final T route = request.getRoute();
298 final Object state = request.getState();
299 final Deadline deadline = request.getDeadline();
300
301 if (deadline.isExpired()) {
302 request.failed(DeadlineTimeoutException.from(deadline));
303 return false;
304 }
305
306 final PerRoutePool<T, C> pool = getPool(route);
307 PoolEntry<T, C> entry;
308 for (;;) {
309 entry = pool.getFree(state);
310 if (entry == null) {
311 break;
312 }
313 if (entry.getExpiryDeadline().isExpired()) {
314 entry.discardConnection(CloseMode.GRACEFUL);
315 this.available.remove(entry);
316 pool.free(entry, false);
317 } else {
318 break;
319 }
320 }
321 if (entry != null) {
322 this.available.remove(entry);
323 this.leased.add(entry);
324 request.completed(entry);
325 if (this.connPoolListener != null) {
326 this.connPoolListener.onLease(entry.getRoute(), this);
327 }
328 return true;
329 }
330
331
332 final int maxPerRoute = getMax(route);
333
334 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
335 if (excess > 0) {
336 for (int i = 0; i < excess; i++) {
337 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
338 if (lastUsed == null) {
339 break;
340 }
341 lastUsed.discardConnection(CloseMode.GRACEFUL);
342 this.available.remove(lastUsed);
343 pool.remove(lastUsed);
344 }
345 }
346
347 if (pool.getAllocatedCount() < maxPerRoute) {
348 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
349 if (freeCapacity == 0) {
350 return false;
351 }
352 final int totalAvailable = this.available.size();
353 if (totalAvailable > freeCapacity - 1) {
354 if (!this.available.isEmpty()) {
355 final PoolEntry<T, C> lastUsed = this.available.removeLast();
356 lastUsed.discardConnection(CloseMode.GRACEFUL);
357 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
358 otherpool.remove(lastUsed);
359 }
360 }
361
362 entry = pool.createEntry(this.timeToLive);
363 this.leased.add(entry);
364 request.completed(entry);
365 if (this.connPoolListener != null) {
366 this.connPoolListener.onLease(entry.getRoute(), this);
367 }
368 return true;
369 }
370 return false;
371 }
372
373 private void fireCallbacks() {
374 LeaseRequest<T, C> request;
375 while ((request = this.completedRequests.poll()) != null) {
376 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
377 final Exception ex = request.getException();
378 final PoolEntry<T, C> result = request.getResult();
379 boolean successfullyCompleted = false;
380 if (ex != null) {
381 future.failed(ex);
382 } else if (result != null) {
383 if (future.completed(result)) {
384 successfullyCompleted = true;
385 }
386 } else {
387 future.cancel();
388 }
389 if (!successfullyCompleted) {
390 release(result, true);
391 }
392 }
393 }
394
395 public void validatePendingRequests() {
396 this.lock.lock();
397 try {
398 final long now = System.currentTimeMillis();
399 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
400 while (it.hasNext()) {
401 final LeaseRequest<T, C> request = it.next();
402 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
403 if (future.isCancelled() && !request.isDone()) {
404 it.remove();
405 } else {
406 final Deadline deadline = request.getDeadline();
407 if (deadline.isBefore(now)) {
408 request.failed(DeadlineTimeoutException.from(deadline));
409 }
410 if (request.isDone()) {
411 it.remove();
412 this.completedRequests.add(request);
413 }
414 }
415 }
416 } finally {
417 this.lock.unlock();
418 }
419 fireCallbacks();
420 }
421
422 private int getMax(final T route) {
423 final Integer v = this.maxPerRoute.get(route);
424 if (v != null) {
425 return v;
426 }
427 return this.defaultMaxPerRoute;
428 }
429
430 @Override
431 public void setMaxTotal(final int max) {
432 Args.positive(max, "Max value");
433 this.lock.lock();
434 try {
435 this.maxTotal = max;
436 } finally {
437 this.lock.unlock();
438 }
439 }
440
441 @Override
442 public int getMaxTotal() {
443 this.lock.lock();
444 try {
445 return this.maxTotal;
446 } finally {
447 this.lock.unlock();
448 }
449 }
450
451 @Override
452 public void setDefaultMaxPerRoute(final int max) {
453 Args.positive(max, "Max value");
454 this.lock.lock();
455 try {
456 this.defaultMaxPerRoute = max;
457 } finally {
458 this.lock.unlock();
459 }
460 }
461
462 @Override
463 public int getDefaultMaxPerRoute() {
464 this.lock.lock();
465 try {
466 return this.defaultMaxPerRoute;
467 } finally {
468 this.lock.unlock();
469 }
470 }
471
472 @Override
473 public void setMaxPerRoute(final T route, final int max) {
474 Args.notNull(route, "Route");
475 this.lock.lock();
476 try {
477 if (max > -1) {
478 this.maxPerRoute.put(route, max);
479 } else {
480 this.maxPerRoute.remove(route);
481 }
482 } finally {
483 this.lock.unlock();
484 }
485 }
486
487 @Override
488 public int getMaxPerRoute(final T route) {
489 Args.notNull(route, "Route");
490 this.lock.lock();
491 try {
492 return getMax(route);
493 } finally {
494 this.lock.unlock();
495 }
496 }
497
498 @Override
499 public PoolStats getTotalStats() {
500 this.lock.lock();
501 try {
502 return new PoolStats(
503 this.leased.size(),
504 this.leasingRequests.size(),
505 this.available.size(),
506 this.maxTotal);
507 } finally {
508 this.lock.unlock();
509 }
510 }
511
512 @Override
513 public PoolStats getStats(final T route) {
514 Args.notNull(route, "Route");
515 this.lock.lock();
516 try {
517 final PerRoutePool<T, C> pool = getPool(route);
518 int pendingCount = 0;
519 for (final LeaseRequest<T, C> request: leasingRequests) {
520 if (LangUtils.equals(route, request.getRoute())) {
521 pendingCount++;
522 }
523 }
524 return new PoolStats(
525 pool.getLeasedCount(),
526 pendingCount,
527 pool.getAvailableCount(),
528 getMax(route));
529 } finally {
530 this.lock.unlock();
531 }
532 }
533
534
535
536
537
538
539 @Override
540 public Set<T> getRoutes() {
541 this.lock.lock();
542 try {
543 return new HashSet<>(routeToPool.keySet());
544 } finally {
545 this.lock.unlock();
546 }
547 }
548
549
550
551
552
553
554 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
555 this.lock.lock();
556 try {
557 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
558 while (it.hasNext()) {
559 final PoolEntry<T, C> entry = it.next();
560 callback.execute(entry);
561 if (!entry.hasConnection()) {
562 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
563 pool.remove(entry);
564 it.remove();
565 }
566 }
567 processPendingRequests();
568 purgePoolMap();
569 } finally {
570 this.lock.unlock();
571 }
572 }
573
574
575
576
577
578
579 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
580 this.lock.lock();
581 try {
582 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
583 while (it.hasNext()) {
584 final PoolEntry<T, C> entry = it.next();
585 callback.execute(entry);
586 }
587 processPendingRequests();
588 } finally {
589 this.lock.unlock();
590 }
591 }
592
593 private void purgePoolMap() {
594 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
595 while (it.hasNext()) {
596 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
597 final PerRoutePool<T, C> pool = entry.getValue();
598 if (pool.getAllocatedCount() == 0) {
599 it.remove();
600 }
601 }
602 }
603
604 @Override
605 public void closeIdle(final TimeValue idleTime) {
606 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
607 enumAvailable(new Callback<PoolEntry<T, C>>() {
608
609 @Override
610 public void execute(final PoolEntry<T, C> entry) {
611 if (entry.getUpdated() <= deadline) {
612 entry.discardConnection(CloseMode.GRACEFUL);
613 }
614 }
615
616 });
617 }
618
619 @Override
620 public void closeExpired() {
621 final long now = System.currentTimeMillis();
622 enumAvailable(new Callback<PoolEntry<T, C>>() {
623
624 @Override
625 public void execute(final PoolEntry<T, C> entry) {
626 if (entry.getExpiryDeadline().isBefore(now)) {
627 entry.discardConnection(CloseMode.GRACEFUL);
628 }
629 }
630
631 });
632 }
633
634 @Override
635 public String toString() {
636 final StringBuilder buffer = new StringBuilder();
637 buffer.append("[leased: ");
638 buffer.append(this.leased.size());
639 buffer.append("][available: ");
640 buffer.append(this.available.size());
641 buffer.append("][pending: ");
642 buffer.append(this.leasingRequests.size());
643 buffer.append("]");
644 return buffer.toString();
645 }
646
647
648 static class LeaseRequest<T, C extends ModalCloseable> {
649
650 private final T route;
651 private final Object state;
652 private final Deadline deadline;
653 private final BasicFuture<PoolEntry<T, C>> future;
654 private final AtomicBoolean completed;
655 private volatile PoolEntry<T, C> result;
656 private volatile Exception ex;
657
658
659
660
661
662
663
664
665
666 public LeaseRequest(
667 final T route,
668 final Object state,
669 final Timeout requestTimeout,
670 final BasicFuture<PoolEntry<T, C>> future) {
671 super();
672 this.route = route;
673 this.state = state;
674 this.deadline = Deadline.calculate(requestTimeout);
675 this.future = future;
676 this.completed = new AtomicBoolean(false);
677 }
678
679 public T getRoute() {
680 return this.route;
681 }
682
683 public Object getState() {
684 return this.state;
685 }
686
687 public Deadline getDeadline() {
688 return this.deadline;
689 }
690
691 public boolean isDone() {
692 return this.completed.get();
693 }
694
695 public void failed(final Exception ex) {
696 if (this.completed.compareAndSet(false, true)) {
697 this.ex = ex;
698 }
699 }
700
701 public void completed(final PoolEntry<T, C> result) {
702 if (this.completed.compareAndSet(false, true)) {
703 this.result = result;
704 }
705 }
706
707 public BasicFuture<PoolEntry<T, C>> getFuture() {
708 return this.future;
709 }
710
711 public PoolEntry<T, C> getResult() {
712 return this.result;
713 }
714
715 public Exception getException() {
716 return this.ex;
717 }
718
719 @Override
720 public String toString() {
721 final StringBuilder buffer = new StringBuilder();
722 buffer.append("[");
723 buffer.append(this.route);
724 buffer.append("][");
725 buffer.append(this.state);
726 buffer.append("]");
727 return buffer.toString();
728 }
729
730 }
731
732 static class PerRoutePool<T, C extends ModalCloseable> {
733
734 private final T route;
735 private final Set<PoolEntry<T, C>> leased;
736 private final LinkedList<PoolEntry<T, C>> available;
737 private final DisposalCallback<C> disposalCallback;
738
739 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
740 super();
741 this.route = route;
742 this.disposalCallback = disposalCallback;
743 this.leased = new HashSet<>();
744 this.available = new LinkedList<>();
745 }
746
747 public final T getRoute() {
748 return route;
749 }
750
751 public int getLeasedCount() {
752 return this.leased.size();
753 }
754
755 public int getAvailableCount() {
756 return this.available.size();
757 }
758
759 public int getAllocatedCount() {
760 return this.available.size() + this.leased.size();
761 }
762
763 public PoolEntry<T, C> getFree(final Object state) {
764 if (!this.available.isEmpty()) {
765 if (state != null) {
766 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
767 while (it.hasNext()) {
768 final PoolEntry<T, C> entry = it.next();
769 if (state.equals(entry.getState())) {
770 it.remove();
771 this.leased.add(entry);
772 return entry;
773 }
774 }
775 }
776 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
777 while (it.hasNext()) {
778 final PoolEntry<T, C> entry = it.next();
779 if (entry.getState() == null) {
780 it.remove();
781 this.leased.add(entry);
782 return entry;
783 }
784 }
785 }
786 return null;
787 }
788
789 public PoolEntry<T, C> getLastUsed() {
790 return this.available.peekLast();
791 }
792
793 public boolean remove(final PoolEntry<T, C> entry) {
794 return this.available.remove(entry) || this.leased.remove(entry);
795 }
796
797 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
798 final boolean found = this.leased.remove(entry);
799 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
800 if (reusable) {
801 this.available.addFirst(entry);
802 }
803 }
804
805 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
806 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
807 this.leased.add(entry);
808 return entry;
809 }
810
811 public void shutdown(final CloseMode closeMode) {
812 PoolEntry<T, C> availableEntry;
813 while ((availableEntry = available.poll()) != null) {
814 availableEntry.discardConnection(closeMode);
815 }
816 for (final PoolEntry<T, C> entry: this.leased) {
817 entry.discardConnection(closeMode);
818 }
819 this.leased.clear();
820 }
821
822 @Override
823 public String toString() {
824 final StringBuilder buffer = new StringBuilder();
825 buffer.append("[route: ");
826 buffer.append(this.route);
827 buffer.append("][leased: ");
828 buffer.append(this.leased.size());
829 buffer.append("][available: ");
830 buffer.append(this.available.size());
831 buffer.append("]");
832 return buffer.toString();
833 }
834
835 }
836 }