1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactor;
28
29 import java.util.ArrayDeque;
30 import java.util.HashSet;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.concurrent.FutureContribution;
43 import org.apache.hc.core5.function.Callback;
44 import org.apache.hc.core5.http.ConnectionClosedException;
45 import org.apache.hc.core5.io.CloseMode;
46 import org.apache.hc.core5.io.ModalCloseable;
47 import org.apache.hc.core5.util.Args;
48 import org.apache.hc.core5.util.Asserts;
49 import org.apache.hc.core5.util.TimeValue;
50 import org.apache.hc.core5.util.Timeout;
51
52
53
54
55 @Contract(threading = ThreadingBehavior.SAFE)
56 public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57
58 private final ConcurrentMap<T, PoolEntry> sessionPool;
59 private final AtomicBoolean closed;
60
61 public AbstractIOSessionPool() {
62 super();
63 this.sessionPool = new ConcurrentHashMap<>();
64 this.closed = new AtomicBoolean(false);
65 }
66
67 protected abstract Future<IOSession> connectSession(
68 T namedEndpoint,
69 Timeout connectTimeout,
70 FutureCallback<IOSession> callback);
71
72 protected abstract void validateSession(
73 IOSession ioSession,
74 Callback<Boolean> callback);
75
76 protected abstract void closeSession(
77 IOSession ioSession,
78 CloseMode closeMode);
79
80 @Override
81 public final void close(final CloseMode closeMode) {
82 if (closed.compareAndSet(false, true)) {
83 for (final PoolEntry poolEntry : sessionPool.values()) {
84 synchronized (poolEntry) {
85 if (poolEntry.session != null) {
86 closeSession(poolEntry.session, closeMode);
87 poolEntry.session = null;
88 }
89 if (poolEntry.sessionFuture != null) {
90 poolEntry.sessionFuture.cancel(true);
91 poolEntry.sessionFuture = null;
92 }
93 for (;;) {
94 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95 if (callback != null) {
96 callback.cancelled();
97 } else {
98 break;
99 }
100 }
101 }
102 }
103 sessionPool.clear();
104 }
105 }
106
107 @Override
108 public final void close() {
109 close(CloseMode.GRACEFUL);
110 }
111
112 PoolEntry getPoolEntry(final T endpoint) {
113 PoolEntry poolEntry = sessionPool.get(endpoint);
114 if (poolEntry == null) {
115 final PoolEntry newPoolEntry = new PoolEntry();
116 poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117 if (poolEntry == null) {
118 poolEntry = newPoolEntry;
119 }
120 }
121 return poolEntry;
122 }
123
124 public final Future<IOSession> getSession(
125 final T endpoint,
126 final Timeout connectTimeout,
127 final FutureCallback<IOSession> callback) {
128 Args.notNull(endpoint, "Endpoint");
129 Asserts.check(!closed.get(), "Connection pool shut down");
130 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131 final PoolEntry poolEntry = getPoolEntry(endpoint);
132 getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133
134 @Override
135 public void completed(final IOSession ioSession) {
136 validateSession(ioSession, result -> {
137 if (result) {
138 future.completed(ioSession);
139 } else {
140 getSessionInternal(poolEntry, true, endpoint, connectTimeout,
141 new FutureContribution<IOSession>(future) {
142
143 @Override
144 public void completed(final IOSession ioSession1) {
145 future.completed(ioSession1);
146 }
147
148 });
149 }
150 });
151 }
152
153 @Override
154 public void failed(final Exception ex) {
155 future.failed(ex);
156 }
157
158 @Override
159 public void cancelled() {
160 future.cancel();
161 }
162
163 });
164 return future;
165 }
166
167 private void getSessionInternal(
168 final PoolEntry poolEntry,
169 final boolean requestNew,
170 final T namedEndpoint,
171 final Timeout connectTimeout,
172 final FutureCallback<IOSession> callback) {
173 synchronized (poolEntry) {
174 if (poolEntry.session != null && requestNew) {
175 closeSession(poolEntry.session, CloseMode.GRACEFUL);
176 poolEntry.session = null;
177 }
178 if (poolEntry.session != null && !poolEntry.session.isOpen()) {
179 poolEntry.session = null;
180 }
181 if (poolEntry.session != null) {
182 callback.completed(poolEntry.session);
183 } else {
184 poolEntry.requestQueue.add(callback);
185 if (poolEntry.sessionFuture != null && poolEntry.completed) {
186 poolEntry.sessionFuture = null;
187 }
188 if (poolEntry.sessionFuture == null) {
189 poolEntry.completed = false;
190 poolEntry.sessionFuture = connectSession(
191 namedEndpoint,
192 connectTimeout,
193 new FutureCallback<IOSession>() {
194
195 @Override
196 public void completed(final IOSession result) {
197 synchronized (poolEntry) {
198 poolEntry.completed = true;
199 if (poolEntry.session == null) {
200 poolEntry.session = result;
201 } else {
202 closeSession(result,CloseMode.GRACEFUL);
203 }
204 for (;;) {
205 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
206 if (callback != null) {
207 callback.completed(result);
208 } else {
209 break;
210 }
211 }
212 }
213 }
214
215 @Override
216 public void failed(final Exception ex) {
217 synchronized (poolEntry) {
218 poolEntry.completed = true;
219 poolEntry.session = null;
220 for (;;) {
221 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
222 if (callback != null) {
223 callback.failed(ex);
224 } else {
225 break;
226 }
227 }
228 }
229 }
230
231 @Override
232 public void cancelled() {
233 failed(new ConnectionClosedException("Connection request cancelled"));
234 }
235
236 });
237 }
238 }
239 }
240 }
241
242 public final void enumAvailable(final Callback<IOSession> callback) {
243 for (final PoolEntry poolEntry: sessionPool.values()) {
244 if (poolEntry.session != null) {
245 synchronized (poolEntry) {
246 if (poolEntry.session != null) {
247 callback.execute(poolEntry.session);
248 if (!poolEntry.session.isOpen()) {
249 poolEntry.session = null;
250 }
251 }
252 }
253 }
254 }
255 }
256
257 public final void closeIdle(final TimeValue idleTime) {
258 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
259 for (final PoolEntry poolEntry: sessionPool.values()) {
260 if (poolEntry.session != null) {
261 synchronized (poolEntry) {
262 if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
263 closeSession(poolEntry.session, CloseMode.GRACEFUL);
264 poolEntry.session = null;
265 }
266 }
267 }
268 }
269 }
270
271 public final Set<T> getRoutes() {
272 return new HashSet<>(sessionPool.keySet());
273 }
274
275 @Override
276 public String toString() {
277 final StringBuilder buffer = new StringBuilder();
278 buffer.append("I/O sessions: ");
279 buffer.append(sessionPool.size());
280 return buffer.toString();
281 }
282
283 static class PoolEntry {
284
285 final Queue<FutureCallback<IOSession>> requestQueue;
286 volatile boolean completed;
287 volatile Future<IOSession> sessionFuture;
288 volatile IOSession session;
289
290 PoolEntry() {
291 this.requestQueue = new ArrayDeque<>();
292 }
293
294 }
295
296 }