View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.myfaces.push.cdi;
21  
22  import java.lang.ref.Reference;
23  import java.lang.ref.SoftReference;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.Queue;
28  import java.util.Set;
29  import java.util.WeakHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Future;
32  import java.util.logging.Logger;
33  import javax.faces.context.ExternalContext;
34  import javax.websocket.Session;
35  import org.apache.myfaces.push.WebsocketSessionClusterSerializedRestore;
36  import org.apache.myfaces.push.util.Json;
37  import org.apache.myfaces.shared.util.ClassUtils;
38  import org.apache.myfaces.shared.util.ConcurrentLRUCache;
39  import org.apache.myfaces.shared.util.WebConfigParamUtils;
40  
41  /**
42   *
43   */
44  public final class WebsocketApplicationSessionHolder
45  {
46      
47      public static final String INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS = "org.apache.myfaces.WEBSOCKET_MAX_CONNECTIONS";
48      
49      public static final Integer INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT = 5000;
50      
51      private volatile static WeakHashMap<ClassLoader, ConcurrentLRUCache<String, Reference<Session>>> 
52              clWebsocketMap = new WeakHashMap<ClassLoader, ConcurrentLRUCache<String, Reference<Session>>>();
53      
54      private volatile static WeakHashMap<ClassLoader, Queue<String>> clWebsocketRestoredQueue =
55              new WeakHashMap<ClassLoader, Queue<String>>();
56      
57      /**
58       * 
59       * @return 
60       */
61      public static ConcurrentLRUCache<String, Reference<Session>> getWebsocketSessionLRUCache()
62      {
63          ClassLoader cl = ClassUtils.getContextClassLoader();
64          
65          ConcurrentLRUCache<String, Reference<Session>> metadata = (ConcurrentLRUCache<String, Reference<Session>>)
66                  WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
67  
68          if (metadata == null)
69          {
70              // Ensure thread-safe put over _metadata, and only create one map
71              // per classloader to hold metadata.
72              synchronized (WebsocketApplicationSessionHolder.clWebsocketMap)
73              {
74                  metadata = createWebsocketSessionLRUCache(cl, metadata, INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT);
75              }
76          }
77  
78          return metadata;
79      }
80  
81      /**
82       * 
83       * @param context
84       */
85      public static void initWebsocketSessionLRUCache(ExternalContext context)
86      {
87          ClassLoader cl = ClassUtils.getContextClassLoader();
88          
89          ConcurrentLRUCache<String, Reference<Session>> lruCache = (ConcurrentLRUCache<String, Reference<Session>>)
90                  WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
91  
92          int size = WebConfigParamUtils.getIntegerInitParameter(context, 
93                  INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS, INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT);
94  
95          ConcurrentLRUCache<String, Reference<Session>> newMetadata = 
96                  new ConcurrentLRUCache<String, Reference<Session>>( (size*4+3)/3, size);
97          
98          synchronized (WebsocketApplicationSessionHolder.clWebsocketMap)
99          {
100             if (lruCache == null)
101             {
102                 WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, newMetadata);
103                 lruCache = newMetadata;
104             }
105             else
106             {
107                 // If a Session has been restored, it could be already a lruCache instantiated, so in this case
108                 // we need to fill the new one with the old instances, but only the instances that are active
109                 // at the moment.
110                 for (Map.Entry<String, Reference<Session>> entry : 
111                         lruCache.getLatestAccessedItems(INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT).entrySet())
112                 {
113                     if (entry.getValue() != null && entry.getValue().get() != null && entry.getValue().get().isOpen())
114                     {
115                         newMetadata.put(entry.getKey(), entry.getValue());
116                     }
117                 }
118                 WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, newMetadata);
119                 lruCache = newMetadata;
120             }
121         }
122     }
123 
124     private static ConcurrentLRUCache<String, Reference<Session>> createWebsocketSessionLRUCache(
125             ClassLoader cl, ConcurrentLRUCache<String, Reference<Session>> metadata, int size)
126     {
127         metadata = (ConcurrentLRUCache<String, Reference<Session>>) 
128                 WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
129         if (metadata == null)
130         {
131             metadata = new ConcurrentLRUCache<String, Reference<Session>>( (size*4+3)/3, size);
132             WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, metadata);
133         }
134         return metadata;
135     }
136     
137             
138 
139     /**
140      * Removes the cached MetadataTarget instances in order to prevent a memory leak.
141      */
142     public static void clearWebsocketSessionLRUCache()
143     {
144         clWebsocketMap.remove(ClassUtils.getContextClassLoader());
145         clWebsocketRestoredQueue.remove(ClassUtils.getContextClassLoader());
146     }
147     
148     public static boolean addOrUpdateSession(String channelToken, Session session)
149     {
150         Reference oldInstance = getWebsocketSessionLRUCache().get(channelToken);
151         if (oldInstance == null)
152         {
153             getWebsocketSessionLRUCache().put(channelToken, new SoftReference<Session>(session));
154         }
155         else if (!session.equals(oldInstance.get()))
156         {
157             getWebsocketSessionLRUCache().put(channelToken, new SoftReference<Session>(session));
158         }
159         return true;
160     }
161 
162     /**
163      * Remove the Session associated to the channelToken. This happens when the websocket connection is closed.
164      * Please note the connection can be closed/reopened, so this method should not block another connection using
165      * the same channelToken. To destroy the channel token, WebsocketViewBean is used to destroy the channel token
166      * at view expiration time.
167      * 
168      * @param channelToken
169      * @return 
170      */
171     public static boolean removeSession(String channelToken)
172     {
173         getWebsocketSessionLRUCache().remove(channelToken);
174         return false;
175     }
176     
177     
178     protected static Set<Future<Void>> send(String channelToken, Object message)
179     {
180         // Before send, we need to check 
181         synchronizeSessionInstances();
182             
183         Set< Future<Void> > results = new HashSet< Future<Void> >(1);
184         Reference<Session> sessionRef = (channelToken != null) ? getWebsocketSessionLRUCache().get(channelToken) : null;
185 
186         if (sessionRef != null && sessionRef.get() != null)
187         {
188             String json = Json.encode(message);
189             Session session = sessionRef.get();
190             if (session.isOpen())
191             {
192                 send(session, json, results, 0);
193             }
194             else
195             {
196                 //If session is not open, remove the session, because a websocket session after is closed cannot
197                 //be alive.
198                 getWebsocketSessionLRUCache().remove(channelToken);
199             }
200         }
201         return results;
202     }
203 
204     private static final String WARNING_TOMCAT_WEB_SOCKET_BOMBED =
205             "Tomcat cannot handle concurrent push messages. A push message has been sent only after %s retries."
206             + " Consider rate limiting sending push messages. For example, once every 500ms.";    
207     
208     private static void send(Session session, String text, Set<Future<Void>> results, int retries)
209     {
210         try
211         {
212             results.add(session.getAsyncRemote().sendText(text));
213 
214             if (retries > 0)
215             {
216                 Logger.getLogger(WebsocketApplicationSessionHolder.class.getName())
217                         .warning(String.format(WARNING_TOMCAT_WEB_SOCKET_BOMBED, retries));
218             }
219         }
220         catch (IllegalStateException e)
221         {
222             if (isTomcatWebSocketBombed(session, e))
223             {
224                 synchronized (session)
225                 {
226                     send(session, text, results, retries + 1);
227                 }
228             }
229             else
230             {
231                 throw e;
232             }
233         }
234     }
235     
236     // Tomcat related -------------------------------------------------------------------------------------------------
237     /**
238      * Returns true if the given WS session is from Tomcat and given illegal state exception is caused by a push bomb
239      * which Tomcat couldn't handle. See also https://bz.apache.org/bugzilla/show_bug.cgi?id=56026 and
240      * https://github.com/omnifaces/omnifaces/issues/234
241      *
242      * @param session The WS session.
243      * @param illegalStateException The illegal state exception.
244      * @return Whether it was Tomcat who couldn't handle the push bomb.
245      * @since 2.5
246      */
247     private static boolean isTomcatWebSocketBombed(Session session, IllegalStateException illegalStateException)
248     {
249         return session.getClass().getName().startsWith("org.apache.tomcat.websocket.")
250                 && illegalStateException.getMessage().contains("[TEXT_FULL_WRITING]");
251     }
252     
253     private static void synchronizeSessionInstances()
254     {
255         Queue<String> queue = getRestoredQueue();
256         // The queue is always empty, unless a deserialization of Session instances happen. If that happens, 
257         // we need to ensure all Session instances that were deserialized are on the LRU cache, so all instances
258         // receive the message when a "push" is done.
259         // This is not the ideal, but this is the best we have with the current websocket spec.
260         if (!queue.isEmpty())
261         {
262             // It is necessary to have at least 1 registered Session instance to call getOpenSessions() and get all
263             // instances associated to javax.faces.push Endpoint.
264             Map<String, Reference<Session>> map = getWebsocketSessionLRUCache().getLatestAccessedItems(1);
265             if (map != null && !map.isEmpty())
266             {
267                 Reference<Session> ref = map.values().iterator().next();
268                 if (ref != null)
269                 {
270                     Session s = ref.get();
271                     if (s != null)
272                     {
273                         Set<Session> set = s.getOpenSessions();
274                         
275                         for (Iterator<Session> it = set.iterator(); it.hasNext();)
276                         {
277                             Session instance = it.next();
278                             WebsocketSessionClusterSerializedRestore r = 
279                                     (WebsocketSessionClusterSerializedRestore) instance.getUserProperties().get(
280                                         WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE);
281                             if (r != null && r.isDeserialized())
282                             {
283                                 addOrUpdateSession(r.getChannelToken(), s);
284                             }
285                         }
286                         
287                         // Remove one element from the queue
288                         queue.poll();
289                     }
290                 }
291             }
292         }
293     }
294 
295     public static Queue<String> getRestoredQueue()
296     {
297         ClassLoader cl = ClassUtils.getContextClassLoader();
298         
299         Queue<String> metadata = (Queue<String>)
300                 WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.get(cl);
301 
302         if (metadata == null)
303         {
304             // Ensure thread-safe put over _metadata, and only create one map
305             // per classloader to hold metadata.
306             synchronized (WebsocketApplicationSessionHolder.clWebsocketRestoredQueue)
307             {
308                 metadata = createRestoredQueue(cl, metadata);
309             }
310         }
311 
312         return metadata;
313     }
314     
315     private static Queue<String> createRestoredQueue(ClassLoader cl, Queue<String> metadata)
316     {
317         metadata = (Queue<String>) WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.get(cl);
318         if (metadata == null)
319         {
320             metadata = (Queue<String>) new ConcurrentLinkedQueue<String>();
321             WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.put(cl, metadata);
322         }
323         return metadata;
324     }
325     
326 }