View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.http.ConnectionClosedException;
45  import org.apache.hc.core5.io.CloseMode;
46  import org.apache.hc.core5.io.ModalCloseable;
47  import org.apache.hc.core5.util.Args;
48  import org.apache.hc.core5.util.Asserts;
49  import org.apache.hc.core5.util.TimeValue;
50  import org.apache.hc.core5.util.Timeout;
51  
52  /**
53   * @since 5.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE)
56  public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57  
58      private final ConcurrentMap<T, PoolEntry> sessionPool;
59      private final AtomicBoolean closed;
60  
61      public AbstractIOSessionPool() {
62          super();
63          this.sessionPool = new ConcurrentHashMap<>();
64          this.closed = new AtomicBoolean(false);
65      }
66  
67      protected abstract Future<IOSession> connectSession(
68              T namedEndpoint,
69              Timeout connectTimeout,
70              FutureCallback<IOSession> callback);
71  
72      protected abstract void validateSession(
73              IOSession ioSession,
74              Callback<Boolean> callback);
75  
76      protected abstract void closeSession(
77              IOSession ioSession,
78              CloseMode closeMode);
79  
80      @Override
81      public final void close(final CloseMode closeMode) {
82          if (closed.compareAndSet(false, true)) {
83              for (final PoolEntry poolEntry : sessionPool.values()) {
84                  synchronized (poolEntry) {
85                      if (poolEntry.session != null) {
86                          closeSession(poolEntry.session, closeMode);
87                          poolEntry.session = null;
88                      }
89                      if (poolEntry.sessionFuture != null) {
90                          poolEntry.sessionFuture.cancel(true);
91                          poolEntry.sessionFuture = null;
92                      }
93                      for (;;) {
94                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95                          if (callback != null) {
96                              callback.cancelled();
97                          } else {
98                              break;
99                          }
100                     }
101                 }
102             }
103             sessionPool.clear();
104         }
105     }
106 
107     @Override
108     public final void close() {
109         close(CloseMode.GRACEFUL);
110     }
111 
112     PoolEntry getPoolEntry(final T endpoint) {
113         PoolEntry poolEntry = sessionPool.get(endpoint);
114         if (poolEntry == null) {
115             final PoolEntry newPoolEntry = new PoolEntry();
116             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117             if (poolEntry == null) {
118                 poolEntry = newPoolEntry;
119             }
120         }
121         return poolEntry;
122     }
123 
124     public final Future<IOSession> getSession(
125             final T endpoint,
126             final Timeout connectTimeout,
127             final FutureCallback<IOSession> callback) {
128         Args.notNull(endpoint, "Endpoint");
129         Asserts.check(!closed.get(), "Connection pool shut down");
130         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131         final PoolEntry poolEntry = getPoolEntry(endpoint);
132         getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133 
134             @Override
135             public void completed(final IOSession ioSession) {
136                 validateSession(ioSession, new Callback<Boolean>() {
137 
138                     @Override
139                     public void execute(final Boolean result) {
140                         if (result) {
141                             future.completed(ioSession);
142                         } else {
143                             getSessionInternal(poolEntry, true, endpoint, connectTimeout,
144                                 new FutureContribution<IOSession>(future) {
145 
146                                 @Override
147                                 public void completed(final IOSession ioSession) {
148                                     future.completed(ioSession);
149                                 }
150 
151                             });
152                         }
153                     }
154 
155                 });
156             }
157 
158             @Override
159             public void failed(final Exception ex) {
160                 future.failed(ex);
161             }
162 
163             @Override
164             public void cancelled() {
165                 future.cancel();
166             }
167 
168         });
169         return future;
170     }
171 
172     private void getSessionInternal(
173             final PoolEntry poolEntry,
174             final boolean requestNew,
175             final T namedEndpoint,
176             final Timeout connectTimeout,
177             final FutureCallback<IOSession> callback) {
178         synchronized (poolEntry) {
179             if (poolEntry.session != null && requestNew) {
180                 closeSession(poolEntry.session, CloseMode.GRACEFUL);
181                 poolEntry.session = null;
182             }
183             if (poolEntry.session != null && !poolEntry.session.isOpen()) {
184                 poolEntry.session = null;
185             }
186             if (poolEntry.session != null) {
187                 callback.completed(poolEntry.session);
188             } else {
189                 poolEntry.requestQueue.add(callback);
190                 if (poolEntry.sessionFuture != null && poolEntry.sessionFuture.isDone()) {
191                     poolEntry.sessionFuture = null;
192                 }
193                 if (poolEntry.sessionFuture == null) {
194                     poolEntry.sessionFuture = connectSession(
195                             namedEndpoint,
196                             connectTimeout,
197                             new FutureCallback<IOSession>() {
198 
199                                 @Override
200                                 public void completed(final IOSession result) {
201                                     synchronized (poolEntry) {
202                                         poolEntry.session = result;
203                                         for (;;) {
204                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
205                                             if (callback != null) {
206                                                 callback.completed(result);
207                                             } else {
208                                                 break;
209                                             }
210                                         }
211                                     }
212                                 }
213 
214                                 @Override
215                                 public void failed(final Exception ex) {
216                                     synchronized (poolEntry) {
217                                         poolEntry.session = null;
218                                         for (;;) {
219                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
220                                             if (callback != null) {
221                                                 callback.failed(ex);
222                                             } else {
223                                                 break;
224                                             }
225                                         }
226                                     }
227                                 }
228 
229                                 @Override
230                                 public void cancelled() {
231                                     failed(new ConnectionClosedException("Connection request cancelled"));
232                                 }
233 
234                             });
235                 }
236             }
237         }
238     }
239 
240     public final void enumAvailable(final Callback<IOSession> callback) {
241         for (final PoolEntry poolEntry: sessionPool.values()) {
242             if (poolEntry.session != null) {
243                 synchronized (poolEntry) {
244                     if (poolEntry.session != null) {
245                         callback.execute(poolEntry.session);
246                         if (!poolEntry.session.isOpen()) {
247                             poolEntry.session = null;
248                         }
249                     }
250                 }
251             }
252         }
253     }
254 
255     public final void closeIdle(final TimeValue idleTime) {
256         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
257         for (final PoolEntry poolEntry: sessionPool.values()) {
258             if (poolEntry.session != null) {
259                 synchronized (poolEntry) {
260                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
261                         closeSession(poolEntry.session, CloseMode.GRACEFUL);
262                         poolEntry.session = null;
263                     }
264                 }
265             }
266         }
267     }
268 
269     public final Set<T> getRoutes() {
270         return new HashSet<>(sessionPool.keySet());
271     }
272 
273     @Override
274     public String toString() {
275         final StringBuilder buffer = new StringBuilder();
276         buffer.append("I/O sessions: ");
277         buffer.append(sessionPool.size());
278         return buffer.toString();
279     }
280 
281     static class PoolEntry {
282 
283         final Queue<FutureCallback<IOSession>> requestQueue;
284         volatile Future<IOSession> sessionFuture;
285         volatile IOSession session;
286 
287         PoolEntry() {
288             this.requestQueue = new ArrayDeque<>();
289         }
290 
291     }
292 
293 }