1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.keepalive;
21
22 import org.apache.mina.core.filterchain.IoFilter;
23 import org.apache.mina.core.filterchain.IoFilterAdapter;
24 import org.apache.mina.core.filterchain.IoFilterChain;
25 import org.apache.mina.core.service.IoHandler;
26 import org.apache.mina.core.session.AttributeKey;
27 import org.apache.mina.core.session.IdleStatus;
28 import org.apache.mina.core.session.IoEventType;
29 import org.apache.mina.core.session.IoSession;
30 import org.apache.mina.core.session.IoSessionConfig;
31 import org.apache.mina.core.write.DefaultWriteRequest;
32 import org.apache.mina.core.write.WriteRequest;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public class KeepAliveFilter extends IoFilterAdapter {
141
142 private final AttributeKey WAITING_FOR_RESPONSE = new AttributeKey(
143 getClass(), "waitingForResponse");
144 private final AttributeKey IGNORE_READER_IDLE_ONCE = new AttributeKey(
145 getClass(), "ignoreReaderIdleOnce");
146
147 private final KeepAliveMessageFactory messageFactory;
148 private final IdleStatus interestedIdleStatus;
149 private volatile KeepAliveRequestTimeoutHandler requestTimeoutHandler;
150 private volatile int requestInterval;
151 private volatile int requestTimeout;
152 private volatile boolean forwardEvent;
153
154
155
156
157
158
159
160
161
162
163
164 public KeepAliveFilter(KeepAliveMessageFactory messageFactory) {
165 this(messageFactory, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.CLOSE);
166 }
167
168
169
170
171
172
173
174
175
176
177 public KeepAliveFilter(
178 KeepAliveMessageFactory messageFactory,
179 IdleStatus interestedIdleStatus) {
180 this(messageFactory, interestedIdleStatus, KeepAliveRequestTimeoutHandler.CLOSE, 60, 30);
181 }
182
183
184
185
186
187
188
189
190
191
192 public KeepAliveFilter(
193 KeepAliveMessageFactory messageFactory, KeepAliveRequestTimeoutHandler policy) {
194 this(messageFactory, IdleStatus.READER_IDLE, policy, 60, 30);
195 }
196
197
198
199
200
201
202
203
204
205 public KeepAliveFilter(
206 KeepAliveMessageFactory messageFactory,
207 IdleStatus interestedIdleStatus, KeepAliveRequestTimeoutHandler policy) {
208 this(messageFactory, interestedIdleStatus, policy, 60, 30);
209 }
210
211
212
213
214 public KeepAliveFilter(
215 KeepAliveMessageFactory messageFactory,
216 IdleStatus interestedIdleStatus, KeepAliveRequestTimeoutHandler policy,
217 int keepAliveRequestInterval, int keepAliveRequestTimeout) {
218 if (messageFactory == null) {
219 throw new NullPointerException("messageFactory");
220 }
221 if (interestedIdleStatus == null) {
222 throw new NullPointerException("interestedIdleStatus");
223 }
224 if (policy == null) {
225 throw new NullPointerException("policy");
226 }
227
228 this.messageFactory = messageFactory;
229 this.interestedIdleStatus = interestedIdleStatus;
230 requestTimeoutHandler = policy;
231
232 setRequestInterval(keepAliveRequestInterval);
233 setRequestTimeout(keepAliveRequestTimeout);
234 }
235
236 public IdleStatus getInterestedIdleStatus() {
237 return interestedIdleStatus;
238 }
239
240 public KeepAliveRequestTimeoutHandler getRequestTimeoutHandler() {
241 return requestTimeoutHandler;
242 }
243
244 public void setRequestTimeoutHandler(KeepAliveRequestTimeoutHandler timeoutHandler) {
245 if (timeoutHandler == null) {
246 throw new NullPointerException("timeoutHandler");
247 }
248 requestTimeoutHandler = timeoutHandler;
249 }
250
251 public int getRequestInterval() {
252 return requestInterval;
253 }
254
255 public void setRequestInterval(int keepAliveRequestInterval) {
256 if (keepAliveRequestInterval <= 0) {
257 throw new IllegalArgumentException(
258 "keepAliveRequestInterval must be a positive integer: " +
259 keepAliveRequestInterval);
260 }
261 requestInterval = keepAliveRequestInterval;
262 }
263
264 public int getRequestTimeout() {
265 return requestTimeout;
266 }
267
268 public void setRequestTimeout(int keepAliveRequestTimeout) {
269 if (keepAliveRequestTimeout <= 0) {
270 throw new IllegalArgumentException(
271 "keepAliveRequestTimeout must be a positive integer: " +
272 keepAliveRequestTimeout);
273 }
274 requestTimeout = keepAliveRequestTimeout;
275 }
276
277 public KeepAliveMessageFactory getMessageFactory() {
278 return messageFactory;
279 }
280
281
282
283
284
285
286 public boolean isForwardEvent() {
287 return forwardEvent;
288 }
289
290
291
292
293
294
295 public void setForwardEvent(boolean forwardEvent) {
296 this.forwardEvent = forwardEvent;
297 }
298
299 @Override
300 public void onPreAdd(IoFilterChain parent, String name,
301 NextFilter nextFilter) throws Exception {
302 if (parent.contains(this)) {
303 throw new IllegalArgumentException(
304 "You can't add the same filter instance more than once. " +
305 "Create another instance and add it.");
306 }
307 }
308
309 @Override
310 public void onPostAdd(
311 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
312 resetStatus(parent.getSession());
313 }
314
315 @Override
316 public void onPostRemove(
317 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
318 resetStatus(parent.getSession());
319 }
320
321 @Override
322 public void messageReceived(
323 NextFilter nextFilter, IoSession session, Object message) throws Exception {
324 try {
325 if (messageFactory.isRequest(session, message)) {
326 Object pongMessage =
327 messageFactory.getResponse(session, message);
328
329 if (pongMessage != null) {
330 nextFilter.filterWrite(
331 session, new DefaultWriteRequest(pongMessage));
332 }
333 }
334
335 if (messageFactory.isResponse(session, message)) {
336 resetStatus(session);
337 }
338 } finally {
339 if (!isKeepAliveMessage(session, message)) {
340 nextFilter.messageReceived(session, message);
341 }
342 }
343 }
344
345 @Override
346 public void messageSent(
347 NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
348 Object message = writeRequest.getMessage();
349 if (!isKeepAliveMessage(session, message)) {
350 nextFilter.messageSent(session, writeRequest);
351 }
352 }
353
354 @Override
355 public void sessionIdle(
356 NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
357 if (status == interestedIdleStatus) {
358 if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {
359 Object pingMessage = messageFactory.getRequest(session);
360 if (pingMessage != null) {
361 nextFilter.filterWrite(
362 session,
363 new DefaultWriteRequest(pingMessage));
364
365
366
367 if (getRequestTimeoutHandler() != KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {
368 markStatus(session);
369 if (interestedIdleStatus == IdleStatus.BOTH_IDLE) {
370 session.setAttribute(IGNORE_READER_IDLE_ONCE);
371 }
372 } else {
373 resetStatus(session);
374 }
375 }
376 } else {
377 handlePingTimeout(session);
378 }
379 } else if (status == IdleStatus.READER_IDLE) {
380 if (session.removeAttribute(IGNORE_READER_IDLE_ONCE) == null) {
381 if (session.containsAttribute(WAITING_FOR_RESPONSE)) {
382 handlePingTimeout(session);
383 }
384 }
385 }
386
387 if (forwardEvent) {
388 nextFilter.sessionIdle(session, status);
389 }
390 }
391
392 private void handlePingTimeout(IoSession session) throws Exception {
393 resetStatus(session);
394 KeepAliveRequestTimeoutHandler handler = getRequestTimeoutHandler();
395 if (handler == KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {
396 return;
397 }
398
399 handler.keepAliveRequestTimedOut(this, session);
400 }
401
402 private void markStatus(IoSession session) {
403 session.getConfig().setIdleTime(interestedIdleStatus, 0);
404 session.getConfig().setReaderIdleTime(getRequestTimeout());
405 session.setAttribute(WAITING_FOR_RESPONSE);
406 }
407
408 private void resetStatus(IoSession session) {
409 session.getConfig().setReaderIdleTime(0);
410 session.getConfig().setWriterIdleTime(0);
411 session.getConfig().setIdleTime(
412 interestedIdleStatus, getRequestInterval());
413 session.removeAttribute(WAITING_FOR_RESPONSE);
414 }
415
416 private boolean isKeepAliveMessage(IoSession session, Object message) {
417 return messageFactory.isRequest(session, message) ||
418 messageFactory.isResponse(session, message);
419 }
420 }