View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.http.ConnectionClosedException;
45  import org.apache.hc.core5.io.CloseMode;
46  import org.apache.hc.core5.io.ModalCloseable;
47  import org.apache.hc.core5.util.Args;
48  import org.apache.hc.core5.util.Asserts;
49  import org.apache.hc.core5.util.TimeValue;
50  import org.apache.hc.core5.util.Timeout;
51  
52  /**
53   * @since 5.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE)
56  public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57  
58      private final ConcurrentMap<T, PoolEntry> sessionPool;
59      private final AtomicBoolean closed;
60  
61      public AbstractIOSessionPool() {
62          super();
63          this.sessionPool = new ConcurrentHashMap<>();
64          this.closed = new AtomicBoolean(false);
65      }
66  
67      protected abstract Future<IOSession> connectSession(
68              T namedEndpoint,
69              Timeout connectTimeout,
70              FutureCallback<IOSession> callback);
71  
72      protected abstract void validateSession(
73              IOSession ioSession,
74              Callback<Boolean> callback);
75  
76      protected abstract void closeSession(
77              IOSession ioSession,
78              CloseMode closeMode);
79  
80      @Override
81      public final void close(final CloseMode closeMode) {
82          if (closed.compareAndSet(false, true)) {
83              for (final PoolEntry poolEntry : sessionPool.values()) {
84                  synchronized (poolEntry) {
85                      if (poolEntry.session != null) {
86                          closeSession(poolEntry.session, closeMode);
87                          poolEntry.session = null;
88                      }
89                      if (poolEntry.sessionFuture != null) {
90                          poolEntry.sessionFuture.cancel(true);
91                          poolEntry.sessionFuture = null;
92                      }
93                      for (;;) {
94                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95                          if (callback != null) {
96                              callback.cancelled();
97                          } else {
98                              break;
99                          }
100                     }
101                 }
102             }
103             sessionPool.clear();
104         }
105     }
106 
107     @Override
108     public final void close() {
109         close(CloseMode.GRACEFUL);
110     }
111 
112     PoolEntry getPoolEntry(final T endpoint) {
113         PoolEntry poolEntry = sessionPool.get(endpoint);
114         if (poolEntry == null) {
115             final PoolEntry newPoolEntry = new PoolEntry();
116             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117             if (poolEntry == null) {
118                 poolEntry = newPoolEntry;
119             }
120         }
121         return poolEntry;
122     }
123 
124     public final Future<IOSession> getSession(
125             final T endpoint,
126             final Timeout connectTimeout,
127             final FutureCallback<IOSession> callback) {
128         Args.notNull(endpoint, "Endpoint");
129         Asserts.check(!closed.get(), "Connection pool shut down");
130         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131         final PoolEntry poolEntry = getPoolEntry(endpoint);
132         getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133 
134             @Override
135             public void completed(final IOSession ioSession) {
136                 validateSession(ioSession, result -> {
137                     if (result) {
138                         future.completed(ioSession);
139                     } else {
140                         getSessionInternal(poolEntry, true, endpoint, connectTimeout,
141                             new FutureContribution<IOSession>(future) {
142 
143                             @Override
144                             public void completed(final IOSession ioSession1) {
145                                 future.completed(ioSession1);
146                             }
147 
148                         });
149                     }
150                 });
151             }
152 
153             @Override
154             public void failed(final Exception ex) {
155                 future.failed(ex);
156             }
157 
158             @Override
159             public void cancelled() {
160                 future.cancel();
161             }
162 
163         });
164         return future;
165     }
166 
167     private void getSessionInternal(
168             final PoolEntry poolEntry,
169             final boolean requestNew,
170             final T namedEndpoint,
171             final Timeout connectTimeout,
172             final FutureCallback<IOSession> callback) {
173         synchronized (poolEntry) {
174             if (poolEntry.session != null && requestNew) {
175                 closeSession(poolEntry.session, CloseMode.GRACEFUL);
176                 poolEntry.session = null;
177             }
178             if (poolEntry.session != null && !poolEntry.session.isOpen()) {
179                 poolEntry.session = null;
180             }
181             if (poolEntry.session != null) {
182                 callback.completed(poolEntry.session);
183             } else {
184                 poolEntry.requestQueue.add(callback);
185                 if (poolEntry.sessionFuture != null && poolEntry.sessionFuture.isDone()) {
186                     poolEntry.sessionFuture = null;
187                 }
188                 if (poolEntry.sessionFuture == null) {
189                     poolEntry.sessionFuture = connectSession(
190                             namedEndpoint,
191                             connectTimeout,
192                             new FutureCallback<IOSession>() {
193 
194                                 @Override
195                                 public void completed(final IOSession result) {
196                                     synchronized (poolEntry) {
197                                         poolEntry.session = result;
198                                         for (;;) {
199                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
200                                             if (callback != null) {
201                                                 callback.completed(result);
202                                             } else {
203                                                 break;
204                                             }
205                                         }
206                                     }
207                                 }
208 
209                                 @Override
210                                 public void failed(final Exception ex) {
211                                     synchronized (poolEntry) {
212                                         poolEntry.session = null;
213                                         for (;;) {
214                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
215                                             if (callback != null) {
216                                                 callback.failed(ex);
217                                             } else {
218                                                 break;
219                                             }
220                                         }
221                                     }
222                                 }
223 
224                                 @Override
225                                 public void cancelled() {
226                                     failed(new ConnectionClosedException("Connection request cancelled"));
227                                 }
228 
229                             });
230                 }
231             }
232         }
233     }
234 
235     public final void enumAvailable(final Callback<IOSession> callback) {
236         for (final PoolEntry poolEntry: sessionPool.values()) {
237             if (poolEntry.session != null) {
238                 synchronized (poolEntry) {
239                     if (poolEntry.session != null) {
240                         callback.execute(poolEntry.session);
241                         if (!poolEntry.session.isOpen()) {
242                             poolEntry.session = null;
243                         }
244                     }
245                 }
246             }
247         }
248     }
249 
250     public final void closeIdle(final TimeValue idleTime) {
251         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
252         for (final PoolEntry poolEntry: sessionPool.values()) {
253             if (poolEntry.session != null) {
254                 synchronized (poolEntry) {
255                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
256                         closeSession(poolEntry.session, CloseMode.GRACEFUL);
257                         poolEntry.session = null;
258                     }
259                 }
260             }
261         }
262     }
263 
264     public final Set<T> getRoutes() {
265         return new HashSet<>(sessionPool.keySet());
266     }
267 
268     @Override
269     public String toString() {
270         final StringBuilder buffer = new StringBuilder();
271         buffer.append("I/O sessions: ");
272         buffer.append(sessionPool.size());
273         return buffer.toString();
274     }
275 
276     static class PoolEntry {
277 
278         final Queue<FutureCallback<IOSession>> requestQueue;
279         volatile Future<IOSession> sessionFuture;
280         volatile IOSession session;
281 
282         PoolEntry() {
283             this.requestQueue = new ArrayDeque<>();
284         }
285 
286     }
287 
288 }