1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.Deque;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.Objects;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.concurrent.atomic.AtomicMarkableReference;
45
46 import org.apache.hc.core5.annotation.Contract;
47 import org.apache.hc.core5.annotation.Experimental;
48 import org.apache.hc.core5.annotation.ThreadingBehavior;
49 import org.apache.hc.core5.concurrent.BasicFuture;
50 import org.apache.hc.core5.concurrent.Cancellable;
51 import org.apache.hc.core5.concurrent.FutureCallback;
52 import org.apache.hc.core5.function.Callback;
53 import org.apache.hc.core5.io.CloseMode;
54 import org.apache.hc.core5.io.ModalCloseable;
55 import org.apache.hc.core5.util.Args;
56 import org.apache.hc.core5.util.Asserts;
57 import org.apache.hc.core5.util.Deadline;
58 import org.apache.hc.core5.util.DeadlineTimeoutException;
59 import org.apache.hc.core5.util.TimeValue;
60 import org.apache.hc.core5.util.Timeout;
61
62
63
64
65
66
67
68
69
70 @Contract(threading = ThreadingBehavior.SAFE)
71 @Experimental
72 public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
73
74 private final TimeValue timeToLive;
75 private final PoolReusePolicy policy;
76 private final DisposalCallback<C> disposalCallback;
77 private final ConnPoolListener<T> connPoolListener;
78 private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
79 private final AtomicBoolean isShutDown;
80
81 private volatile int defaultMaxPerRoute;
82
83
84
85
86 public LaxConnPool(
87 final int defaultMaxPerRoute,
88 final TimeValue timeToLive,
89 final PoolReusePolicy policy,
90 final DisposalCallback<C> disposalCallback,
91 final ConnPoolListener<T> connPoolListener) {
92 super();
93 Args.positive(defaultMaxPerRoute, "Max per route value");
94 this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95 this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
96 this.disposalCallback = disposalCallback;
97 this.connPoolListener = connPoolListener;
98 this.routeToPool = new ConcurrentHashMap<>();
99 this.isShutDown = new AtomicBoolean(false);
100 this.defaultMaxPerRoute = defaultMaxPerRoute;
101 }
102
103
104
105
106 public LaxConnPool(
107 final int defaultMaxPerRoute,
108 final TimeValue timeToLive,
109 final PoolReusePolicy policy,
110 final ConnPoolListener<T> connPoolListener) {
111 this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
112 }
113
114 public LaxConnPool(final int defaultMaxPerRoute) {
115 this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
116 }
117
118 public boolean isShutdown() {
119 return isShutDown.get();
120 }
121
122 @Override
123 public void close(final CloseMode closeMode) {
124 if (isShutDown.compareAndSet(false, true)) {
125 for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
126 final PerRoutePool<T, C> routePool = it.next();
127 routePool.shutdown(closeMode);
128 }
129 routeToPool.clear();
130 }
131 }
132
133 @Override
134 public void close() {
135 close(CloseMode.GRACEFUL);
136 }
137
138 private PerRoutePool<T, C> getPool(final T route) {
139 PerRoutePool<T, C> routePool = routeToPool.get(route);
140 if (routePool == null) {
141 final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
142 route,
143 defaultMaxPerRoute,
144 timeToLive,
145 policy,
146 this,
147 disposalCallback,
148 connPoolListener);
149 routePool = routeToPool.putIfAbsent(route, newRoutePool);
150 if (routePool == null) {
151 routePool = newRoutePool;
152 }
153 }
154 return routePool;
155 }
156
157 @Override
158 public Future<PoolEntry<T, C>> lease(
159 final T route, final Object state,
160 final Timeout requestTimeout,
161 final FutureCallback<PoolEntry<T, C>> callback) {
162 Args.notNull(route, "Route");
163 Asserts.check(!isShutDown.get(), "Connection pool shut down");
164 final PerRoutePool<T, C> routePool = getPool(route);
165 return routePool.lease(state, requestTimeout, callback);
166 }
167
168 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
169 return lease(route, state, Timeout.DISABLED, null);
170 }
171
172 @Override
173 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
174 if (entry == null) {
175 return;
176 }
177 if (isShutDown.get()) {
178 return;
179 }
180 final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
181 routePool.release(entry, reusable);
182 }
183
184 public void validatePendingRequests() {
185 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
186 routePool.validatePendingRequests();
187 }
188 }
189
190 @Override
191 public void setMaxTotal(final int max) {
192 }
193
194 @Override
195 public int getMaxTotal() {
196 return 0;
197 }
198
199 @Override
200 public void setDefaultMaxPerRoute(final int max) {
201 Args.positive(max, "Max value");
202 defaultMaxPerRoute = max;
203 }
204
205 @Override
206 public int getDefaultMaxPerRoute() {
207 return defaultMaxPerRoute;
208 }
209
210 @Override
211 public void setMaxPerRoute(final T route, final int max) {
212 Args.notNull(route, "Route");
213 final PerRoutePool<T, C> routePool = getPool(route);
214 routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
215 }
216
217 @Override
218 public int getMaxPerRoute(final T route) {
219 Args.notNull(route, "Route");
220 final PerRoutePool<T, C> routePool = getPool(route);
221 return routePool.getMax();
222 }
223
224 @Override
225 public PoolStats getTotalStats() {
226 int leasedTotal = 0;
227 int pendingTotal = 0;
228 int availableTotal = 0;
229 int maxTotal = 0;
230 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
231 leasedTotal += routePool.getLeasedCount();
232 pendingTotal += routePool.getPendingCount();
233 availableTotal += routePool.getAvailableCount();
234 maxTotal += routePool.getMax();
235 }
236 return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
237 }
238
239 @Override
240 public PoolStats getStats(final T route) {
241 Args.notNull(route, "Route");
242 final PerRoutePool<T, C> routePool = getPool(route);
243 return new PoolStats(
244 routePool.getLeasedCount(),
245 routePool.getPendingCount(),
246 routePool.getAvailableCount(),
247 routePool.getMax());
248 }
249
250 @Override
251 public Set<T> getRoutes() {
252 return new HashSet<>(routeToPool.keySet());
253 }
254
255 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
256 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
257 routePool.enumAvailable(callback);
258 }
259 }
260
261 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
262 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
263 routePool.enumLeased(callback);
264 }
265 }
266
267 @Override
268 public void closeIdle(final TimeValue idleTime) {
269 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270 enumAvailable(entry -> {
271 if (entry.getUpdated() <= deadline) {
272 entry.discardConnection(CloseMode.GRACEFUL);
273 }
274 });
275 }
276
277 @Override
278 public void closeExpired() {
279 final long now = System.currentTimeMillis();
280 enumAvailable(entry -> {
281 if (entry.getExpiryDeadline().isBefore(now)) {
282 entry.discardConnection(CloseMode.GRACEFUL);
283 }
284 });
285 }
286
287 @Override
288 public String toString() {
289 final PoolStats totalStats = getTotalStats();
290 final StringBuilder buffer = new StringBuilder();
291 buffer.append("[leased: ");
292 buffer.append(totalStats.getLeased());
293 buffer.append("][available: ");
294 buffer.append(totalStats.getAvailable());
295 buffer.append("][pending: ");
296 buffer.append(totalStats.getPending());
297 buffer.append("]");
298 return buffer.toString();
299 }
300
301 static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
302
303 private final Object state;
304 private final Deadline deadline;
305 private final BasicFuture<PoolEntry<T, C>> future;
306
307 LeaseRequest(
308 final Object state,
309 final Timeout requestTimeout,
310 final BasicFuture<PoolEntry<T, C>> future) {
311 super();
312 this.state = state;
313 this.deadline = Deadline.calculate(requestTimeout);
314 this.future = future;
315 }
316
317 BasicFuture<PoolEntry<T, C>> getFuture() {
318 return this.future;
319 }
320
321 public Object getState() {
322 return this.state;
323 }
324
325 public Deadline getDeadline() {
326 return this.deadline;
327 }
328
329 public boolean isDone() {
330 return this.future.isDone();
331 }
332
333 public boolean completed(final PoolEntry<T, C> result) {
334 return future.completed(result);
335 }
336
337 public boolean failed(final Exception ex) {
338 return future.failed(ex);
339 }
340
341 @Override
342 public boolean cancel() {
343 return future.cancel();
344 }
345
346 }
347
348 static class PerRoutePool<T, C extends ModalCloseable> {
349
350 private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
351
352 private final T route;
353 private final TimeValue timeToLive;
354 private final PoolReusePolicy policy;
355 private final DisposalCallback<C> disposalCallback;
356 private final ConnPoolListener<T> connPoolListener;
357 private final ConnPoolStats<T> connPoolStats;
358 private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
359 private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
360 private final Deque<LeaseRequest<T, C>> pending;
361 private final AtomicBoolean terminated;
362 private final AtomicInteger allocated;
363 private final AtomicLong releaseSeqNum;
364
365 private volatile int max;
366
367 PerRoutePool(
368 final T route,
369 final int max,
370 final TimeValue timeToLive,
371 final PoolReusePolicy policy,
372 final ConnPoolStats<T> connPoolStats,
373 final DisposalCallback<C> disposalCallback,
374 final ConnPoolListener<T> connPoolListener) {
375 super();
376 this.route = route;
377 this.timeToLive = timeToLive;
378 this.policy = policy;
379 this.connPoolStats = connPoolStats;
380 this.disposalCallback = disposalCallback;
381 this.connPoolListener = connPoolListener;
382 this.leased = new ConcurrentHashMap<>();
383 this.available = new ConcurrentLinkedDeque<>();
384 this.pending = new ConcurrentLinkedDeque<>();
385 this.terminated = new AtomicBoolean(false);
386 this.allocated = new AtomicInteger(0);
387 this.releaseSeqNum = new AtomicLong(0);
388 this.max = max;
389 }
390
391 public void shutdown(final CloseMode closeMode) {
392 if (terminated.compareAndSet(false, true)) {
393 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
394 while ((entryRef = available.poll()) != null) {
395 entryRef.getReference().discardConnection(closeMode);
396 }
397 for (final PoolEntry<T, C> entry : leased.keySet()) {
398 entry.discardConnection(closeMode);
399 }
400 leased.clear();
401 LeaseRequest<T, C> leaseRequest;
402 while ((leaseRequest = pending.poll()) != null) {
403 leaseRequest.cancel();
404 }
405 }
406 }
407
408 private PoolEntry<T, C> createPoolEntry() {
409 final int poolMax = max;
410 int prev, next;
411 do {
412 prev = allocated.get();
413 next = (prev<poolMax)? prev+1 : prev;
414 } while (!allocated.compareAndSet(prev, next));
415 return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
416 }
417
418 private void deallocatePoolEntry() {
419 allocated.decrementAndGet();
420 }
421
422 private void addLeased(final PoolEntry<T, C> entry) {
423 if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
424 throw new IllegalStateException("Pool entry already present in the set of leased entries");
425 } else if (connPoolListener != null) {
426 connPoolListener.onLease(route, connPoolStats);
427 }
428 }
429
430 private void removeLeased(final PoolEntry<T, C> entry) {
431 if (connPoolListener != null) {
432 connPoolListener.onRelease(route, connPoolStats);
433 }
434 if (!leased.remove(entry, Boolean.TRUE)) {
435 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
436 }
437 }
438
439 private PoolEntry<T, C> getAvailableEntry(final Object state) {
440 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
441 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
442 final PoolEntry<T, C> entry = ref.getReference();
443 if (ref.compareAndSet(entry, entry, false, true)) {
444 it.remove();
445 if (entry.getExpiryDeadline().isExpired()) {
446 entry.discardConnection(CloseMode.GRACEFUL);
447 }
448 if (!Objects.equals(entry.getState(), state)) {
449 entry.discardConnection(CloseMode.GRACEFUL);
450 }
451 return entry;
452 }
453 }
454 return null;
455 }
456
457 public Future<PoolEntry<T, C>> lease(
458 final Object state,
459 final Timeout requestTimeout,
460 final FutureCallback<PoolEntry<T, C>> callback) {
461 Asserts.check(!terminated.get(), "Connection pool shut down");
462 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
463
464 @Override
465 public synchronized PoolEntry<T, C> get(
466 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
467 try {
468 return super.get(timeout, unit);
469 } catch (final TimeoutException ex) {
470 cancel();
471 throw ex;
472 }
473 }
474
475 };
476 final long releaseState = releaseSeqNum.get();
477 PoolEntry<T, C> entry = null;
478 if (pending.isEmpty()) {
479 entry = getAvailableEntry(state);
480 if (entry == null) {
481 entry = createPoolEntry();
482 }
483 }
484 if (entry != null) {
485 addLeased(entry);
486 future.completed(entry);
487 } else {
488 pending.add(new LeaseRequest<>(state, requestTimeout, future));
489 if (releaseState != releaseSeqNum.get()) {
490 servicePendingRequest();
491 }
492 }
493 return future;
494 }
495
496 public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
497 removeLeased(releasedEntry);
498 if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
499 releasedEntry.discardConnection(CloseMode.GRACEFUL);
500 }
501 if (releasedEntry.hasConnection()) {
502 switch (policy) {
503 case LIFO:
504 available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
505 break;
506 case FIFO:
507 available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
508 break;
509 default:
510 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
511 }
512 }
513 else {
514 deallocatePoolEntry();
515 }
516 releaseSeqNum.incrementAndGet();
517 servicePendingRequest();
518 }
519
520
521 private void servicePendingRequest() {
522 servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
523 }
524
525 private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
526 LeaseRequest<T, C> leaseRequest;
527 while ((leaseRequest = pending.poll()) != null) {
528 if (leaseRequest.isDone()) {
529 continue;
530 }
531 final Object state = leaseRequest.getState();
532 final Deadline deadline = leaseRequest.getDeadline();
533
534 if (deadline.isExpired()) {
535 leaseRequest.failed(DeadlineTimeoutException.from(deadline));
536 } else {
537 final long releaseState = releaseSeqNum.get();
538 PoolEntry<T, C> entry = getAvailableEntry(state);
539 if (entry == null) {
540 entry = createPoolEntry();
541 }
542 if (entry != null) {
543 addLeased(entry);
544 if (!leaseRequest.completed(entry)) {
545 release(entry, true);
546 }
547 if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
548 break;
549 }
550 }
551 else {
552 pending.addFirst(leaseRequest);
553 if (releaseState == releaseSeqNum.get()) {
554 break;
555 }
556 }
557 }
558 }
559 }
560
561 public void validatePendingRequests() {
562 final Iterator<LeaseRequest<T, C>> it = pending.iterator();
563 while (it.hasNext()) {
564 final LeaseRequest<T, C> request = it.next();
565 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
566 if (future.isCancelled() && !request.isDone()) {
567 it.remove();
568 } else {
569 final Deadline deadline = request.getDeadline();
570 if (deadline.isExpired()) {
571 request.failed(DeadlineTimeoutException.from(deadline));
572 }
573 if (request.isDone()) {
574 it.remove();
575 }
576 }
577 }
578 }
579
580 public final T getRoute() {
581 return route;
582 }
583
584 public int getMax() {
585 return max;
586 }
587
588 public void setMax(final int max) {
589 this.max = max;
590 }
591
592 public int getPendingCount() {
593 return pending.size();
594 }
595
596 public int getLeasedCount() {
597 return leased.size();
598 }
599
600 public int getAvailableCount() {
601 return available.size();
602 }
603
604 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
605 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
606 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
607 final PoolEntry<T, C> entry = ref.getReference();
608 if (ref.compareAndSet(entry, entry, false, true)) {
609 callback.execute(entry);
610 if (!entry.hasConnection()) {
611 deallocatePoolEntry();
612 it.remove();
613 }
614 else {
615 ref.set(entry, false);
616 }
617 }
618 }
619 releaseSeqNum.incrementAndGet();
620 servicePendingRequests(RequestServiceStrategy.ALL);
621 }
622
623 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
624 for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
625 final PoolEntry<T, C> entry = it.next();
626 callback.execute(entry);
627 if (!entry.hasConnection()) {
628 deallocatePoolEntry();
629 it.remove();
630 }
631 }
632 }
633
634 @Override
635 public String toString() {
636 final StringBuilder buffer = new StringBuilder();
637 buffer.append("[route: ");
638 buffer.append(route);
639 buffer.append("][leased: ");
640 buffer.append(leased.size());
641 buffer.append("][available: ");
642 buffer.append(available.size());
643 buffer.append("][pending: ");
644 buffer.append(pending.size());
645 buffer.append("]");
646 return buffer.toString();
647 }
648
649 }
650
651 }