1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactor;
28
29 import java.util.ArrayDeque;
30 import java.util.HashSet;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.concurrent.FutureContribution;
43 import org.apache.hc.core5.function.Callback;
44 import org.apache.hc.core5.http.ConnectionClosedException;
45 import org.apache.hc.core5.io.CloseMode;
46 import org.apache.hc.core5.io.ModalCloseable;
47 import org.apache.hc.core5.util.Args;
48 import org.apache.hc.core5.util.Asserts;
49 import org.apache.hc.core5.util.TimeValue;
50 import org.apache.hc.core5.util.Timeout;
51
52
53
54
55 @Contract(threading = ThreadingBehavior.SAFE)
56 public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57
58 private final ConcurrentMap<T, PoolEntry> sessionPool;
59 private final AtomicBoolean closed;
60
61 public AbstractIOSessionPool() {
62 super();
63 this.sessionPool = new ConcurrentHashMap<>();
64 this.closed = new AtomicBoolean(false);
65 }
66
67 protected abstract Future<IOSession> connectSession(
68 T namedEndpoint,
69 Timeout connectTimeout,
70 FutureCallback<IOSession> callback);
71
72 protected abstract void validateSession(
73 IOSession ioSession,
74 Callback<Boolean> callback);
75
76 protected abstract void closeSession(
77 IOSession ioSession,
78 CloseMode closeMode);
79
80 @Override
81 public final void close(final CloseMode closeMode) {
82 if (closed.compareAndSet(false, true)) {
83 for (final PoolEntry poolEntry : sessionPool.values()) {
84 synchronized (poolEntry) {
85 if (poolEntry.session != null) {
86 closeSession(poolEntry.session, closeMode);
87 poolEntry.session = null;
88 }
89 if (poolEntry.sessionFuture != null) {
90 poolEntry.sessionFuture.cancel(true);
91 poolEntry.sessionFuture = null;
92 }
93 for (;;) {
94 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95 if (callback != null) {
96 callback.cancelled();
97 } else {
98 break;
99 }
100 }
101 }
102 }
103 sessionPool.clear();
104 }
105 }
106
107 @Override
108 public final void close() {
109 close(CloseMode.GRACEFUL);
110 }
111
112 PoolEntry getPoolEntry(final T endpoint) {
113 PoolEntry poolEntry = sessionPool.get(endpoint);
114 if (poolEntry == null) {
115 final PoolEntry newPoolEntry = new PoolEntry();
116 poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117 if (poolEntry == null) {
118 poolEntry = newPoolEntry;
119 }
120 }
121 return poolEntry;
122 }
123
124 public final Future<IOSession> getSession(
125 final T endpoint,
126 final Timeout connectTimeout,
127 final FutureCallback<IOSession> callback) {
128 Args.notNull(endpoint, "Endpoint");
129 Asserts.check(!closed.get(), "Connection pool shut down");
130 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131 final PoolEntry poolEntry = getPoolEntry(endpoint);
132 getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133
134 @Override
135 public void completed(final IOSession ioSession) {
136 validateSession(ioSession, new Callback<Boolean>() {
137
138 @Override
139 public void execute(final Boolean result) {
140 if (result) {
141 future.completed(ioSession);
142 } else {
143 getSessionInternal(poolEntry, true, endpoint, connectTimeout,
144 new FutureContribution<IOSession>(future) {
145
146 @Override
147 public void completed(final IOSession ioSession) {
148 future.completed(ioSession);
149 }
150
151 });
152 }
153 }
154
155 });
156 }
157
158 @Override
159 public void failed(final Exception ex) {
160 future.failed(ex);
161 }
162
163 @Override
164 public void cancelled() {
165 future.cancel();
166 }
167
168 });
169 return future;
170 }
171
172 private void getSessionInternal(
173 final PoolEntry poolEntry,
174 final boolean requestNew,
175 final T namedEndpoint,
176 final Timeout connectTimeout,
177 final FutureCallback<IOSession> callback) {
178 synchronized (poolEntry) {
179 if (poolEntry.session != null && requestNew) {
180 closeSession(poolEntry.session, CloseMode.GRACEFUL);
181 poolEntry.session = null;
182 }
183 if (poolEntry.session != null && !poolEntry.session.isOpen()) {
184 poolEntry.session = null;
185 }
186 if (poolEntry.session != null) {
187 callback.completed(poolEntry.session);
188 } else {
189 poolEntry.requestQueue.add(callback);
190 if (poolEntry.sessionFuture == null) {
191 poolEntry.sessionFuture = connectSession(
192 namedEndpoint,
193 connectTimeout,
194 new FutureCallback<IOSession>() {
195
196 @Override
197 public void completed(final IOSession result) {
198 synchronized (poolEntry) {
199 poolEntry.session = result;
200 poolEntry.sessionFuture = null;
201 for (;;) {
202 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
203 if (callback != null) {
204 callback.completed(result);
205 } else {
206 break;
207 }
208 }
209 }
210 }
211
212 @Override
213 public void failed(final Exception ex) {
214 synchronized (poolEntry) {
215 poolEntry.session = null;
216 poolEntry.sessionFuture = null;
217 for (;;) {
218 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
219 if (callback != null) {
220 callback.failed(ex);
221 } else {
222 break;
223 }
224 }
225 }
226 }
227
228 @Override
229 public void cancelled() {
230 failed(new ConnectionClosedException("Connection request cancelled"));
231 }
232
233 });
234 }
235 }
236 }
237 }
238
239 public final void enumAvailable(final Callback<IOSession> callback) {
240 for (final PoolEntry poolEntry: sessionPool.values()) {
241 if (poolEntry.session != null) {
242 synchronized (poolEntry) {
243 if (poolEntry.session != null) {
244 callback.execute(poolEntry.session);
245 if (!poolEntry.session.isOpen()) {
246 poolEntry.session = null;
247 }
248 }
249 }
250 }
251 }
252 }
253
254 public final void closeIdle(final TimeValue idleTime) {
255 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
256 for (final PoolEntry poolEntry: sessionPool.values()) {
257 if (poolEntry.session != null) {
258 synchronized (poolEntry) {
259 if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
260 closeSession(poolEntry.session, CloseMode.GRACEFUL);
261 poolEntry.session = null;
262 }
263 }
264 }
265 }
266 }
267
268 public final Set<T> getRoutes() {
269 return new HashSet<>(sessionPool.keySet());
270 }
271
272 @Override
273 public String toString() {
274 final StringBuilder buffer = new StringBuilder();
275 buffer.append("I/O sessions: ");
276 buffer.append(sessionPool.size());
277 return buffer.toString();
278 }
279
280 static class PoolEntry {
281
282 final Queue<FutureCallback<IOSession>> requestQueue;
283 volatile Future<IOSession> sessionFuture;
284 volatile IOSession session;
285
286 PoolEntry() {
287 this.requestQueue = new ArrayDeque<>();
288 }
289
290 }
291
292 }