View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedDeque;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.atomic.AtomicLong;
43  import java.util.concurrent.atomic.AtomicMarkableReference;
44  
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.Experimental;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.BasicFuture;
49  import org.apache.hc.core5.concurrent.Cancellable;
50  import org.apache.hc.core5.concurrent.FutureCallback;
51  import org.apache.hc.core5.function.Callback;
52  import org.apache.hc.core5.io.CloseMode;
53  import org.apache.hc.core5.io.ModalCloseable;
54  import org.apache.hc.core5.util.Args;
55  import org.apache.hc.core5.util.Asserts;
56  import org.apache.hc.core5.util.Deadline;
57  import org.apache.hc.core5.util.DeadlineTimeoutException;
58  import org.apache.hc.core5.util.LangUtils;
59  import org.apache.hc.core5.util.TimeValue;
60  import org.apache.hc.core5.util.Timeout;
61  
62  /**
63   * Connection pool with higher concurrency but with lax connection limit guarantees.
64   *
65   * @param <T> route
66   * @param <C> connection object
67   *
68   * @since 5.0
69   */
70  @Contract(threading = ThreadingBehavior.SAFE)
71  @Experimental
72  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
73  
74      private final TimeValue timeToLive;
75      private final PoolReusePolicy policy;
76      private final DisposalCallback<C> disposalCallback;
77      private final ConnPoolListener<T> connPoolListener;
78      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
79      private final AtomicBoolean isShutDown;
80  
81      private volatile int defaultMaxPerRoute;
82  
83      /**
84       * @since 5.0
85       */
86      public LaxConnPool(
87              final int defaultMaxPerRoute,
88              final TimeValue timeToLive,
89              final PoolReusePolicy policy,
90              final DisposalCallback<C> disposalCallback,
91              final ConnPoolListener<T> connPoolListener) {
92          super();
93          Args.positive(defaultMaxPerRoute, "Max per route value");
94          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
96          this.disposalCallback = disposalCallback;
97          this.connPoolListener = connPoolListener;
98          this.routeToPool = new ConcurrentHashMap<>();
99          this.isShutDown = new AtomicBoolean(false);
100         this.defaultMaxPerRoute = defaultMaxPerRoute;
101     }
102 
103     /**
104      * @since 5.0
105      */
106     public LaxConnPool(
107             final int defaultMaxPerRoute,
108             final TimeValue timeToLive,
109             final PoolReusePolicy policy,
110             final ConnPoolListener<T> connPoolListener) {
111         this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
112     }
113 
114     public LaxConnPool(final int defaultMaxPerRoute) {
115         this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
116     }
117 
118     public boolean isShutdown() {
119         return isShutDown.get();
120     }
121 
122     @Override
123     public void close(final CloseMode closeMode) {
124         if (isShutDown.compareAndSet(false, true)) {
125             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
126                 final PerRoutePool<T, C> routePool = it.next();
127                 routePool.shutdown(closeMode);
128             }
129             routeToPool.clear();
130         }
131     }
132 
133     @Override
134     public void close() {
135         close(CloseMode.GRACEFUL);
136     }
137 
138     private PerRoutePool<T, C> getPool(final T route) {
139         PerRoutePool<T, C> routePool = routeToPool.get(route);
140         if (routePool == null) {
141             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
142                     route,
143                     defaultMaxPerRoute,
144                     timeToLive,
145                     policy,
146                     this,
147                     disposalCallback,
148                     connPoolListener);
149             routePool = routeToPool.putIfAbsent(route, newRoutePool);
150             if (routePool == null) {
151                 routePool = newRoutePool;
152             }
153         }
154         return routePool;
155     }
156 
157     @Override
158     public Future<PoolEntry<T, C>> lease(
159             final T route, final Object state,
160             final Timeout requestTimeout,
161             final FutureCallback<PoolEntry<T, C>> callback) {
162         Args.notNull(route, "Route");
163         Asserts.check(!isShutDown.get(), "Connection pool shut down");
164         final PerRoutePool<T, C> routePool = getPool(route);
165         return routePool.lease(state, requestTimeout, callback);
166     }
167 
168     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
169         return lease(route, state, Timeout.DISABLED, null);
170     }
171 
172     @Override
173     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
174         if (entry == null) {
175             return;
176         }
177         if (isShutDown.get()) {
178             return;
179         }
180         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
181         routePool.release(entry, reusable);
182     }
183 
184     public void validatePendingRequests() {
185         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
186             routePool.validatePendingRequests();
187         }
188     }
189 
190     @Override
191     public void setMaxTotal(final int max) {
192     }
193 
194     @Override
195     public int getMaxTotal() {
196         return 0;
197     }
198 
199     @Override
200     public void setDefaultMaxPerRoute(final int max) {
201         Args.positive(max, "Max value");
202         defaultMaxPerRoute = max;
203     }
204 
205     @Override
206     public int getDefaultMaxPerRoute() {
207         return defaultMaxPerRoute;
208     }
209 
210     @Override
211     public void setMaxPerRoute(final T route, final int max) {
212         Args.notNull(route, "Route");
213         final PerRoutePool<T, C> routePool = getPool(route);
214         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
215     }
216 
217     @Override
218     public int getMaxPerRoute(final T route) {
219         Args.notNull(route, "Route");
220         final PerRoutePool<T, C> routePool = getPool(route);
221         return routePool.getMax();
222     }
223 
224     @Override
225     public PoolStats getTotalStats() {
226         int leasedTotal = 0;
227         int pendingTotal = 0;
228         int availableTotal = 0;
229         int maxTotal = 0;
230         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
231             leasedTotal += routePool.getLeasedCount();
232             pendingTotal += routePool.getPendingCount();
233             availableTotal += routePool.getAvailableCount();
234             maxTotal += routePool.getMax();
235         }
236         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
237     }
238 
239     @Override
240     public PoolStats getStats(final T route) {
241         Args.notNull(route, "Route");
242         final PerRoutePool<T, C> routePool = getPool(route);
243         return new PoolStats(
244                 routePool.getLeasedCount(),
245                 routePool.getPendingCount(),
246                 routePool.getAvailableCount(),
247                 routePool.getMax());
248     }
249 
250     @Override
251     public Set<T> getRoutes() {
252         return new HashSet<>(routeToPool.keySet());
253     }
254 
255     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
256         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
257             routePool.enumAvailable(callback);
258         }
259     }
260 
261     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
262         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
263             routePool.enumLeased(callback);
264         }
265     }
266 
267     @Override
268     public void closeIdle(final TimeValue idleTime) {
269         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270         enumAvailable(new Callback<PoolEntry<T, C>>() {
271 
272             @Override
273             public void execute(final PoolEntry<T, C> entry) {
274                 if (entry.getUpdated() <= deadline) {
275                     entry.discardConnection(CloseMode.GRACEFUL);
276                 }
277             }
278 
279         });
280     }
281 
282     @Override
283     public void closeExpired() {
284         final long now = System.currentTimeMillis();
285         enumAvailable(new Callback<PoolEntry<T, C>>() {
286 
287             @Override
288             public void execute(final PoolEntry<T, C> entry) {
289                 if (entry.getExpiryDeadline().isBefore(now)) {
290                     entry.discardConnection(CloseMode.GRACEFUL);
291                 }
292             }
293 
294         });
295     }
296 
297     @Override
298     public String toString() {
299         final PoolStats totalStats = getTotalStats();
300         final StringBuilder buffer = new StringBuilder();
301         buffer.append("[leased: ");
302         buffer.append(totalStats.getLeased());
303         buffer.append("][available: ");
304         buffer.append(totalStats.getAvailable());
305         buffer.append("][pending: ");
306         buffer.append(totalStats.getPending());
307         buffer.append("]");
308         return buffer.toString();
309     }
310 
311     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
312 
313         private final Object state;
314         private final Deadline deadline;
315         private final BasicFuture<PoolEntry<T, C>> future;
316 
317         LeaseRequest(
318                 final Object state,
319                 final Timeout requestTimeout,
320                 final BasicFuture<PoolEntry<T, C>> future) {
321             super();
322             this.state = state;
323             this.deadline = Deadline.calculate(requestTimeout);
324             this.future = future;
325         }
326 
327         BasicFuture<PoolEntry<T, C>> getFuture() {
328             return this.future;
329         }
330 
331         public Object getState() {
332             return this.state;
333         }
334 
335         public Deadline getDeadline() {
336             return this.deadline;
337         }
338 
339         public boolean isDone() {
340             return this.future.isDone();
341         }
342 
343         public boolean completed(final PoolEntry<T, C> result) {
344             return future.completed(result);
345         }
346 
347         public boolean failed(final Exception ex) {
348             return future.failed(ex);
349         }
350 
351         @Override
352         public boolean cancel() {
353             return future.cancel();
354         }
355 
356     }
357 
358     static class PerRoutePool<T, C extends ModalCloseable> {
359 
360         private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
361 
362         private final T route;
363         private final TimeValue timeToLive;
364         private final PoolReusePolicy policy;
365         private final DisposalCallback<C> disposalCallback;
366         private final ConnPoolListener<T> connPoolListener;
367         private final ConnPoolStats<T> connPoolStats;
368         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
369         private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
370         private final Deque<LeaseRequest<T, C>> pending;
371         private final AtomicBoolean terminated;
372         private final AtomicInteger allocated;
373         private final AtomicLong releaseSeqNum;
374 
375         private volatile int max;
376 
377         PerRoutePool(
378                 final T route,
379                 final int max,
380                 final TimeValue timeToLive,
381                 final PoolReusePolicy policy,
382                 final ConnPoolStats<T> connPoolStats,
383                 final DisposalCallback<C> disposalCallback,
384                 final ConnPoolListener<T> connPoolListener) {
385             super();
386             this.route = route;
387             this.timeToLive = timeToLive;
388             this.policy = policy;
389             this.connPoolStats = connPoolStats;
390             this.disposalCallback = disposalCallback;
391             this.connPoolListener = connPoolListener;
392             this.leased = new ConcurrentHashMap<>();
393             this.available = new ConcurrentLinkedDeque<>();
394             this.pending = new ConcurrentLinkedDeque<>();
395             this.terminated = new AtomicBoolean(false);
396             this.allocated = new AtomicInteger(0);
397             this.releaseSeqNum = new AtomicLong(0);
398             this.max = max;
399         }
400 
401         public void shutdown(final CloseMode closeMode) {
402             if (terminated.compareAndSet(false, true)) {
403                 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
404                 while ((entryRef = available.poll()) != null) {
405                     entryRef.getReference().discardConnection(closeMode);
406                 }
407                 for (final PoolEntry<T, C> entry : leased.keySet()) {
408                     entry.discardConnection(closeMode);
409                 }
410                 leased.clear();
411                 LeaseRequest<T, C> leaseRequest;
412                 while ((leaseRequest = pending.poll()) != null) {
413                     leaseRequest.cancel();
414                 }
415             }
416         }
417 
418         private PoolEntry<T, C> createPoolEntry() {
419             final int poolmax = max;
420             int prev, next;
421             do {
422                 prev = allocated.get();
423                 next = (prev<poolmax)? prev+1 : prev;
424             } while (!allocated.compareAndSet(prev, next));
425             return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
426         }
427 
428         private void deallocatePoolEntry() {
429             allocated.decrementAndGet();
430         }
431 
432         private void addLeased(final PoolEntry<T, C> entry) {
433             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
434                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
435             } else if (connPoolListener != null) {
436                 connPoolListener.onLease(route, connPoolStats);
437             }
438         }
439 
440         private void removeLeased(final PoolEntry<T, C> entry) {
441             if (connPoolListener != null) {
442                 connPoolListener.onRelease(route, connPoolStats);
443             }
444             if (!leased.remove(entry, Boolean.TRUE)) {
445                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
446             }
447         }
448 
449         private PoolEntry<T, C> getAvailableEntry(final Object state) {
450             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
451                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
452                 final PoolEntry<T, C> entry = ref.getReference();
453                 if (ref.compareAndSet(entry, entry, false, true)) {
454                     it.remove();
455                     if (entry.getExpiryDeadline().isExpired()) {
456                         entry.discardConnection(CloseMode.GRACEFUL);
457                     }
458                     if (!LangUtils.equals(entry.getState(), state)) {
459                         entry.discardConnection(CloseMode.GRACEFUL);
460                     }
461                     return entry;
462                 }
463             }
464             return null;
465         }
466 
467         public Future<PoolEntry<T, C>> lease(
468                 final Object state,
469                 final Timeout requestTimeout,
470                 final FutureCallback<PoolEntry<T, C>> callback) {
471             Asserts.check(!terminated.get(), "Connection pool shut down");
472             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
473 
474                 @Override
475                 public synchronized PoolEntry<T, C> get(
476                         final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
477                     try {
478                         return super.get(timeout, unit);
479                     } catch (final TimeoutException ex) {
480                         cancel();
481                         throw ex;
482                     }
483                 }
484 
485             };
486             final long releaseState = releaseSeqNum.get();
487             PoolEntry<T, C> entry = null;
488             if (pending.isEmpty()) {
489                 entry = getAvailableEntry(state);
490                 if (entry == null) {
491                     entry = createPoolEntry();
492                 }
493             }
494             if (entry != null) {
495                 addLeased(entry);
496                 future.completed(entry);
497             } else {
498                 pending.add(new LeaseRequest<>(state, requestTimeout, future));
499                 if (releaseState != releaseSeqNum.get()) {
500                     servicePendingRequest();
501                 }
502             }
503             return future;
504         }
505 
506         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
507             removeLeased(releasedEntry);
508             if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
509                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
510             }
511             if (releasedEntry.hasConnection()) {
512                 switch (policy) {
513                     case LIFO:
514                         available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
515                         break;
516                     case FIFO:
517                         available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
518                         break;
519                     default:
520                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
521                 }
522             }
523             else {
524                 deallocatePoolEntry();
525             }
526             releaseSeqNum.incrementAndGet();
527             servicePendingRequest();
528         }
529 
530 
531         private void servicePendingRequest() {
532             servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
533         }
534 
535         private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
536             LeaseRequest<T, C> leaseRequest;
537             while ((leaseRequest = pending.poll()) != null) {
538                 if (leaseRequest.isDone()) {
539                     continue;
540                 }
541                 final Object state = leaseRequest.getState();
542                 final Deadline deadline = leaseRequest.getDeadline();
543 
544                 if (deadline.isExpired()) {
545                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
546                 } else {
547                     final long releaseState = releaseSeqNum.get();
548                     PoolEntry<T, C> entry = getAvailableEntry(state);
549                     if (entry == null) {
550                         entry = createPoolEntry();
551                     }
552                     if (entry != null) {
553                         addLeased(entry);
554                         if (!leaseRequest.completed(entry)) {
555                             release(entry, true);
556                         }
557                         if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
558                             break;
559                         }
560                     }
561                     else {
562                         pending.addFirst(leaseRequest);
563                         if (releaseState == releaseSeqNum.get()) {
564                             break;
565                         }
566                     }
567                 }
568             }
569         }
570 
571         public void validatePendingRequests() {
572             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
573             while (it.hasNext()) {
574                 final LeaseRequest<T, C> request = it.next();
575                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
576                 if (future.isCancelled() && !request.isDone()) {
577                     it.remove();
578                 } else {
579                     final Deadline deadline = request.getDeadline();
580                     if (deadline.isExpired()) {
581                         request.failed(DeadlineTimeoutException.from(deadline));
582                     }
583                     if (request.isDone()) {
584                         it.remove();
585                     }
586                 }
587             }
588         }
589 
590         public final T getRoute() {
591             return route;
592         }
593 
594         public int getMax() {
595             return max;
596         }
597 
598         public void setMax(final int max) {
599             this.max = max;
600         }
601 
602         public int getPendingCount() {
603             return pending.size();
604         }
605 
606         public int getLeasedCount() {
607             return leased.size();
608         }
609 
610         public int getAvailableCount() {
611             return available.size();
612         }
613 
614         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
615             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
616                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
617                 final PoolEntry<T, C> entry = ref.getReference();
618                 if (ref.compareAndSet(entry, entry, false, true)) {
619                     callback.execute(entry);
620                     if (!entry.hasConnection()) {
621                         deallocatePoolEntry();
622                         it.remove();
623                     }
624                     else {
625                         ref.set(entry, false);
626                     }
627                 }
628             }
629             releaseSeqNum.incrementAndGet();
630             servicePendingRequests(RequestServiceStrategy.ALL);
631         }
632 
633         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
634             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
635                 final PoolEntry<T, C> entry = it.next();
636                 callback.execute(entry);
637                 if (!entry.hasConnection()) {
638                     deallocatePoolEntry();
639                     it.remove();
640                 }
641             }
642         }
643 
644         @Override
645         public String toString() {
646             final StringBuilder buffer = new StringBuilder();
647             buffer.append("[route: ");
648             buffer.append(route);
649             buffer.append("][leased: ");
650             buffer.append(leased.size());
651             buffer.append("][available: ");
652             buffer.append(available.size());
653             buffer.append("][pending: ");
654             buffer.append(pending.size());
655             buffer.append("]");
656             return buffer.toString();
657         }
658 
659     }
660 
661 }