View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Objects;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentLinkedDeque;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  import java.util.concurrent.atomic.AtomicMarkableReference;
45  
46  import org.apache.hc.core5.annotation.Contract;
47  import org.apache.hc.core5.annotation.Experimental;
48  import org.apache.hc.core5.annotation.ThreadingBehavior;
49  import org.apache.hc.core5.concurrent.BasicFuture;
50  import org.apache.hc.core5.concurrent.Cancellable;
51  import org.apache.hc.core5.concurrent.FutureCallback;
52  import org.apache.hc.core5.function.Callback;
53  import org.apache.hc.core5.io.CloseMode;
54  import org.apache.hc.core5.io.ModalCloseable;
55  import org.apache.hc.core5.util.Args;
56  import org.apache.hc.core5.util.Asserts;
57  import org.apache.hc.core5.util.Deadline;
58  import org.apache.hc.core5.util.DeadlineTimeoutException;
59  import org.apache.hc.core5.util.TimeValue;
60  import org.apache.hc.core5.util.Timeout;
61  
62  /**
63   * Connection pool with higher concurrency but with lax connection limit guarantees.
64   *
65   * @param <T> route
66   * @param <C> connection object
67   *
68   * @since 5.0
69   */
70  @Contract(threading = ThreadingBehavior.SAFE)
71  @Experimental
72  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
73  
74      private final TimeValue timeToLive;
75      private final PoolReusePolicy policy;
76      private final DisposalCallback<C> disposalCallback;
77      private final ConnPoolListener<T> connPoolListener;
78      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
79      private final AtomicBoolean isShutDown;
80  
81      private volatile int defaultMaxPerRoute;
82  
83      /**
84       * @since 5.0
85       */
86      public LaxConnPool(
87              final int defaultMaxPerRoute,
88              final TimeValue timeToLive,
89              final PoolReusePolicy policy,
90              final DisposalCallback<C> disposalCallback,
91              final ConnPoolListener<T> connPoolListener) {
92          super();
93          Args.positive(defaultMaxPerRoute, "Max per route value");
94          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
96          this.disposalCallback = disposalCallback;
97          this.connPoolListener = connPoolListener;
98          this.routeToPool = new ConcurrentHashMap<>();
99          this.isShutDown = new AtomicBoolean(false);
100         this.defaultMaxPerRoute = defaultMaxPerRoute;
101     }
102 
103     /**
104      * @since 5.0
105      */
106     public LaxConnPool(
107             final int defaultMaxPerRoute,
108             final TimeValue timeToLive,
109             final PoolReusePolicy policy,
110             final ConnPoolListener<T> connPoolListener) {
111         this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
112     }
113 
114     public LaxConnPool(final int defaultMaxPerRoute) {
115         this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
116     }
117 
118     public boolean isShutdown() {
119         return isShutDown.get();
120     }
121 
122     @Override
123     public void close(final CloseMode closeMode) {
124         if (isShutDown.compareAndSet(false, true)) {
125             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
126                 final PerRoutePool<T, C> routePool = it.next();
127                 routePool.shutdown(closeMode);
128             }
129             routeToPool.clear();
130         }
131     }
132 
133     @Override
134     public void close() {
135         close(CloseMode.GRACEFUL);
136     }
137 
138     private PerRoutePool<T, C> getPool(final T route) {
139         PerRoutePool<T, C> routePool = routeToPool.get(route);
140         if (routePool == null) {
141             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
142                     route,
143                     defaultMaxPerRoute,
144                     timeToLive,
145                     policy,
146                     this,
147                     disposalCallback,
148                     connPoolListener);
149             routePool = routeToPool.putIfAbsent(route, newRoutePool);
150             if (routePool == null) {
151                 routePool = newRoutePool;
152             }
153         }
154         return routePool;
155     }
156 
157     @Override
158     public Future<PoolEntry<T, C>> lease(
159             final T route, final Object state,
160             final Timeout requestTimeout,
161             final FutureCallback<PoolEntry<T, C>> callback) {
162         Args.notNull(route, "Route");
163         Asserts.check(!isShutDown.get(), "Connection pool shut down");
164         final PerRoutePool<T, C> routePool = getPool(route);
165         return routePool.lease(state, requestTimeout, callback);
166     }
167 
168     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
169         return lease(route, state, Timeout.DISABLED, null);
170     }
171 
172     @Override
173     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
174         if (entry == null) {
175             return;
176         }
177         if (isShutDown.get()) {
178             return;
179         }
180         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
181         routePool.release(entry, reusable);
182     }
183 
184     public void validatePendingRequests() {
185         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
186             routePool.validatePendingRequests();
187         }
188     }
189 
190     @Override
191     public void setMaxTotal(final int max) {
192     }
193 
194     @Override
195     public int getMaxTotal() {
196         return 0;
197     }
198 
199     @Override
200     public void setDefaultMaxPerRoute(final int max) {
201         Args.positive(max, "Max value");
202         defaultMaxPerRoute = max;
203     }
204 
205     @Override
206     public int getDefaultMaxPerRoute() {
207         return defaultMaxPerRoute;
208     }
209 
210     @Override
211     public void setMaxPerRoute(final T route, final int max) {
212         Args.notNull(route, "Route");
213         final PerRoutePool<T, C> routePool = getPool(route);
214         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
215     }
216 
217     @Override
218     public int getMaxPerRoute(final T route) {
219         Args.notNull(route, "Route");
220         final PerRoutePool<T, C> routePool = getPool(route);
221         return routePool.getMax();
222     }
223 
224     @Override
225     public PoolStats getTotalStats() {
226         int leasedTotal = 0;
227         int pendingTotal = 0;
228         int availableTotal = 0;
229         int maxTotal = 0;
230         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
231             leasedTotal += routePool.getLeasedCount();
232             pendingTotal += routePool.getPendingCount();
233             availableTotal += routePool.getAvailableCount();
234             maxTotal += routePool.getMax();
235         }
236         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
237     }
238 
239     @Override
240     public PoolStats getStats(final T route) {
241         Args.notNull(route, "Route");
242         final PerRoutePool<T, C> routePool = getPool(route);
243         return new PoolStats(
244                 routePool.getLeasedCount(),
245                 routePool.getPendingCount(),
246                 routePool.getAvailableCount(),
247                 routePool.getMax());
248     }
249 
250     @Override
251     public Set<T> getRoutes() {
252         return new HashSet<>(routeToPool.keySet());
253     }
254 
255     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
256         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
257             routePool.enumAvailable(callback);
258         }
259     }
260 
261     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
262         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
263             routePool.enumLeased(callback);
264         }
265     }
266 
267     @Override
268     public void closeIdle(final TimeValue idleTime) {
269         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270         enumAvailable(entry -> {
271             if (entry.getUpdated() <= deadline) {
272                 entry.discardConnection(CloseMode.GRACEFUL);
273             }
274         });
275     }
276 
277     @Override
278     public void closeExpired() {
279         final long now = System.currentTimeMillis();
280         enumAvailable(entry -> {
281             if (entry.getExpiryDeadline().isBefore(now)) {
282                 entry.discardConnection(CloseMode.GRACEFUL);
283             }
284         });
285     }
286 
287     @Override
288     public String toString() {
289         final PoolStats totalStats = getTotalStats();
290         final StringBuilder buffer = new StringBuilder();
291         buffer.append("[leased: ");
292         buffer.append(totalStats.getLeased());
293         buffer.append("][available: ");
294         buffer.append(totalStats.getAvailable());
295         buffer.append("][pending: ");
296         buffer.append(totalStats.getPending());
297         buffer.append("]");
298         return buffer.toString();
299     }
300 
301     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
302 
303         private final Object state;
304         private final Deadline deadline;
305         private final BasicFuture<PoolEntry<T, C>> future;
306 
307         LeaseRequest(
308                 final Object state,
309                 final Timeout requestTimeout,
310                 final BasicFuture<PoolEntry<T, C>> future) {
311             super();
312             this.state = state;
313             this.deadline = Deadline.calculate(requestTimeout);
314             this.future = future;
315         }
316 
317         BasicFuture<PoolEntry<T, C>> getFuture() {
318             return this.future;
319         }
320 
321         public Object getState() {
322             return this.state;
323         }
324 
325         public Deadline getDeadline() {
326             return this.deadline;
327         }
328 
329         public boolean isDone() {
330             return this.future.isDone();
331         }
332 
333         public boolean completed(final PoolEntry<T, C> result) {
334             return future.completed(result);
335         }
336 
337         public boolean failed(final Exception ex) {
338             return future.failed(ex);
339         }
340 
341         @Override
342         public boolean cancel() {
343             return future.cancel();
344         }
345 
346     }
347 
348     static class PerRoutePool<T, C extends ModalCloseable> {
349 
350         private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
351 
352         private final T route;
353         private final TimeValue timeToLive;
354         private final PoolReusePolicy policy;
355         private final DisposalCallback<C> disposalCallback;
356         private final ConnPoolListener<T> connPoolListener;
357         private final ConnPoolStats<T> connPoolStats;
358         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
359         private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
360         private final Deque<LeaseRequest<T, C>> pending;
361         private final AtomicBoolean terminated;
362         private final AtomicInteger allocated;
363         private final AtomicLong releaseSeqNum;
364 
365         private volatile int max;
366 
367         PerRoutePool(
368                 final T route,
369                 final int max,
370                 final TimeValue timeToLive,
371                 final PoolReusePolicy policy,
372                 final ConnPoolStats<T> connPoolStats,
373                 final DisposalCallback<C> disposalCallback,
374                 final ConnPoolListener<T> connPoolListener) {
375             super();
376             this.route = route;
377             this.timeToLive = timeToLive;
378             this.policy = policy;
379             this.connPoolStats = connPoolStats;
380             this.disposalCallback = disposalCallback;
381             this.connPoolListener = connPoolListener;
382             this.leased = new ConcurrentHashMap<>();
383             this.available = new ConcurrentLinkedDeque<>();
384             this.pending = new ConcurrentLinkedDeque<>();
385             this.terminated = new AtomicBoolean(false);
386             this.allocated = new AtomicInteger(0);
387             this.releaseSeqNum = new AtomicLong(0);
388             this.max = max;
389         }
390 
391         public void shutdown(final CloseMode closeMode) {
392             if (terminated.compareAndSet(false, true)) {
393                 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
394                 while ((entryRef = available.poll()) != null) {
395                     entryRef.getReference().discardConnection(closeMode);
396                 }
397                 for (final PoolEntry<T, C> entry : leased.keySet()) {
398                     entry.discardConnection(closeMode);
399                 }
400                 leased.clear();
401                 LeaseRequest<T, C> leaseRequest;
402                 while ((leaseRequest = pending.poll()) != null) {
403                     leaseRequest.cancel();
404                 }
405             }
406         }
407 
408         private PoolEntry<T, C> createPoolEntry() {
409             final int poolMax = max;
410             int prev, next;
411             do {
412                 prev = allocated.get();
413                 next = (prev<poolMax)? prev+1 : prev;
414             } while (!allocated.compareAndSet(prev, next));
415             return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
416         }
417 
418         private void deallocatePoolEntry() {
419             allocated.decrementAndGet();
420         }
421 
422         private void addLeased(final PoolEntry<T, C> entry) {
423             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
424                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
425             } else if (connPoolListener != null) {
426                 connPoolListener.onLease(route, connPoolStats);
427             }
428         }
429 
430         private void removeLeased(final PoolEntry<T, C> entry) {
431             if (connPoolListener != null) {
432                 connPoolListener.onRelease(route, connPoolStats);
433             }
434             if (!leased.remove(entry, Boolean.TRUE)) {
435                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
436             }
437         }
438 
439         private PoolEntry<T, C> getAvailableEntry(final Object state) {
440             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
441                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
442                 final PoolEntry<T, C> entry = ref.getReference();
443                 if (ref.compareAndSet(entry, entry, false, true)) {
444                     it.remove();
445                     if (entry.getExpiryDeadline().isExpired()) {
446                         entry.discardConnection(CloseMode.GRACEFUL);
447                     }
448                     if (!Objects.equals(entry.getState(), state)) {
449                         entry.discardConnection(CloseMode.GRACEFUL);
450                     }
451                     return entry;
452                 }
453             }
454             return null;
455         }
456 
457         public Future<PoolEntry<T, C>> lease(
458                 final Object state,
459                 final Timeout requestTimeout,
460                 final FutureCallback<PoolEntry<T, C>> callback) {
461             Asserts.check(!terminated.get(), "Connection pool shut down");
462             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
463 
464                 @Override
465                 public synchronized PoolEntry<T, C> get(
466                         final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
467                     try {
468                         return super.get(timeout, unit);
469                     } catch (final TimeoutException ex) {
470                         cancel();
471                         throw ex;
472                     }
473                 }
474 
475             };
476             final long releaseState = releaseSeqNum.get();
477             PoolEntry<T, C> entry = null;
478             if (pending.isEmpty()) {
479                 entry = getAvailableEntry(state);
480                 if (entry == null) {
481                     entry = createPoolEntry();
482                 }
483             }
484             if (entry != null) {
485                 addLeased(entry);
486                 future.completed(entry);
487             } else {
488                 pending.add(new LeaseRequest<>(state, requestTimeout, future));
489                 if (releaseState != releaseSeqNum.get()) {
490                     servicePendingRequest();
491                 }
492             }
493             return future;
494         }
495 
496         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
497             removeLeased(releasedEntry);
498             if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
499                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
500             }
501             if (releasedEntry.hasConnection()) {
502                 switch (policy) {
503                     case LIFO:
504                         available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
505                         break;
506                     case FIFO:
507                         available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
508                         break;
509                     default:
510                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
511                 }
512             }
513             else {
514                 deallocatePoolEntry();
515             }
516             releaseSeqNum.incrementAndGet();
517             servicePendingRequest();
518         }
519 
520 
521         private void servicePendingRequest() {
522             servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
523         }
524 
525         private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
526             LeaseRequest<T, C> leaseRequest;
527             while ((leaseRequest = pending.poll()) != null) {
528                 if (leaseRequest.isDone()) {
529                     continue;
530                 }
531                 final Object state = leaseRequest.getState();
532                 final Deadline deadline = leaseRequest.getDeadline();
533 
534                 if (deadline.isExpired()) {
535                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
536                 } else {
537                     final long releaseState = releaseSeqNum.get();
538                     PoolEntry<T, C> entry = getAvailableEntry(state);
539                     if (entry == null) {
540                         entry = createPoolEntry();
541                     }
542                     if (entry != null) {
543                         addLeased(entry);
544                         if (!leaseRequest.completed(entry)) {
545                             release(entry, true);
546                         }
547                         if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
548                             break;
549                         }
550                     }
551                     else {
552                         pending.addFirst(leaseRequest);
553                         if (releaseState == releaseSeqNum.get()) {
554                             break;
555                         }
556                     }
557                 }
558             }
559         }
560 
561         public void validatePendingRequests() {
562             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
563             while (it.hasNext()) {
564                 final LeaseRequest<T, C> request = it.next();
565                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
566                 if (future.isCancelled() && !request.isDone()) {
567                     it.remove();
568                 } else {
569                     final Deadline deadline = request.getDeadline();
570                     if (deadline.isExpired()) {
571                         request.failed(DeadlineTimeoutException.from(deadline));
572                     }
573                     if (request.isDone()) {
574                         it.remove();
575                     }
576                 }
577             }
578         }
579 
580         public final T getRoute() {
581             return route;
582         }
583 
584         public int getMax() {
585             return max;
586         }
587 
588         public void setMax(final int max) {
589             this.max = max;
590         }
591 
592         public int getPendingCount() {
593             return pending.size();
594         }
595 
596         public int getLeasedCount() {
597             return leased.size();
598         }
599 
600         public int getAvailableCount() {
601             return available.size();
602         }
603 
604         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
605             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
606                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
607                 final PoolEntry<T, C> entry = ref.getReference();
608                 if (ref.compareAndSet(entry, entry, false, true)) {
609                     callback.execute(entry);
610                     if (!entry.hasConnection()) {
611                         deallocatePoolEntry();
612                         it.remove();
613                     }
614                     else {
615                         ref.set(entry, false);
616                     }
617                 }
618             }
619             releaseSeqNum.incrementAndGet();
620             servicePendingRequests(RequestServiceStrategy.ALL);
621         }
622 
623         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
624             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
625                 final PoolEntry<T, C> entry = it.next();
626                 callback.execute(entry);
627                 if (!entry.hasConnection()) {
628                     deallocatePoolEntry();
629                     it.remove();
630                 }
631             }
632         }
633 
634         @Override
635         public String toString() {
636             final StringBuilder buffer = new StringBuilder();
637             buffer.append("[route: ");
638             buffer.append(route);
639             buffer.append("][leased: ");
640             buffer.append(leased.size());
641             buffer.append("][available: ");
642             buffer.append(available.size());
643             buffer.append("][pending: ");
644             buffer.append(pending.size());
645             buffer.append("]");
646             return buffer.toString();
647         }
648 
649     }
650 
651 }