1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.pool;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CancellationException;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicReference;
44 import java.util.concurrent.locks.Condition;
45 import java.util.concurrent.locks.Lock;
46 import java.util.concurrent.locks.ReentrantLock;
47
48 import org.apache.http.annotation.Contract;
49 import org.apache.http.annotation.ThreadingBehavior;
50 import org.apache.http.concurrent.FutureCallback;
51 import org.apache.http.util.Args;
52 import org.apache.http.util.Asserts;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
70 public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
71 implements ConnPool<T, E>, ConnPoolControl<T> {
72
73 private final Lock lock;
74 private final Condition condition;
75 private final ConnFactory<T, C> connFactory;
76 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
77 private final Set<E> leased;
78 private final LinkedList<E> available;
79 private final LinkedList<Future<E>> pending;
80 private final Map<T, Integer> maxPerRoute;
81
82 private volatile boolean isShutDown;
83 private volatile int defaultMaxPerRoute;
84 private volatile int maxTotal;
85 private volatile int validateAfterInactivity;
86
87 public AbstractConnPool(
88 final ConnFactory<T, C> connFactory,
89 final int defaultMaxPerRoute,
90 final int maxTotal) {
91 super();
92 this.connFactory = Args.notNull(connFactory, "Connection factory");
93 this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
94 this.maxTotal = Args.positive(maxTotal, "Max total value");
95 this.lock = new ReentrantLock();
96 this.condition = this.lock.newCondition();
97 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
98 this.leased = new HashSet<E>();
99 this.available = new LinkedList<E>();
100 this.pending = new LinkedList<Future<E>>();
101 this.maxPerRoute = new HashMap<T, Integer>();
102 }
103
104
105
106
107 protected abstract E createEntry(T route, C conn);
108
109
110
111
112 protected void onLease(final E entry) {
113 }
114
115
116
117
118 protected void onRelease(final E entry) {
119 }
120
121
122
123
124 protected void onReuse(final E entry) {
125 }
126
127
128
129
130 protected boolean validate(final E entry) {
131 return true;
132 }
133
134 public boolean isShutdown() {
135 return this.isShutDown;
136 }
137
138
139
140
141 public void shutdown() throws IOException {
142 if (this.isShutDown) {
143 return ;
144 }
145 this.isShutDown = true;
146 this.lock.lock();
147 try {
148 for (final E entry: this.available) {
149 entry.close();
150 }
151 for (final E entry: this.leased) {
152 entry.close();
153 }
154 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
155 pool.shutdown();
156 }
157 this.routeToPool.clear();
158 this.leased.clear();
159 this.available.clear();
160 } finally {
161 this.lock.unlock();
162 }
163 }
164
165 private RouteSpecificPool<T, C, E> getPool(final T route) {
166 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
167 if (pool == null) {
168 pool = new RouteSpecificPool<T, C, E>(route) {
169
170 @Override
171 protected E createEntry(final C conn) {
172 return AbstractConnPool.this.createEntry(route, conn);
173 }
174
175 };
176 this.routeToPool.put(route, pool);
177 }
178 return pool;
179 }
180
181 private static Exception operationAborted() {
182 return new CancellationException("Operation aborted");
183 }
184
185
186
187
188
189
190
191
192
193 @Override
194 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
195 Args.notNull(route, "Route");
196 Asserts.check(!this.isShutDown, "Connection pool shut down");
197
198 return new Future<E>() {
199
200 private final AtomicBoolean cancelled = new AtomicBoolean(false);
201 private final AtomicBoolean done = new AtomicBoolean(false);
202 private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
203
204 @Override
205 public boolean cancel(final boolean mayInterruptIfRunning) {
206 if (done.compareAndSet(false, true)) {
207 cancelled.set(true);
208 lock.lock();
209 try {
210 condition.signalAll();
211 } finally {
212 lock.unlock();
213 }
214 if (callback != null) {
215 callback.cancelled();
216 }
217 return true;
218 }
219 return false;
220 }
221
222 @Override
223 public boolean isCancelled() {
224 return cancelled.get();
225 }
226
227 @Override
228 public boolean isDone() {
229 return done.get();
230 }
231
232 @Override
233 public E get() throws InterruptedException, ExecutionException {
234 try {
235 return get(0L, TimeUnit.MILLISECONDS);
236 } catch (final TimeoutException ex) {
237 throw new ExecutionException(ex);
238 }
239 }
240
241 @Override
242 public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
243 for (;;) {
244 synchronized (this) {
245 try {
246 final E entry = entryRef.get();
247 if (entry != null) {
248 return entry;
249 }
250 if (done.get()) {
251 throw new ExecutionException(operationAborted());
252 }
253 final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
254 if (validateAfterInactivity > 0) {
255 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
256 if (!validate(leasedEntry)) {
257 leasedEntry.close();
258 release(leasedEntry, false);
259 continue;
260 }
261 }
262 }
263 if (done.compareAndSet(false, true)) {
264 entryRef.set(leasedEntry);
265 done.set(true);
266 onLease(leasedEntry);
267 if (callback != null) {
268 callback.completed(leasedEntry);
269 }
270 return leasedEntry;
271 } else {
272 release(leasedEntry, true);
273 throw new ExecutionException(operationAborted());
274 }
275 } catch (final IOException ex) {
276 if (done.compareAndSet(false, true)) {
277 if (callback != null) {
278 callback.failed(ex);
279 }
280 }
281 throw new ExecutionException(ex);
282 }
283 }
284 }
285 }
286
287 };
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306 public Future<E> lease(final T route, final Object state) {
307 return lease(route, state, null);
308 }
309
310 private E getPoolEntryBlocking(
311 final T route, final Object state,
312 final long timeout, final TimeUnit timeUnit,
313 final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
314
315 Date deadline = null;
316 if (timeout > 0) {
317 deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
318 }
319 this.lock.lock();
320 try {
321 E entry;
322 for (;;) {
323 Asserts.check(!this.isShutDown, "Connection pool shut down");
324 if (future.isCancelled()) {
325 throw new ExecutionException(operationAborted());
326 }
327 final RouteSpecificPool<T, C, E> pool = getPool(route);
328 for (;;) {
329 entry = pool.getFree(state);
330 if (entry == null) {
331 break;
332 }
333 if (entry.isExpired(System.currentTimeMillis())) {
334 entry.close();
335 }
336 if (entry.isClosed()) {
337 this.available.remove(entry);
338 pool.free(entry, false);
339 } else {
340 break;
341 }
342 }
343 if (entry != null) {
344 this.available.remove(entry);
345 this.leased.add(entry);
346 onReuse(entry);
347 return entry;
348 }
349
350
351 final int maxPerRoute = getMax(route);
352
353 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
354 if (excess > 0) {
355 for (int i = 0; i < excess; i++) {
356 final E lastUsed = pool.getLastUsed();
357 if (lastUsed == null) {
358 break;
359 }
360 lastUsed.close();
361 this.available.remove(lastUsed);
362 pool.remove(lastUsed);
363 }
364 }
365
366 if (pool.getAllocatedCount() < maxPerRoute) {
367 final int totalUsed = this.leased.size();
368 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
369 if (freeCapacity > 0) {
370 final int totalAvailable = this.available.size();
371 if (totalAvailable > freeCapacity - 1) {
372 if (!this.available.isEmpty()) {
373 final E lastUsed = this.available.removeLast();
374 lastUsed.close();
375 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
376 otherpool.remove(lastUsed);
377 }
378 }
379 final C conn = this.connFactory.create(route);
380 entry = pool.add(conn);
381 this.leased.add(entry);
382 return entry;
383 }
384 }
385
386 boolean success = false;
387 try {
388 pool.queue(future);
389 this.pending.add(future);
390 if (deadline != null) {
391 success = this.condition.awaitUntil(deadline);
392 } else {
393 this.condition.await();
394 success = true;
395 }
396 if (future.isCancelled()) {
397 throw new ExecutionException(operationAborted());
398 }
399 } finally {
400
401
402
403
404 pool.unqueue(future);
405 this.pending.remove(future);
406 }
407
408 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
409 break;
410 }
411 }
412 throw new TimeoutException("Timeout waiting for connection");
413 } finally {
414 this.lock.unlock();
415 }
416 }
417
418 @Override
419 public void release(final E entry, final boolean reusable) {
420 this.lock.lock();
421 try {
422 if (this.leased.remove(entry)) {
423 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
424 pool.free(entry, reusable);
425 if (reusable && !this.isShutDown) {
426 this.available.addFirst(entry);
427 } else {
428 entry.close();
429 }
430 onRelease(entry);
431 Future<E> future = pool.nextPending();
432 if (future != null) {
433 this.pending.remove(future);
434 } else {
435 future = this.pending.poll();
436 }
437 if (future != null) {
438 this.condition.signalAll();
439 }
440 }
441 } finally {
442 this.lock.unlock();
443 }
444 }
445
446 private int getMax(final T route) {
447 final Integer v = this.maxPerRoute.get(route);
448 return v != null ? v.intValue() : this.defaultMaxPerRoute;
449 }
450
451 @Override
452 public void setMaxTotal(final int max) {
453 Args.positive(max, "Max value");
454 this.lock.lock();
455 try {
456 this.maxTotal = max;
457 } finally {
458 this.lock.unlock();
459 }
460 }
461
462 @Override
463 public int getMaxTotal() {
464 this.lock.lock();
465 try {
466 return this.maxTotal;
467 } finally {
468 this.lock.unlock();
469 }
470 }
471
472 @Override
473 public void setDefaultMaxPerRoute(final int max) {
474 Args.positive(max, "Max per route value");
475 this.lock.lock();
476 try {
477 this.defaultMaxPerRoute = max;
478 } finally {
479 this.lock.unlock();
480 }
481 }
482
483 @Override
484 public int getDefaultMaxPerRoute() {
485 this.lock.lock();
486 try {
487 return this.defaultMaxPerRoute;
488 } finally {
489 this.lock.unlock();
490 }
491 }
492
493 @Override
494 public void setMaxPerRoute(final T route, final int max) {
495 Args.notNull(route, "Route");
496 this.lock.lock();
497 try {
498 if (max > -1) {
499 this.maxPerRoute.put(route, Integer.valueOf(max));
500 } else {
501 this.maxPerRoute.remove(route);
502 }
503 } finally {
504 this.lock.unlock();
505 }
506 }
507
508 @Override
509 public int getMaxPerRoute(final T route) {
510 Args.notNull(route, "Route");
511 this.lock.lock();
512 try {
513 return getMax(route);
514 } finally {
515 this.lock.unlock();
516 }
517 }
518
519 @Override
520 public PoolStats getTotalStats() {
521 this.lock.lock();
522 try {
523 return new PoolStats(
524 this.leased.size(),
525 this.pending.size(),
526 this.available.size(),
527 this.maxTotal);
528 } finally {
529 this.lock.unlock();
530 }
531 }
532
533 @Override
534 public PoolStats getStats(final T route) {
535 Args.notNull(route, "Route");
536 this.lock.lock();
537 try {
538 final RouteSpecificPool<T, C, E> pool = getPool(route);
539 return new PoolStats(
540 pool.getLeasedCount(),
541 pool.getPendingCount(),
542 pool.getAvailableCount(),
543 getMax(route));
544 } finally {
545 this.lock.unlock();
546 }
547 }
548
549
550
551
552
553
554
555 public Set<T> getRoutes() {
556 this.lock.lock();
557 try {
558 return new HashSet<T>(routeToPool.keySet());
559 } finally {
560 this.lock.unlock();
561 }
562 }
563
564
565
566
567
568
569 protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
570 this.lock.lock();
571 try {
572 final Iterator<E> it = this.available.iterator();
573 while (it.hasNext()) {
574 final E entry = it.next();
575 callback.process(entry);
576 if (entry.isClosed()) {
577 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
578 pool.remove(entry);
579 it.remove();
580 }
581 }
582 purgePoolMap();
583 } finally {
584 this.lock.unlock();
585 }
586 }
587
588
589
590
591
592
593 protected void enumLeased(final PoolEntryCallback<T, C> callback) {
594 this.lock.lock();
595 try {
596 final Iterator<E> it = this.leased.iterator();
597 while (it.hasNext()) {
598 final E entry = it.next();
599 callback.process(entry);
600 }
601 } finally {
602 this.lock.unlock();
603 }
604 }
605
606 private void purgePoolMap() {
607 final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
608 while (it.hasNext()) {
609 final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
610 final RouteSpecificPool<T, C, E> pool = entry.getValue();
611 if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
612 it.remove();
613 }
614 }
615 }
616
617
618
619
620
621
622
623
624 public void closeIdle(final long idletime, final TimeUnit timeUnit) {
625 Args.notNull(timeUnit, "Time unit");
626 long time = timeUnit.toMillis(idletime);
627 if (time < 0) {
628 time = 0;
629 }
630 final long deadline = System.currentTimeMillis() - time;
631 enumAvailable(new PoolEntryCallback<T, C>() {
632
633 @Override
634 public void process(final PoolEntry<T, C> entry) {
635 if (entry.getUpdated() <= deadline) {
636 entry.close();
637 }
638 }
639
640 });
641 }
642
643
644
645
646 public void closeExpired() {
647 final long now = System.currentTimeMillis();
648 enumAvailable(new PoolEntryCallback<T, C>() {
649
650 @Override
651 public void process(final PoolEntry<T, C> entry) {
652 if (entry.isExpired(now)) {
653 entry.close();
654 }
655 }
656
657 });
658 }
659
660
661
662
663
664 public int getValidateAfterInactivity() {
665 return this.validateAfterInactivity;
666 }
667
668
669
670
671
672 public void setValidateAfterInactivity(final int ms) {
673 this.validateAfterInactivity = ms;
674 }
675
676 @Override
677 public String toString() {
678 this.lock.lock();
679 try {
680 final StringBuilder buffer = new StringBuilder();
681 buffer.append("[leased: ");
682 buffer.append(this.leased);
683 buffer.append("][available: ");
684 buffer.append(this.available);
685 buffer.append("][pending: ");
686 buffer.append(this.pending);
687 buffer.append("]");
688 return buffer.toString();
689 } finally {
690 this.lock.unlock();
691 }
692 }
693
694 }