/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.pool;
import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.LangUtils;
/**
* Connection pool with strict max connection limit guarantees.
*
* @param route
* @param connection object
*
* @since 4.2
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class StrictConnPool implements ControlledConnPool {
private final long timeToLive;
private final TimeUnit timeUnit;
private final ConnPoolListener connPoolListener;
private final ConnPoolPolicy policy;
private final Map> routeToPool;
private final LinkedList> leasingRequests;
private final Set> leased;
private final LinkedList> available;
private final ConcurrentLinkedQueue> completedRequests;
private final Map maxPerRoute;
private final Lock lock;
private final AtomicBoolean isShutDown;
private volatile int defaultMaxPerRoute;
private volatile int maxTotal;
/**
* @since 5.0
*/
public StrictConnPool(
final int defaultMaxPerRoute,
final int maxTotal,
final long timeToLive,
final TimeUnit timeUnit,
final ConnPoolPolicy policy,
final ConnPoolListener connPoolListener) {
super();
Args.positive(defaultMaxPerRoute, "Max per route value");
Args.positive(maxTotal, "Max total value");
this.timeToLive = timeToLive;
this.timeUnit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
this.connPoolListener = connPoolListener;
this.policy = policy != null ? policy : ConnPoolPolicy.LIFO;
this.routeToPool = new HashMap<>();
this.leasingRequests = new LinkedList<>();
this.leased = new HashSet<>();
this.available = new LinkedList<>();
this.completedRequests = new ConcurrentLinkedQueue<>();
this.maxPerRoute = new HashMap<>();
this.lock = new ReentrantLock();
this.isShutDown = new AtomicBoolean(false);
this.defaultMaxPerRoute = defaultMaxPerRoute;
this.maxTotal = maxTotal;
}
public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
this(defaultMaxPerRoute, maxTotal, 0, TimeUnit.MILLISECONDS, ConnPoolPolicy.LIFO, null);
}
public boolean isShutdown() {
return this.isShutDown.get();
}
@Override
public void shutdown() {
if (this.isShutDown.compareAndSet(false, true)) {
fireCallbacks();
this.lock.lock();
try {
for (final RoutePool pool: this.routeToPool.values()) {
pool.shutdown();
}
this.routeToPool.clear();
this.leased.clear();
this.available.clear();
this.leasingRequests.clear();
} finally {
this.lock.unlock();
}
}
}
@Override
public void close() throws Exception {
shutdown();
}
private RoutePool getPool(final T route) {
RoutePool pool = this.routeToPool.get(route);
if (pool == null) {
pool = new RoutePool<>(route);
this.routeToPool.put(route, pool);
}
return pool;
}
public Future> lease(
final T route, final Object state,
final long leaseTimeout, final TimeUnit tunit,
final FutureCallback> callback) {
Args.notNull(route, "Route");
Args.notNull(tunit, "Time unit");
Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
final BasicFuture> future = new BasicFuture<>(callback);
this.lock.lock();
try {
final LeaseRequest request = new LeaseRequest<>(route, state, leaseTimeout, future);
final boolean completed = processPendingRequest(request);
if (!request.isDone() && !completed) {
this.leasingRequests.add(request);
}
if (request.isDone()) {
this.completedRequests.add(request);
}
} finally {
this.lock.unlock();
}
fireCallbacks();
return future;
}
@Override
public Future> lease(final T route, final Object state, final FutureCallback> callback) {
return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
}
public Future> lease(final T route, final Object state) {
return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
}
@Override
public void release(final PoolEntry entry, final boolean reusable) {
if (entry == null) {
return;
}
if (this.isShutDown.get()) {
return;
}
if (!reusable) {
entry.discardConnection();
}
this.lock.lock();
try {
if (this.leased.remove(entry)) {
final RoutePool pool = getPool(entry.getRoute());
final boolean keepAlive = entry.hasConnection() && reusable;
pool.free(entry, keepAlive);
if (keepAlive) {
switch (policy) {
case LIFO:
this.available.addFirst(entry);
break;
case FIFO:
this.available.addLast(entry);
break;
default:
throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
}
if (this.connPoolListener != null) {
this.connPoolListener.onRelease(entry.getRoute(), this);
}
} else {
entry.discardConnection();
}
processNextPendingRequest();
}
} finally {
this.lock.unlock();
}
fireCallbacks();
}
private void processPendingRequests() {
final ListIterator> it = this.leasingRequests.listIterator();
while (it.hasNext()) {
final LeaseRequest request = it.next();
final BasicFuture> future = request.getFuture();
if (future.isCancelled()) {
it.remove();
continue;
}
final boolean completed = processPendingRequest(request);
if (request.isDone() || completed) {
it.remove();
}
if (request.isDone()) {
this.completedRequests.add(request);
}
}
}
private void processNextPendingRequest() {
final ListIterator> it = this.leasingRequests.listIterator();
while (it.hasNext()) {
final LeaseRequest request = it.next();
final BasicFuture> future = request.getFuture();
if (future.isCancelled()) {
it.remove();
continue;
}
final boolean completed = processPendingRequest(request);
if (request.isDone() || completed) {
it.remove();
}
if (request.isDone()) {
this.completedRequests.add(request);
}
if (completed) {
return;
}
}
}
private boolean processPendingRequest(final LeaseRequest request) {
final T route = request.getRoute();
final Object state = request.getState();
final long deadline = request.getDeadline();
final long now = System.currentTimeMillis();
if (now > deadline) {
request.failed(new TimeoutException());
return false;
}
final RoutePool pool = getPool(route);
PoolEntry entry;
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.getExpiry() < System.currentTimeMillis()) {
entry.discardConnection();
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
request.completed(entry);
if (this.connPoolListener != null) {
this.connPoolListener.onLease(entry.getRoute(), this);
}
return true;
}
// New connection is needed
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final PoolEntry lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.discardConnection();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
if (freeCapacity == 0) {
return false;
}
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final PoolEntry lastUsed = this.available.removeLast();
lastUsed.discardConnection();
final RoutePool otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
entry = pool.createEntry(this.timeToLive, this.timeUnit);
this.leased.add(entry);
request.completed(entry);
if (this.connPoolListener != null) {
this.connPoolListener.onLease(entry.getRoute(), this);
}
return true;
}
return false;
}
private void fireCallbacks() {
LeaseRequest request;
while ((request = this.completedRequests.poll()) != null) {
final BasicFuture> future = request.getFuture();
final Exception ex = request.getException();
final PoolEntry result = request.getResult();
boolean successfullyCompleted = false;
if (ex != null) {
future.failed(ex);
} else if (result != null) {
if (future.completed(result)) {
successfullyCompleted = true;
}
} else {
future.cancel();
}
if (!successfullyCompleted) {
release(result, true);
}
}
}
public void validatePendingRequests() {
this.lock.lock();
try {
final long now = System.currentTimeMillis();
final ListIterator> it = this.leasingRequests.listIterator();
while (it.hasNext()) {
final LeaseRequest request = it.next();
final long deadline = request.getDeadline();
if (now > deadline) {
it.remove();
request.failed(new TimeoutException());
this.completedRequests.add(request);
}
}
} finally {
this.lock.unlock();
}
fireCallbacks();
}
private int getMax(final T route) {
final Integer v = this.maxPerRoute.get(route);
if (v != null) {
return v.intValue();
}
return this.defaultMaxPerRoute;
}
@Override
public void setMaxTotal(final int max) {
Args.positive(max, "Max value");
this.lock.lock();
try {
this.maxTotal = max;
} finally {
this.lock.unlock();
}
}
@Override
public int getMaxTotal() {
this.lock.lock();
try {
return this.maxTotal;
} finally {
this.lock.unlock();
}
}
@Override
public void setDefaultMaxPerRoute(final int max) {
Args.positive(max, "Max value");
this.lock.lock();
try {
this.defaultMaxPerRoute = max;
} finally {
this.lock.unlock();
}
}
@Override
public int getDefaultMaxPerRoute() {
this.lock.lock();
try {
return this.defaultMaxPerRoute;
} finally {
this.lock.unlock();
}
}
@Override
public void setMaxPerRoute(final T route, final int max) {
Args.notNull(route, "Route");
Args.positive(max, "Max value");
this.lock.lock();
try {
this.maxPerRoute.put(route, Integer.valueOf(max));
} finally {
this.lock.unlock();
}
}
@Override
public int getMaxPerRoute(final T route) {
Args.notNull(route, "Route");
this.lock.lock();
try {
return getMax(route);
} finally {
this.lock.unlock();
}
}
@Override
public PoolStats getTotalStats() {
this.lock.lock();
try {
return new PoolStats(
this.leased.size(),
this.leasingRequests.size(),
this.available.size(),
this.maxTotal);
} finally {
this.lock.unlock();
}
}
@Override
public PoolStats getStats(final T route) {
Args.notNull(route, "Route");
this.lock.lock();
try {
final RoutePool pool = getPool(route);
int pendingCount = 0;
for (LeaseRequest request: leasingRequests) {
if (LangUtils.equals(route, request.getRoute())) {
pendingCount++;
}
}
return new PoolStats(
pool.getLeasedCount(),
pendingCount,
pool.getAvailableCount(),
getMax(route));
} finally {
this.lock.unlock();
}
}
/**
* Returns snapshot of all knows routes
*
* @since 4.4
*/
public Set getRoutes() {
this.lock.lock();
try {
return new HashSet<>(routeToPool.keySet());
} finally {
this.lock.unlock();
}
}
/**
* Enumerates all available connections.
*
* @since 4.3
*/
public void enumAvailable(final Callback> callback) {
this.lock.lock();
try {
final Iterator> it = this.available.iterator();
while (it.hasNext()) {
final PoolEntry entry = it.next();
callback.execute(entry);
if (!entry.hasConnection()) {
final RoutePool pool = getPool(entry.getRoute());
pool.remove(entry);
it.remove();
}
}
processPendingRequests();
purgePoolMap();
} finally {
this.lock.unlock();
}
}
/**
* Enumerates all leased connections.
*
* @since 4.3
*/
public void enumLeased(final Callback> callback) {
this.lock.lock();
try {
final Iterator> it = this.leased.iterator();
while (it.hasNext()) {
final PoolEntry entry = it.next();
callback.execute(entry);
}
processPendingRequests();
} finally {
this.lock.unlock();
}
}
private void purgePoolMap() {
final Iterator>> it = this.routeToPool.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry> entry = it.next();
final RoutePool pool = entry.getValue();
if (pool.getAllocatedCount() == 0) {
it.remove();
}
}
}
@Override
public void closeIdle(final long idletime, final TimeUnit tunit) {
Args.notNull(tunit, "Time unit");
long time = tunit.toMillis(idletime);
if (time < 0) {
time = 0;
}
final long deadline = System.currentTimeMillis() - time;
enumAvailable(new Callback>() {
@Override
public void execute(final PoolEntry entry) {
if (entry.getUpdated() <= deadline) {
entry.discardConnection();
}
}
});
}
@Override
public void closeExpired() {
final long now = System.currentTimeMillis();
enumAvailable(new Callback>() {
@Override
public void execute(final PoolEntry entry) {
if (entry.getExpiry() < now) {
entry.discardConnection();
}
}
});
}
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder();
buffer.append("[leased: ");
buffer.append(this.leased.size());
buffer.append("][available: ");
buffer.append(this.available.size());
buffer.append("][pending: ");
buffer.append(this.leasingRequests.size());
buffer.append("]");
return buffer.toString();
}
}