View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.CancellationException;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.atomic.AtomicReference;
44  import java.util.concurrent.locks.Condition;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  
48  import org.apache.http.annotation.Contract;
49  import org.apache.http.annotation.ThreadingBehavior;
50  import org.apache.http.concurrent.FutureCallback;
51  import org.apache.http.util.Args;
52  import org.apache.http.util.Asserts;
53  
54  /**
55   * Abstract synchronous (blocking) pool of connections.
56   * <p>
57   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
58   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
59   * method on the {@link Future} object returned by the
60   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
61   * to complete.
62   *
63   * @param <T> the route type that represents the opposite endpoint of a pooled
64   *   connection.
65   * @param <C> the connection type.
66   * @param <E> the type of the pool entry containing a pooled connection.
67   * @since 4.2
68   */
69  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
70  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
71                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
72  
73      private final Lock lock;
74      private final Condition condition;
75      private final ConnFactory<T, C> connFactory;
76      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
77      private final Set<E> leased;
78      private final LinkedList<E> available;
79      private final LinkedList<Future<E>> pending;
80      private final Map<T, Integer> maxPerRoute;
81  
82      private volatile boolean isShutDown;
83      private volatile int defaultMaxPerRoute;
84      private volatile int maxTotal;
85      private volatile int validateAfterInactivity;
86  
87      public AbstractConnPool(
88              final ConnFactory<T, C> connFactory,
89              final int defaultMaxPerRoute,
90              final int maxTotal) {
91          super();
92          this.connFactory = Args.notNull(connFactory, "Connection factory");
93          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
94          this.maxTotal = Args.positive(maxTotal, "Max total value");
95          this.lock = new ReentrantLock();
96          this.condition = this.lock.newCondition();
97          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
98          this.leased = new HashSet<E>();
99          this.available = new LinkedList<E>();
100         this.pending = new LinkedList<Future<E>>();
101         this.maxPerRoute = new HashMap<T, Integer>();
102     }
103 
104     /**
105      * Creates a new entry for the given connection with the given route.
106      */
107     protected abstract E createEntry(T route, C conn);
108 
109     /**
110      * @since 4.3
111      */
112     protected void onLease(final E entry) {
113     }
114 
115     /**
116      * @since 4.3
117      */
118     protected void onRelease(final E entry) {
119     }
120 
121     /**
122      * @since 4.4
123      */
124     protected void onReuse(final E entry) {
125     }
126 
127     /**
128      * @since 4.4
129      */
130     protected boolean validate(final E entry) {
131         return true;
132     }
133 
134     public boolean isShutdown() {
135         return this.isShutDown;
136     }
137 
138     /**
139      * Shuts down the pool.
140      */
141     public void shutdown() throws IOException {
142         if (this.isShutDown) {
143             return ;
144         }
145         this.isShutDown = true;
146         this.lock.lock();
147         try {
148             for (final E entry: this.available) {
149                 entry.close();
150             }
151             for (final E entry: this.leased) {
152                 entry.close();
153             }
154             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
155                 pool.shutdown();
156             }
157             this.routeToPool.clear();
158             this.leased.clear();
159             this.available.clear();
160         } finally {
161             this.lock.unlock();
162         }
163     }
164 
165     private RouteSpecificPool<T, C, E> getPool(final T route) {
166         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
167         if (pool == null) {
168             pool = new RouteSpecificPool<T, C, E>(route) {
169 
170                 @Override
171                 protected E createEntry(final C conn) {
172                     return AbstractConnPool.this.createEntry(route, conn);
173                 }
174 
175             };
176             this.routeToPool.put(route, pool);
177         }
178         return pool;
179     }
180 
181     private static Exception operationAborted() {
182         return new CancellationException("Operation aborted");
183     }
184 
185     /**
186      * {@inheritDoc}
187      * <p>
188      * Please note that this class does not maintain its own pool of execution
189      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
190      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
191      * returned by this method in order for the lease operation to complete.
192      */
193     @Override
194     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
195         Args.notNull(route, "Route");
196         Asserts.check(!this.isShutDown, "Connection pool shut down");
197 
198         return new Future<E>() {
199 
200             private final AtomicBoolean cancelled = new AtomicBoolean(false);
201             private final AtomicBoolean done = new AtomicBoolean(false);
202             private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
203 
204             @Override
205             public boolean cancel(final boolean mayInterruptIfRunning) {
206                 if (done.compareAndSet(false, true)) {
207                     cancelled.set(true);
208                     lock.lock();
209                     try {
210                         condition.signalAll();
211                     } finally {
212                         lock.unlock();
213                     }
214                     if (callback != null) {
215                         callback.cancelled();
216                     }
217                     return true;
218                 }
219                 return false;
220             }
221 
222             @Override
223             public boolean isCancelled() {
224                 return cancelled.get();
225             }
226 
227             @Override
228             public boolean isDone() {
229                 return done.get();
230             }
231 
232             @Override
233             public E get() throws InterruptedException, ExecutionException {
234                 try {
235                     return get(0L, TimeUnit.MILLISECONDS);
236                 } catch (final TimeoutException ex) {
237                     throw new ExecutionException(ex);
238                 }
239             }
240 
241             @Override
242             public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
243                 for (;;) {
244                     synchronized (this) {
245                         try {
246                             final E entry = entryRef.get();
247                             if (entry != null) {
248                                 return entry;
249                             }
250                             if (done.get()) {
251                                 throw new ExecutionException(operationAborted());
252                             }
253                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
254                             if (validateAfterInactivity > 0)  {
255                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
256                                     if (!validate(leasedEntry)) {
257                                         leasedEntry.close();
258                                         release(leasedEntry, false);
259                                         continue;
260                                     }
261                                 }
262                             }
263                             if (done.compareAndSet(false, true)) {
264                                 entryRef.set(leasedEntry);
265                                 done.set(true);
266                                 onLease(leasedEntry);
267                                 if (callback != null) {
268                                     callback.completed(leasedEntry);
269                                 }
270                                 return leasedEntry;
271                             } else {
272                                 release(leasedEntry, true);
273                                 throw new ExecutionException(operationAborted());
274                             }
275                         } catch (final IOException ex) {
276                             if (done.compareAndSet(false, true)) {
277                                 if (callback != null) {
278                                     callback.failed(ex);
279                                 }
280                             }
281                             throw new ExecutionException(ex);
282                         }
283                     }
284                 }
285             }
286 
287         };
288     }
289 
290     /**
291      * Attempts to lease a connection for the given route and with the given
292      * state from the pool.
293      * <p>
294      * Please note that this class does not maintain its own pool of execution
295      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
296      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
297      * returned by this method in order for the lease operation to complete.
298      *
299      * @param route route of the connection.
300      * @param state arbitrary object that represents a particular state
301      *  (usually a security principal or a unique token identifying
302      *  the user whose credentials have been used while establishing the connection).
303      *  May be {@code null}.
304      * @return future for a leased pool entry.
305      */
306     public Future<E> lease(final T route, final Object state) {
307         return lease(route, state, null);
308     }
309 
310     private E getPoolEntryBlocking(
311             final T route, final Object state,
312             final long timeout, final TimeUnit timeUnit,
313             final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
314 
315         Date deadline = null;
316         if (timeout > 0) {
317             deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
318         }
319         this.lock.lock();
320         try {
321             E entry;
322             for (;;) {
323                 Asserts.check(!this.isShutDown, "Connection pool shut down");
324                 if (future.isCancelled()) {
325                     throw new ExecutionException(operationAborted());
326                 }
327                 final RouteSpecificPool<T, C, E> pool = getPool(route);
328                 for (;;) {
329                     entry = pool.getFree(state);
330                     if (entry == null) {
331                         break;
332                     }
333                     if (entry.isExpired(System.currentTimeMillis())) {
334                         entry.close();
335                     }
336                     if (entry.isClosed()) {
337                         this.available.remove(entry);
338                         pool.free(entry, false);
339                     } else {
340                         break;
341                     }
342                 }
343                 if (entry != null) {
344                     this.available.remove(entry);
345                     this.leased.add(entry);
346                     onReuse(entry);
347                     return entry;
348                 }
349 
350                 // New connection is needed
351                 final int maxPerRoute = getMax(route);
352                 // Shrink the pool prior to allocating a new connection
353                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
354                 if (excess > 0) {
355                     for (int i = 0; i < excess; i++) {
356                         final E lastUsed = pool.getLastUsed();
357                         if (lastUsed == null) {
358                             break;
359                         }
360                         lastUsed.close();
361                         this.available.remove(lastUsed);
362                         pool.remove(lastUsed);
363                     }
364                 }
365 
366                 if (pool.getAllocatedCount() < maxPerRoute) {
367                     final int totalUsed = this.leased.size();
368                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
369                     if (freeCapacity > 0) {
370                         final int totalAvailable = this.available.size();
371                         if (totalAvailable > freeCapacity - 1) {
372                             if (!this.available.isEmpty()) {
373                                 final E lastUsed = this.available.removeLast();
374                                 lastUsed.close();
375                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
376                                 otherpool.remove(lastUsed);
377                             }
378                         }
379                         final C conn = this.connFactory.create(route);
380                         entry = pool.add(conn);
381                         this.leased.add(entry);
382                         return entry;
383                     }
384                 }
385 
386                 boolean success = false;
387                 try {
388                     pool.queue(future);
389                     this.pending.add(future);
390                     if (deadline != null) {
391                         success = this.condition.awaitUntil(deadline);
392                     } else {
393                         this.condition.await();
394                         success = true;
395                     }
396                     if (future.isCancelled()) {
397                         throw new ExecutionException(operationAborted());
398                     }
399                 } finally {
400                     // In case of 'success', we were woken up by the
401                     // connection pool and should now have a connection
402                     // waiting for us, or else we're shutting down.
403                     // Just continue in the loop, both cases are checked.
404                     pool.unqueue(future);
405                     this.pending.remove(future);
406                 }
407                 // check for spurious wakeup vs. timeout
408                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
409                     break;
410                 }
411             }
412             throw new TimeoutException("Timeout waiting for connection");
413         } finally {
414             this.lock.unlock();
415         }
416     }
417 
418     @Override
419     public void release(final E entry, final boolean reusable) {
420         this.lock.lock();
421         try {
422             if (this.leased.remove(entry)) {
423                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
424                 pool.free(entry, reusable);
425                 if (reusable && !this.isShutDown) {
426                     this.available.addFirst(entry);
427                 } else {
428                     entry.close();
429                 }
430                 onRelease(entry);
431                 Future<E> future = pool.nextPending();
432                 if (future != null) {
433                     this.pending.remove(future);
434                 } else {
435                     future = this.pending.poll();
436                 }
437                 if (future != null) {
438                     this.condition.signalAll();
439                 }
440             }
441         } finally {
442             this.lock.unlock();
443         }
444     }
445 
446     private int getMax(final T route) {
447         final Integer v = this.maxPerRoute.get(route);
448         return v != null ? v.intValue() : this.defaultMaxPerRoute;
449     }
450 
451     @Override
452     public void setMaxTotal(final int max) {
453         Args.positive(max, "Max value");
454         this.lock.lock();
455         try {
456             this.maxTotal = max;
457         } finally {
458             this.lock.unlock();
459         }
460     }
461 
462     @Override
463     public int getMaxTotal() {
464         this.lock.lock();
465         try {
466             return this.maxTotal;
467         } finally {
468             this.lock.unlock();
469         }
470     }
471 
472     @Override
473     public void setDefaultMaxPerRoute(final int max) {
474         Args.positive(max, "Max per route value");
475         this.lock.lock();
476         try {
477             this.defaultMaxPerRoute = max;
478         } finally {
479             this.lock.unlock();
480         }
481     }
482 
483     @Override
484     public int getDefaultMaxPerRoute() {
485         this.lock.lock();
486         try {
487             return this.defaultMaxPerRoute;
488         } finally {
489             this.lock.unlock();
490         }
491     }
492 
493     @Override
494     public void setMaxPerRoute(final T route, final int max) {
495         Args.notNull(route, "Route");
496         this.lock.lock();
497         try {
498             if (max > -1) {
499                 this.maxPerRoute.put(route, Integer.valueOf(max));
500             } else {
501                 this.maxPerRoute.remove(route);
502             }
503         } finally {
504             this.lock.unlock();
505         }
506     }
507 
508     @Override
509     public int getMaxPerRoute(final T route) {
510         Args.notNull(route, "Route");
511         this.lock.lock();
512         try {
513             return getMax(route);
514         } finally {
515             this.lock.unlock();
516         }
517     }
518 
519     @Override
520     public PoolStats getTotalStats() {
521         this.lock.lock();
522         try {
523             return new PoolStats(
524                     this.leased.size(),
525                     this.pending.size(),
526                     this.available.size(),
527                     this.maxTotal);
528         } finally {
529             this.lock.unlock();
530         }
531     }
532 
533     @Override
534     public PoolStats getStats(final T route) {
535         Args.notNull(route, "Route");
536         this.lock.lock();
537         try {
538             final RouteSpecificPool<T, C, E> pool = getPool(route);
539             return new PoolStats(
540                     pool.getLeasedCount(),
541                     pool.getPendingCount(),
542                     pool.getAvailableCount(),
543                     getMax(route));
544         } finally {
545             this.lock.unlock();
546         }
547     }
548 
549     /**
550      * Returns snapshot of all knows routes
551      * @return the set of routes
552      *
553      * @since 4.4
554      */
555     public Set<T> getRoutes() {
556         this.lock.lock();
557         try {
558             return new HashSet<T>(routeToPool.keySet());
559         } finally {
560             this.lock.unlock();
561         }
562     }
563 
564     /**
565      * Enumerates all available connections.
566      *
567      * @since 4.3
568      */
569     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
570         this.lock.lock();
571         try {
572             final Iterator<E> it = this.available.iterator();
573             while (it.hasNext()) {
574                 final E entry = it.next();
575                 callback.process(entry);
576                 if (entry.isClosed()) {
577                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
578                     pool.remove(entry);
579                     it.remove();
580                 }
581             }
582             purgePoolMap();
583         } finally {
584             this.lock.unlock();
585         }
586     }
587 
588     /**
589      * Enumerates all leased connections.
590      *
591      * @since 4.3
592      */
593     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
594         this.lock.lock();
595         try {
596             final Iterator<E> it = this.leased.iterator();
597             while (it.hasNext()) {
598                 final E entry = it.next();
599                 callback.process(entry);
600             }
601         } finally {
602             this.lock.unlock();
603         }
604     }
605 
606     private void purgePoolMap() {
607         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
608         while (it.hasNext()) {
609             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
610             final RouteSpecificPool<T, C, E> pool = entry.getValue();
611             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
612                 it.remove();
613             }
614         }
615     }
616 
617     /**
618      * Closes connections that have been idle longer than the given period
619      * of time and evicts them from the pool.
620      *
621      * @param idletime maximum idle time.
622      * @param timeUnit time unit.
623      */
624     public void closeIdle(final long idletime, final TimeUnit timeUnit) {
625         Args.notNull(timeUnit, "Time unit");
626         long time = timeUnit.toMillis(idletime);
627         if (time < 0) {
628             time = 0;
629         }
630         final long deadline = System.currentTimeMillis() - time;
631         enumAvailable(new PoolEntryCallback<T, C>() {
632 
633             @Override
634             public void process(final PoolEntry<T, C> entry) {
635                 if (entry.getUpdated() <= deadline) {
636                     entry.close();
637                 }
638             }
639 
640         });
641     }
642 
643     /**
644      * Closes expired connections and evicts them from the pool.
645      */
646     public void closeExpired() {
647         final long now = System.currentTimeMillis();
648         enumAvailable(new PoolEntryCallback<T, C>() {
649 
650             @Override
651             public void process(final PoolEntry<T, C> entry) {
652                 if (entry.isExpired(now)) {
653                     entry.close();
654                 }
655             }
656 
657         });
658     }
659 
660     /**
661      * @return the number of milliseconds
662      * @since 4.4
663      */
664     public int getValidateAfterInactivity() {
665         return this.validateAfterInactivity;
666     }
667 
668     /**
669      * @param ms the number of milliseconds
670      * @since 4.4
671      */
672     public void setValidateAfterInactivity(final int ms) {
673         this.validateAfterInactivity = ms;
674     }
675 
676     @Override
677     public String toString() {
678         this.lock.lock();
679         try {
680             final StringBuilder buffer = new StringBuilder();
681             buffer.append("[leased: ");
682             buffer.append(this.leased);
683             buffer.append("][available: ");
684             buffer.append(this.available);
685             buffer.append("][pending: ");
686             buffer.append(this.pending);
687             buffer.append("]");
688             return buffer.toString();
689         } finally {
690             this.lock.unlock();
691         }
692     }
693 
694 }