/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.pool;

import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.LangUtils;

/**
 * Connection pool with strict max connection limit guarantees.
 * <p>
 * All mutable pool state (per-route pools, leased / available entries and the
 * queue of pending lease requests) is modified while holding a single
 * {@link ReentrantLock}. Lease futures are completed by {@code fireCallbacks()},
 * which every public entry point invokes after releasing that lock.
 *
 * @param <T> route
 * @param <C> connection object
 *
 * @since 4.2
 */
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class StrictConnPool<T, C extends Closeable> implements ControlledConnPool<T, C> {

    private final long timeToLive;
    private final TimeUnit timeUnit;
    private final ConnPoolListener<T> connPoolListener;
    private final ConnPoolPolicy policy;
    private final Map<T, RoutePool<T, C>> routeToPool;
    private final LinkedList<LeaseRequest<T, C>> leasingRequests;
    private final Set<PoolEntry<T, C>> leased;
    private final LinkedList<PoolEntry<T, C>> available;
    private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
    private final Map<T, Integer> maxPerRoute;
    private final Lock lock;
    private final AtomicBoolean isShutDown;
    private volatile int defaultMaxPerRoute;
    private volatile int maxTotal;

    /**
     * Creates a pool with explicit limits, entry time-to-live, reuse policy and listener.
     *
     * @param defaultMaxPerRoute per-route limit used for routes without an explicit limit; must be positive.
     * @param maxTotal total connection limit; must be positive.
     * @param timeToLive time-to-live handed to every newly created pool entry.
     * @param timeUnit unit of {@code timeToLive}; {@code null} defaults to {@link TimeUnit#MILLISECONDS}.
     * @param policy order in which released entries are queued; {@code null} defaults to
     *               {@link ConnPoolPolicy#LIFO}.
     * @param connPoolListener optional listener notified on lease and release; may be {@code null}.
     *
     * @since 5.0
     */
    public StrictConnPool(
            final int defaultMaxPerRoute,
            final int maxTotal,
            final long timeToLive,
            final TimeUnit timeUnit,
            final ConnPoolPolicy policy,
            final ConnPoolListener<T> connPoolListener) {
        super();
        Args.positive(defaultMaxPerRoute, "Max per route value");
        Args.positive(maxTotal, "Max total value");
        this.timeToLive = timeToLive;
        this.timeUnit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
        this.connPoolListener = connPoolListener;
        this.policy = policy != null ? policy : ConnPoolPolicy.LIFO;
        this.routeToPool = new HashMap<>();
        this.leasingRequests = new LinkedList<>();
        this.leased = new HashSet<>();
        this.available = new LinkedList<>();
        this.completedRequests = new ConcurrentLinkedQueue<>();
        this.maxPerRoute = new HashMap<>();
        this.lock = new ReentrantLock();
        this.isShutDown = new AtomicBoolean(false);
        this.defaultMaxPerRoute = defaultMaxPerRoute;
        this.maxTotal = maxTotal;
    }

    /**
     * Creates a pool with the given limits, a time-to-live of {@code 0} milliseconds,
     * {@link ConnPoolPolicy#LIFO} policy and no listener.
     *
     * @param defaultMaxPerRoute per-route limit used for routes without an explicit limit; must be positive.
     * @param maxTotal total connection limit; must be positive.
     */
    public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
        this(defaultMaxPerRoute, maxTotal, 0, TimeUnit.MILLISECONDS, ConnPoolPolicy.LIFO, null);
    }

    /**
     * @return {@code true} once {@link #shutdown()} has been invoked.
     */
    public boolean isShutdown() {
        return this.isShutDown.get();
    }

    /**
     * Shuts the pool down. Only the first invocation has any effect: it fires the
     * callbacks of already completed lease requests, shuts down every per-route pool
     * and clears all internal collections, including the pending lease requests.
     */
    @Override
    public void shutdown() {
        if (this.isShutDown.compareAndSet(false, true)) {
            fireCallbacks();
            this.lock.lock();
            try {
                for (final RoutePool<T, C> pool: this.routeToPool.values()) {
                    pool.shutdown();
                }
                this.routeToPool.clear();
                this.leased.clear();
                this.available.clear();
                this.leasingRequests.clear();
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Equivalent to {@link #shutdown()}.
     */
    @Override
    public void close() throws Exception {
        shutdown();
    }

    /**
     * Returns the per-route pool for the given route, creating and registering it
     * if it does not exist yet. Accesses {@code routeToPool} without locking, so
     * callers are expected to hold the pool lock.
     */
    private RoutePool<T, C> getPool(final T route) {
        RoutePool<T, C> pool = this.routeToPool.get(route);
        if (pool == null) {
            pool = new RoutePool<>(route);
            this.routeToPool.put(route, pool);
        }
        return pool;
    }

    /**
     * Converts a lease timeout to milliseconds.
     * <p>
     * Non-positive values are returned unchanged so that sentinel values such as the
     * {@code -1} used by the convenience {@code lease} overloads keep their meaning.
     * Positive values are rounded up to at least one millisecond so that a positive
     * sub-millisecond timeout does not collapse to zero.
     */
    private static long toLeaseTimeoutMillis(final long leaseTimeout, final TimeUnit tunit) {
        if (leaseTimeout <= 0) {
            return leaseTimeout;
        }
        return Math.max(1L, tunit.toMillis(leaseTimeout));
    }

    /**
     * Requests a pool entry for the given route.
     * <p>
     * The request is processed immediately; if it can be neither satisfied nor failed
     * right away it is queued as pending. The returned future is completed by
     * {@code fireCallbacks()}, which is also invoked before this method returns.
     *
     * @param route the route; must not be {@code null}.
     * @param state state object used to select a free entry of the route; may be {@code null}.
     * @param leaseTimeout lease timeout expressed in {@code tunit}; non-positive values are
     *                     passed through unchanged.
     * @param tunit unit of {@code leaseTimeout}; must not be {@code null}.
     * @param callback optional callback attached to the returned future.
     * @return future of the leased pool entry.
     * @throws IllegalStateException presumably, via {@code Asserts.check}, if the pool has
     *         been shut down.
     */
    public Future<PoolEntry<T, C>> lease(
            final T route, final Object state,
            final long leaseTimeout, final TimeUnit tunit,
            final FutureCallback<PoolEntry<T, C>> callback) {
        Args.notNull(route, "Route");
        Args.notNull(tunit, "Time unit");
        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
        final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
        // Honour the caller-supplied unit instead of passing the raw number through.
        // NOTE(review): assumes LeaseRequest interprets its timeout in milliseconds,
        // consistent with deadlines being compared against System.currentTimeMillis()
        // in this class - confirm against LeaseRequest.
        final long leaseTimeoutMillis = toLeaseTimeoutMillis(leaseTimeout, tunit);
        this.lock.lock();
        try {
            final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, leaseTimeoutMillis, future);
            final boolean completed = processPendingRequest(request);
            if (!request.isDone() && !completed) {
                this.leasingRequests.add(request);
            }
            if (request.isDone()) {
                this.completedRequests.add(request);
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
        return future;
    }

    /**
     * Requests a pool entry for the given route, passing a lease timeout of {@code -1}.
     *
     * @param route the route; must not be {@code null}.
     * @param state state object used to select a free entry of the route; may be {@code null}.
     * @param callback optional callback attached to the returned future.
     * @return future of the leased pool entry.
     */
    @Override
    public Future<PoolEntry<T, C>> lease(final T route, final Object state, final FutureCallback<PoolEntry<T, C>> callback) {
        return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
    }

    /**
     * Requests a pool entry for the given route, passing a lease timeout of {@code -1}
     * and no callback.
     *
     * @param route the route; must not be {@code null}.
     * @param state state object used to select a free entry of the route; may be {@code null}.
     * @return future of the leased pool entry.
     */
    public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
        return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
    }

    /**
     * Returns a leased entry to the pool.
     * <p>
     * Does nothing for a {@code null} entry or when the pool has been shut down.
     * Entries that are not reusable, or that no longer hold a connection, have their
     * connection discarded; reusable ones are put back into the available list at the
     * position dictated by the pool policy. Afterwards the pending lease requests are
     * processed until one of them is satisfied, and completed futures are notified.
     *
     * @param entry the entry to release; may be {@code null}.
     * @param reusable whether the entry's connection may be kept for re-use.
     */
    @Override
    public void release(final PoolEntry<T, C> entry, final boolean reusable) {
        if (entry == null) {
            return;
        }
        if (this.isShutDown.get()) {
            return;
        }
        if (!reusable) {
            entry.discardConnection();
        }
        this.lock.lock();
        try {
            // Entries that were not leased by this pool are ignored.
            if (this.leased.remove(entry)) {
                final RoutePool<T, C> pool = getPool(entry.getRoute());
                final boolean keepAlive = entry.hasConnection() && reusable;
                pool.free(entry, keepAlive);
                if (keepAlive) {
                    switch (policy) {
                        case LIFO:
                            this.available.addFirst(entry);
                            break;
                        case FIFO:
                            this.available.addLast(entry);
                            break;
                        default:
                            throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
                    }
                    if (this.connPoolListener != null) {
                        this.connPoolListener.onRelease(entry.getRoute(), this);
                    }
                } else {
                    entry.discardConnection();
                }
                processNextPendingRequest();
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }

    /**
     * Walks all pending lease requests, dropping cancelled ones and trying to satisfy
     * the rest. Requests that are done are queued in {@code completedRequests}.
     * Callers are expected to hold the pool lock.
     */
    private void processPendingRequests() {
        final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
        while (it.hasNext()) {
            final LeaseRequest<T, C> request = it.next();
            final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
            if (future.isCancelled()) {
                it.remove();
                continue;
            }
            final boolean completed = processPendingRequest(request);
            if (request.isDone() || completed) {
                it.remove();
            }
            if (request.isDone()) {
                this.completedRequests.add(request);
            }
        }
    }

    /**
     * Same as {@link #processPendingRequests()} but stops as soon as one pending
     * request has been satisfied. Callers are expected to hold the pool lock.
     */
    private void processNextPendingRequest() {
        final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
        while (it.hasNext()) {
            final LeaseRequest<T, C> request = it.next();
            final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
            if (future.isCancelled()) {
                it.remove();
                continue;
            }
            final boolean completed = processPendingRequest(request);
            if (request.isDone() || completed) {
                it.remove();
            }
            if (request.isDone()) {
                this.completedRequests.add(request);
            }
            if (completed) {
                return;
            }
        }
    }

    /**
     * Tries to satisfy a single lease request, either with a free entry of the route
     * or with a newly created one if the per-route and total limits allow it.
     * Callers are expected to hold the pool lock.
     *
     * @return {@code true} if an entry was assigned to the request; {@code false} if the
     *         request timed out (in which case it is failed with a
     *         {@link TimeoutException}) or no capacity is currently available.
     */
    private boolean processPendingRequest(final LeaseRequest<T, C> request) {
        final T route = request.getRoute();
        final Object state = request.getState();
        final long deadline = request.getDeadline();

        final long now = System.currentTimeMillis();
        if (now > deadline) {
            request.failed(new TimeoutException());
            return false;
        }

        final RoutePool<T, C> pool = getPool(route);
        PoolEntry<T, C> entry;
        // Look for a free entry, discarding expired ones along the way.
        for (;;) {
            entry = pool.getFree(state);
            if (entry == null) {
                break;
            }
            if (entry.getExpiry() < System.currentTimeMillis()) {
                entry.discardConnection();
                this.available.remove(entry);
                pool.free(entry, false);
            } else {
                break;
            }
        }
        if (entry != null) {
            this.available.remove(entry);
            this.leased.add(entry);
            request.completed(entry);
            if (this.connPoolListener != null) {
                this.connPoolListener.onLease(entry.getRoute(), this);
            }
            return true;
        }

        // New connection is needed
        final int maxPerRoute = getMax(route);
        // Shrink the pool prior to allocating a new connection
        final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
        if (excess > 0) {
            for (int i = 0; i < excess; i++) {
                final PoolEntry<T, C> lastUsed = pool.getLastUsed();
                if (lastUsed == null) {
                    break;
                }
                lastUsed.discardConnection();
                this.available.remove(lastUsed);
                pool.remove(lastUsed);
            }
        }

        if (pool.getAllocatedCount() < maxPerRoute) {
            final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
            if (freeCapacity == 0) {
                return false;
            }
            final int totalAvailable = this.available.size();
            // Evict one available entry (of any route) if the new entry would
            // otherwise push the pool over the total limit.
            if (totalAvailable > freeCapacity - 1) {
                if (!this.available.isEmpty()) {
                    final PoolEntry<T, C> lastUsed = this.available.removeLast();
                    lastUsed.discardConnection();
                    final RoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
                    otherpool.remove(lastUsed);
                }
            }

            entry = pool.createEntry(this.timeToLive, this.timeUnit);
            this.leased.add(entry);
            request.completed(entry);
            if (this.connPoolListener != null) {
                this.connPoolListener.onLease(entry.getRoute(), this);
            }
            return true;
        }
        return false;
    }

    /**
     * Drains {@code completedRequests} and notifies the future of each request: failed
     * if the request carries an exception, completed if it carries a result, cancelled
     * otherwise. If the future was not successfully completed, the request's result is
     * passed to {@link #release(PoolEntry, boolean)} (which ignores {@code null}) so that
     * an assigned entry is not lost.
     * <p>
     * Invoked by the public entry points after the pool lock has been released.
     */
    private void fireCallbacks() {
        LeaseRequest<T, C> request;
        while ((request = this.completedRequests.poll()) != null) {
            final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
            final Exception ex = request.getException();
            final PoolEntry<T, C> result = request.getResult();
            boolean successfullyCompleted = false;
            if (ex != null) {
                future.failed(ex);
            } else if (result != null) {
                if (future.completed(result)) {
                    successfullyCompleted = true;
                }
            } else {
                future.cancel();
            }
            if (!successfullyCompleted) {
                release(result, true);
            }
        }
    }

    /**
     * Fails every pending lease request whose deadline has passed with a
     * {@link TimeoutException} and notifies the corresponding futures.
     */
    public void validatePendingRequests() {
        this.lock.lock();
        try {
            final long now = System.currentTimeMillis();
            final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
            while (it.hasNext()) {
                final LeaseRequest<T, C> request = it.next();
                final long deadline = request.getDeadline();
                if (now > deadline) {
                    it.remove();
                    request.failed(new TimeoutException());
                    this.completedRequests.add(request);
                }
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }

    /**
     * Returns the explicit limit configured for the route, or the default per-route
     * limit if none has been set.
     */
    private int getMax(final T route) {
        final Integer v = this.maxPerRoute.get(route);
        if (v != null) {
            return v.intValue();
        }
        return this.defaultMaxPerRoute;
    }

    /**
     * Sets the total connection limit.
     *
     * @param max new limit; must be positive.
     */
    @Override
    public void setMaxTotal(final int max) {
        Args.positive(max, "Max value");
        this.lock.lock();
        try {
            this.maxTotal = max;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the total connection limit.
     */
    @Override
    public int getMaxTotal() {
        this.lock.lock();
        try {
            return this.maxTotal;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Sets the limit applied to routes without an explicit per-route limit.
     *
     * @param max new limit; must be positive.
     */
    @Override
    public void setDefaultMaxPerRoute(final int max) {
        Args.positive(max, "Max value");
        this.lock.lock();
        try {
            this.defaultMaxPerRoute = max;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the limit applied to routes without an explicit per-route limit.
     */
    @Override
    public int getDefaultMaxPerRoute() {
        this.lock.lock();
        try {
            return this.defaultMaxPerRoute;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Sets an explicit connection limit for the given route.
     *
     * @param route the route; must not be {@code null}.
     * @param max new limit; must be positive.
     */
    @Override
    public void setMaxPerRoute(final T route, final int max) {
        Args.notNull(route, "Route");
        Args.positive(max, "Max value");
        this.lock.lock();
        try {
            this.maxPerRoute.put(route, Integer.valueOf(max));
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @param route the route; must not be {@code null}.
     * @return the explicit limit of the route, or the default per-route limit if none is set.
     */
    @Override
    public int getMaxPerRoute(final T route) {
        Args.notNull(route, "Route");
        this.lock.lock();
        try {
            return getMax(route);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return pool-wide statistics: leased, pending and available counts plus the total limit.
     */
    @Override
    public PoolStats getTotalStats() {
        this.lock.lock();
        try {
            return new PoolStats(
                    this.leased.size(),
                    this.leasingRequests.size(),
                    this.available.size(),
                    this.maxTotal);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns statistics for a single route. The pending count is computed by scanning
     * the pending lease requests for requests with an equal route.
     *
     * @param route the route; must not be {@code null}.
     * @return leased, pending and available counts of the route plus its limit.
     */
    @Override
    public PoolStats getStats(final T route) {
        Args.notNull(route, "Route");
        this.lock.lock();
        try {
            final RoutePool<T, C> pool = getPool(route);
            int pendingCount = 0;
            for (LeaseRequest<T, C> request: leasingRequests) {
                if (LangUtils.equals(route, request.getRoute())) {
                    pendingCount++;
                }
            }
            return new PoolStats(
                    pool.getLeasedCount(),
                    pendingCount,
                    pool.getAvailableCount(),
                    getMax(route));
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns snapshot of all known routes
     *
     * @since 4.4
     */
    public Set<T> getRoutes() {
        this.lock.lock();
        try {
            return new HashSet<>(routeToPool.keySet());
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Enumerates all available connections.
     * <p>
     * The callback is invoked for every available entry while the pool lock is held.
     * Entries that no longer hold a connection after the callback are removed from
     * the pool. Pending lease requests are then re-processed, empty per-route pools
     * are purged and, once the lock has been released, the futures of lease requests
     * completed in the process are notified.
     *
     * @since 4.3
     */
    public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
        this.lock.lock();
        try {
            final Iterator<PoolEntry<T, C>> it = this.available.iterator();
            while (it.hasNext()) {
                final PoolEntry<T, C> entry = it.next();
                callback.execute(entry);
                if (!entry.hasConnection()) {
                    final RoutePool<T, C> pool = getPool(entry.getRoute());
                    pool.remove(entry);
                    it.remove();
                }
            }
            processPendingRequests();
            purgePoolMap();
        } finally {
            this.lock.unlock();
        }
        // processPendingRequests() may have queued completed requests; without this
        // call their futures would stay incomplete until the next lease / release.
        fireCallbacks();
    }

    /**
     * Enumerates all leased connections.
     * <p>
     * The callback is invoked for every leased entry while the pool lock is held.
     * Pending lease requests are then re-processed and, once the lock has been
     * released, the futures of lease requests completed in the process are notified.
     *
     * @since 4.3
     */
    public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
        this.lock.lock();
        try {
            final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
            while (it.hasNext()) {
                final PoolEntry<T, C> entry = it.next();
                callback.execute(entry);
            }
            processPendingRequests();
        } finally {
            this.lock.unlock();
        }
        // See enumAvailable: notify futures of requests completed above.
        fireCallbacks();
    }

    /**
     * Removes per-route pools that have no allocated entries. Callers are expected
     * to hold the pool lock.
     */
    private void purgePoolMap() {
        final Iterator<Map.Entry<T, RoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<T, RoutePool<T, C>> entry = it.next();
            final RoutePool<T, C> pool = entry.getValue();
            if (pool.getAllocatedCount() == 0) {
                it.remove();
            }
        }
    }

    /**
     * Discards the connections of available entries that were last updated at least
     * {@code idletime} ago. Negative idle times are treated as zero.
     *
     * @param idletime maximum idle time.
     * @param tunit unit of {@code idletime}; must not be {@code null}.
     */
    @Override
    public void closeIdle(final long idletime, final TimeUnit tunit) {
        Args.notNull(tunit, "Time unit");
        long time = tunit.toMillis(idletime);
        if (time < 0) {
            time = 0;
        }
        final long deadline = System.currentTimeMillis() - time;
        enumAvailable(new Callback<PoolEntry<T, C>>() {

            @Override
            public void execute(final PoolEntry<T, C> entry) {
                if (entry.getUpdated() <= deadline) {
                    entry.discardConnection();
                }
            }

        });
    }

    /**
     * Discards the connections of available entries whose expiry time lies in the past.
     */
    @Override
    public void closeExpired() {
        final long now = System.currentTimeMillis();
        enumAvailable(new Callback<PoolEntry<T, C>>() {

            @Override
            public void execute(final PoolEntry<T, C> entry) {
                if (entry.getExpiry() < now) {
                    entry.discardConnection();
                }
            }

        });
    }

    /**
     * @return a summary of the leased, available and pending counts. The collections
     *         are read without acquiring the pool lock.
     */
    @Override
    public String toString() {
        final StringBuilder buffer = new StringBuilder();
        buffer.append("[leased: ");
        buffer.append(this.leased.size());
        buffer.append("][available: ");
        buffer.append(this.available.size());
        buffer.append("][pending: ");
        buffer.append(this.leasingRequests.size());
        buffer.append("]");
        return buffer.toString();
    }

}