1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.myfaces.push.cdi;
21
22 import java.lang.ref.Reference;
23 import java.lang.ref.SoftReference;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.WeakHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Future;
32 import java.util.logging.Logger;
33 import javax.faces.context.ExternalContext;
34 import javax.websocket.Session;
35 import org.apache.myfaces.push.WebsocketSessionClusterSerializedRestore;
36 import org.apache.myfaces.push.util.Json;
37 import org.apache.myfaces.shared.util.ClassUtils;
38 import org.apache.myfaces.shared.util.ConcurrentLRUCache;
39 import org.apache.myfaces.shared.util.WebConfigParamUtils;
40
41
42
43
44 public final class WebsocketApplicationSessionHolder
45 {
46
47 public static final String INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS = "org.apache.myfaces.WEBSOCKET_MAX_CONNECTIONS";
48
49 public static final Integer INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT = 5000;
50
51 private volatile static WeakHashMap<ClassLoader, ConcurrentLRUCache<String, Reference<Session>>>
52 clWebsocketMap = new WeakHashMap<ClassLoader, ConcurrentLRUCache<String, Reference<Session>>>();
53
54 private volatile static WeakHashMap<ClassLoader, Queue<String>> clWebsocketRestoredQueue =
55 new WeakHashMap<ClassLoader, Queue<String>>();
56
57
58
59
60
61 public static ConcurrentLRUCache<String, Reference<Session>> getWebsocketSessionLRUCache()
62 {
63 ClassLoader cl = ClassUtils.getContextClassLoader();
64
65 ConcurrentLRUCache<String, Reference<Session>> metadata = (ConcurrentLRUCache<String, Reference<Session>>)
66 WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
67
68 if (metadata == null)
69 {
70
71
72 synchronized (WebsocketApplicationSessionHolder.clWebsocketMap)
73 {
74 metadata = createWebsocketSessionLRUCache(cl, metadata, INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT);
75 }
76 }
77
78 return metadata;
79 }
80
81
82
83
84
85 public static void initWebsocketSessionLRUCache(ExternalContext context)
86 {
87 ClassLoader cl = ClassUtils.getContextClassLoader();
88
89 ConcurrentLRUCache<String, Reference<Session>> lruCache = (ConcurrentLRUCache<String, Reference<Session>>)
90 WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
91
92 int size = WebConfigParamUtils.getIntegerInitParameter(context,
93 INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS, INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT);
94
95 ConcurrentLRUCache<String, Reference<Session>> newMetadata =
96 new ConcurrentLRUCache<String, Reference<Session>>( (size*4+3)/3, size);
97
98 synchronized (WebsocketApplicationSessionHolder.clWebsocketMap)
99 {
100 if (lruCache == null)
101 {
102 WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, newMetadata);
103 lruCache = newMetadata;
104 }
105 else
106 {
107
108
109
110 for (Map.Entry<String, Reference<Session>> entry :
111 lruCache.getLatestAccessedItems(INIT_PARAM_WEBSOCKET_MAX_CONNECTIONS_DEFAULT).entrySet())
112 {
113 if (entry.getValue() != null && entry.getValue().get() != null && entry.getValue().get().isOpen())
114 {
115 newMetadata.put(entry.getKey(), entry.getValue());
116 }
117 }
118 WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, newMetadata);
119 lruCache = newMetadata;
120 }
121 }
122 }
123
124 private static ConcurrentLRUCache<String, Reference<Session>> createWebsocketSessionLRUCache(
125 ClassLoader cl, ConcurrentLRUCache<String, Reference<Session>> metadata, int size)
126 {
127 metadata = (ConcurrentLRUCache<String, Reference<Session>>)
128 WebsocketApplicationSessionHolder.clWebsocketMap.get(cl);
129 if (metadata == null)
130 {
131 metadata = new ConcurrentLRUCache<String, Reference<Session>>( (size*4+3)/3, size);
132 WebsocketApplicationSessionHolder.clWebsocketMap.put(cl, metadata);
133 }
134 return metadata;
135 }
136
137
138
139
140
141
142 public static void clearWebsocketSessionLRUCache()
143 {
144 clWebsocketMap.remove(ClassUtils.getContextClassLoader());
145 clWebsocketRestoredQueue.remove(ClassUtils.getContextClassLoader());
146 }
147
148 public static boolean addOrUpdateSession(String channelToken, Session session)
149 {
150 Reference oldInstance = getWebsocketSessionLRUCache().get(channelToken);
151 if (oldInstance == null)
152 {
153 getWebsocketSessionLRUCache().put(channelToken, new SoftReference<Session>(session));
154 }
155 else if (!session.equals(oldInstance.get()))
156 {
157 getWebsocketSessionLRUCache().put(channelToken, new SoftReference<Session>(session));
158 }
159 return true;
160 }
161
162
163
164
165
166
167
168
169
170
171 public static boolean removeSession(String channelToken)
172 {
173 getWebsocketSessionLRUCache().remove(channelToken);
174 return false;
175 }
176
177
178 protected static Set<Future<Void>> send(String channelToken, Object message)
179 {
180
181 synchronizeSessionInstances();
182
183 Set< Future<Void> > results = new HashSet< Future<Void> >(1);
184 Reference<Session> sessionRef = (channelToken != null) ? getWebsocketSessionLRUCache().get(channelToken) : null;
185
186 if (sessionRef != null && sessionRef.get() != null)
187 {
188 String json = Json.encode(message);
189 Session session = sessionRef.get();
190 if (session.isOpen())
191 {
192 send(session, json, results, 0);
193 }
194 else
195 {
196
197
198 getWebsocketSessionLRUCache().remove(channelToken);
199 }
200 }
201 return results;
202 }
203
204 private static final String WARNING_TOMCAT_WEB_SOCKET_BOMBED =
205 "Tomcat cannot handle concurrent push messages. A push message has been sent only after %s retries."
206 + " Consider rate limiting sending push messages. For example, once every 500ms.";
207
208 private static void send(Session session, String text, Set<Future<Void>> results, int retries)
209 {
210 try
211 {
212 results.add(session.getAsyncRemote().sendText(text));
213
214 if (retries > 0)
215 {
216 Logger.getLogger(WebsocketApplicationSessionHolder.class.getName())
217 .warning(String.format(WARNING_TOMCAT_WEB_SOCKET_BOMBED, retries));
218 }
219 }
220 catch (IllegalStateException e)
221 {
222 if (isTomcatWebSocketBombed(session, e))
223 {
224 synchronized (session)
225 {
226 send(session, text, results, retries + 1);
227 }
228 }
229 else
230 {
231 throw e;
232 }
233 }
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247 private static boolean isTomcatWebSocketBombed(Session session, IllegalStateException illegalStateException)
248 {
249 return session.getClass().getName().startsWith("org.apache.tomcat.websocket.")
250 && illegalStateException.getMessage().contains("[TEXT_FULL_WRITING]");
251 }
252
253 private static void synchronizeSessionInstances()
254 {
255 Queue<String> queue = getRestoredQueue();
256
257
258
259
260 if (!queue.isEmpty())
261 {
262
263
264 Map<String, Reference<Session>> map = getWebsocketSessionLRUCache().getLatestAccessedItems(1);
265 if (map != null && !map.isEmpty())
266 {
267 Reference<Session> ref = map.values().iterator().next();
268 if (ref != null)
269 {
270 Session s = ref.get();
271 if (s != null)
272 {
273 Set<Session> set = s.getOpenSessions();
274
275 for (Iterator<Session> it = set.iterator(); it.hasNext();)
276 {
277 Session instance = it.next();
278 WebsocketSessionClusterSerializedRestore r =
279 (WebsocketSessionClusterSerializedRestore) instance.getUserProperties().get(
280 WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE);
281 if (r != null && r.isDeserialized())
282 {
283 addOrUpdateSession(r.getChannelToken(), s);
284 }
285 }
286
287
288 queue.poll();
289 }
290 }
291 }
292 }
293 }
294
295 public static Queue<String> getRestoredQueue()
296 {
297 ClassLoader cl = ClassUtils.getContextClassLoader();
298
299 Queue<String> metadata = (Queue<String>)
300 WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.get(cl);
301
302 if (metadata == null)
303 {
304
305
306 synchronized (WebsocketApplicationSessionHolder.clWebsocketRestoredQueue)
307 {
308 metadata = createRestoredQueue(cl, metadata);
309 }
310 }
311
312 return metadata;
313 }
314
315 private static Queue<String> createRestoredQueue(ClassLoader cl, Queue<String> metadata)
316 {
317 metadata = (Queue<String>) WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.get(cl);
318 if (metadata == null)
319 {
320 metadata = (Queue<String>) new ConcurrentLinkedQueue<String>();
321 WebsocketApplicationSessionHolder.clWebsocketRestoredQueue.put(cl, metadata);
322 }
323 return metadata;
324 }
325
326 }