View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.http.ConnectionClosedException;
45  import org.apache.hc.core5.io.CloseMode;
46  import org.apache.hc.core5.io.ModalCloseable;
47  import org.apache.hc.core5.util.Args;
48  import org.apache.hc.core5.util.Asserts;
49  import org.apache.hc.core5.util.TimeValue;
50  import org.apache.hc.core5.util.Timeout;
51  
52  /**
53   * @since 5.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE)
56  public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57  
58      private final ConcurrentMap<T, PoolEntry> sessionPool;
59      private final AtomicBoolean closed;
60  
61      public AbstractIOSessionPool() {
62          super();
63          this.sessionPool = new ConcurrentHashMap<>();
64          this.closed = new AtomicBoolean(false);
65      }
66  
67      protected abstract Future<IOSession> connectSession(
68              T namedEndpoint,
69              Timeout connectTimeout,
70              FutureCallback<IOSession> callback);
71  
72      protected abstract void validateSession(
73              IOSession ioSession,
74              Callback<Boolean> callback);
75  
76      protected abstract void closeSession(
77              IOSession ioSession,
78              CloseMode closeMode);
79  
80      @Override
81      public final void close(final CloseMode closeMode) {
82          if (closed.compareAndSet(false, true)) {
83              for (final PoolEntry poolEntry : sessionPool.values()) {
84                  synchronized (poolEntry) {
85                      if (poolEntry.session != null) {
86                          closeSession(poolEntry.session, closeMode);
87                          poolEntry.session = null;
88                      }
89                      if (poolEntry.sessionFuture != null) {
90                          poolEntry.sessionFuture.cancel(true);
91                          poolEntry.sessionFuture = null;
92                      }
93                      for (;;) {
94                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95                          if (callback != null) {
96                              callback.cancelled();
97                          } else {
98                              break;
99                          }
100                     }
101                 }
102             }
103             sessionPool.clear();
104         }
105     }
106 
107     @Override
108     public final void close() {
109         close(CloseMode.GRACEFUL);
110     }
111 
112     PoolEntry getPoolEntry(final T endpoint) {
113         PoolEntry poolEntry = sessionPool.get(endpoint);
114         if (poolEntry == null) {
115             final PoolEntry newPoolEntry = new PoolEntry();
116             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117             if (poolEntry == null) {
118                 poolEntry = newPoolEntry;
119             }
120         }
121         return poolEntry;
122     }
123 
124     public final Future<IOSession> getSession(
125             final T endpoint,
126             final Timeout connectTimeout,
127             final FutureCallback<IOSession> callback) {
128         Args.notNull(endpoint, "Endpoint");
129         Asserts.check(!closed.get(), "Connection pool shut down");
130         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131         final PoolEntry poolEntry = getPoolEntry(endpoint);
132         getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133 
134             @Override
135             public void completed(final IOSession ioSession) {
136                 validateSession(ioSession, new Callback<Boolean>() {
137 
138                     @Override
139                     public void execute(final Boolean result) {
140                         if (result) {
141                             future.completed(ioSession);
142                         } else {
143                             getSessionInternal(poolEntry, true, endpoint, connectTimeout,
144                                 new FutureContribution<IOSession>(future) {
145 
146                                 @Override
147                                 public void completed(final IOSession ioSession) {
148                                     future.completed(ioSession);
149                                 }
150 
151                             });
152                         }
153                     }
154 
155                 });
156             }
157 
158             @Override
159             public void failed(final Exception ex) {
160                 future.failed(ex);
161             }
162 
163             @Override
164             public void cancelled() {
165                 future.cancel();
166             }
167 
168         });
169         return future;
170     }
171 
172     private void getSessionInternal(
173             final PoolEntry poolEntry,
174             final boolean requestNew,
175             final T namedEndpoint,
176             final Timeout connectTimeout,
177             final FutureCallback<IOSession> callback) {
178         synchronized (poolEntry) {
179             if (poolEntry.session != null && requestNew) {
180                 closeSession(poolEntry.session, CloseMode.GRACEFUL);
181                 poolEntry.session = null;
182             }
183             if (poolEntry.session != null && !poolEntry.session.isOpen()) {
184                 poolEntry.session = null;
185             }
186             if (poolEntry.session != null) {
187                 callback.completed(poolEntry.session);
188             } else {
189                 poolEntry.requestQueue.add(callback);
190                 if (poolEntry.sessionFuture == null) {
191                     poolEntry.sessionFuture = connectSession(
192                             namedEndpoint,
193                             connectTimeout,
194                             new FutureCallback<IOSession>() {
195 
196                                 @Override
197                                 public void completed(final IOSession result) {
198                                     synchronized (poolEntry) {
199                                         poolEntry.session = result;
200                                         poolEntry.sessionFuture = null;
201                                         for (;;) {
202                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
203                                             if (callback != null) {
204                                                 callback.completed(result);
205                                             } else {
206                                                 break;
207                                             }
208                                         }
209                                     }
210                                 }
211 
212                                 @Override
213                                 public void failed(final Exception ex) {
214                                     synchronized (poolEntry) {
215                                         poolEntry.session = null;
216                                         poolEntry.sessionFuture = null;
217                                         for (;;) {
218                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
219                                             if (callback != null) {
220                                                 callback.failed(ex);
221                                             } else {
222                                                 break;
223                                             }
224                                         }
225                                     }
226                                 }
227 
228                                 @Override
229                                 public void cancelled() {
230                                     failed(new ConnectionClosedException("Connection request cancelled"));
231                                 }
232 
233                             });
234                 }
235             }
236         }
237     }
238 
239     public final void enumAvailable(final Callback<IOSession> callback) {
240         for (final PoolEntry poolEntry: sessionPool.values()) {
241             if (poolEntry.session != null) {
242                 synchronized (poolEntry) {
243                     if (poolEntry.session != null) {
244                         callback.execute(poolEntry.session);
245                         if (!poolEntry.session.isOpen()) {
246                             poolEntry.session = null;
247                         }
248                     }
249                 }
250             }
251         }
252     }
253 
254     public final void closeIdle(final TimeValue idleTime) {
255         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
256         for (final PoolEntry poolEntry: sessionPool.values()) {
257             if (poolEntry.session != null) {
258                 synchronized (poolEntry) {
259                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
260                         closeSession(poolEntry.session, CloseMode.GRACEFUL);
261                         poolEntry.session = null;
262                     }
263                 }
264             }
265         }
266     }
267 
268     public final Set<T> getRoutes() {
269         return new HashSet<>(sessionPool.keySet());
270     }
271 
272     @Override
273     public String toString() {
274         final StringBuilder buffer = new StringBuilder();
275         buffer.append("I/O sessions: ");
276         buffer.append(sessionPool.size());
277         return buffer.toString();
278     }
279 
280     static class PoolEntry {
281 
282         final Queue<FutureCallback<IOSession>> requestQueue;
283         volatile Future<IOSession> sessionFuture;
284         volatile IOSession session;
285 
286         PoolEntry() {
287             this.requestQueue = new ArrayDeque<>();
288         }
289 
290     }
291 
292 }