1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.keepalive;
21
22 import org.apache.mina.core.buffer.IoBuffer;
23 import org.apache.mina.core.filterchain.IoFilter;
24 import org.apache.mina.core.filterchain.IoFilterAdapter;
25 import org.apache.mina.core.filterchain.IoFilterChain;
26 import org.apache.mina.core.service.IoHandler;
27 import org.apache.mina.core.session.AttributeKey;
28 import org.apache.mina.core.session.IdleStatus;
29 import org.apache.mina.core.session.IoEventType;
30 import org.apache.mina.core.session.IoSession;
31 import org.apache.mina.core.session.IoSessionConfig;
32 import org.apache.mina.core.write.DefaultWriteRequest;
33 import org.apache.mina.core.write.WriteRequest;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159 public class KeepAliveFilter extends IoFilterAdapter {
160
161 private final AttributeKeyl#AttributeKey">AttributeKey WAITING_FOR_RESPONSE = new AttributeKey(getClass(), "waitingForResponse");
162
163 private final AttributeKeyttributeKey">AttributeKey IGNORE_READER_IDLE_ONCE = new AttributeKey(getClass(), "ignoreReaderIdleOnce");
164
165 private final KeepAliveMessageFactory messageFactory;
166
167 private final IdleStatus interestedIdleStatus;
168
169 private volatile KeepAliveRequestTimeoutHandler requestTimeoutHandler;
170
171 private volatile int requestInterval;
172
173 private volatile int requestTimeout;
174
175 private volatile boolean forwardEvent;
176
177
178
179
180
181
182
183
184
185
186
187
188
189 public KeepAliveFilter(KeepAliveMessageFactory messageFactory) {
190 this(messageFactory, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.CLOSE);
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205 public KeepAliveFilter(KeepAliveMessageFactory messageFactory, IdleStatus interestedIdleStatus) {
206 this(messageFactory, interestedIdleStatus, KeepAliveRequestTimeoutHandler.CLOSE, 60, 30);
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221 public KeepAliveFilter(KeepAliveMessageFactory messageFactory, KeepAliveRequestTimeoutHandler policy) {
222 this(messageFactory, IdleStatus.READER_IDLE, policy, 60, 30);
223 }
224
225
226
227
228
229
230
231
232
233
234
235
236
237 public KeepAliveFilter(KeepAliveMessageFactory messageFactory, IdleStatus interestedIdleStatus,
238 KeepAliveRequestTimeoutHandler policy) {
239 this(messageFactory, interestedIdleStatus, policy, 60, 30);
240 }
241
242
243
244
245
246
247
248
249
250
251 public KeepAliveFilter(KeepAliveMessageFactory messageFactory, IdleStatus interestedIdleStatus,
252 KeepAliveRequestTimeoutHandler policy, int keepAliveRequestInterval, int keepAliveRequestTimeout) {
253 if (messageFactory == null) {
254 throw new IllegalArgumentException("messageFactory");
255 }
256
257 if (interestedIdleStatus == null) {
258 throw new IllegalArgumentException("interestedIdleStatus");
259 }
260
261 if (policy == null) {
262 throw new IllegalArgumentException("policy");
263 }
264
265 this.messageFactory = messageFactory;
266 this.interestedIdleStatus = interestedIdleStatus;
267 requestTimeoutHandler = policy;
268
269 setRequestInterval(keepAliveRequestInterval);
270 setRequestTimeout(keepAliveRequestTimeout);
271 }
272
273
274
275
276 public IdleStatus getInterestedIdleStatus() {
277 return interestedIdleStatus;
278 }
279
280
281
282
283 public KeepAliveRequestTimeoutHandler getRequestTimeoutHandler() {
284 return requestTimeoutHandler;
285 }
286
287
288
289
290
291
292 public void setRequestTimeoutHandler(KeepAliveRequestTimeoutHandler timeoutHandler) {
293 if (timeoutHandler == null) {
294 throw new IllegalArgumentException("timeoutHandler");
295 }
296 requestTimeoutHandler = timeoutHandler;
297 }
298
299
300
301
302 public int getRequestInterval() {
303 return requestInterval;
304 }
305
306
307
308
309
310
311 public void setRequestInterval(int keepAliveRequestInterval) {
312 if (keepAliveRequestInterval <= 0) {
313 throw new IllegalArgumentException("keepAliveRequestInterval must be a positive integer: "
314 + keepAliveRequestInterval);
315 }
316
317 requestInterval = keepAliveRequestInterval;
318 }
319
320
321
322
323 public int getRequestTimeout() {
324 return requestTimeout;
325 }
326
327
328
329
330
331
332 public void setRequestTimeout(int keepAliveRequestTimeout) {
333 if (keepAliveRequestTimeout <= 0) {
334 throw new IllegalArgumentException("keepAliveRequestTimeout must be a positive integer: "
335 + keepAliveRequestTimeout);
336 }
337
338 requestTimeout = keepAliveRequestTimeout;
339 }
340
341
342
343
344 public KeepAliveMessageFactory getMessageFactory() {
345 return messageFactory;
346 }
347
348
349
350
351
352
353 public boolean isForwardEvent() {
354 return forwardEvent;
355 }
356
357
358
359
360
361
362
363
364 public void setForwardEvent(boolean forwardEvent) {
365 this.forwardEvent = forwardEvent;
366 }
367
368
369
370
371 @Override
372 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
373 if (parent.contains(this)) {
374 throw new IllegalArgumentException("You can't add the same filter instance more than once. "
375 + "Create another instance and add it.");
376 }
377 }
378
379
380
381
382 @Override
383 public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
384 resetStatus(parent.getSession());
385 }
386
387
388
389
390 @Override
391 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
392 resetStatus(parent.getSession());
393 }
394
395
396
397
398 @Override
399 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
400 try {
401 if (messageFactory.isRequest(session, message)) {
402 Object pongMessage = messageFactory.getResponse(session, message);
403
404 if (pongMessage != null) {
405 nextFilter.filterWrite(session, new DefaultWriteRequest(pongMessage));
406 }
407 }
408
409 if (messageFactory.isResponse(session, message)) {
410 resetStatus(session);
411 }
412 } finally {
413 if (!isKeepAliveMessage(session, message)) {
414 nextFilter.messageReceived(session, message);
415 }
416 }
417 }
418
419
420
421
422 @Override
423 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
424 Object message = writeRequest.getOriginalMessage();
425
426 if (message == null)
427 {
428 if (writeRequest.getMessage() instanceof IoBuffer) {
429 message = ((IoBuffer)writeRequest.getMessage()).duplicate().flip();
430 }
431 }
432
433 if (!isKeepAliveMessage(session, message)) {
434 nextFilter.messageSent(session, writeRequest);
435 }
436 }
437
438
439
440
441 @Override
442 public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
443 if (status == interestedIdleStatus) {
444 if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {
445 Object pingMessage = messageFactory.getRequest(session);
446
447 if (pingMessage != null) {
448 nextFilter.filterWrite(session, new DefaultWriteRequest(pingMessage));
449
450
451
452 if (getRequestTimeoutHandler() != KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {
453 markStatus(session);
454 if (interestedIdleStatus == IdleStatus.BOTH_IDLE) {
455 session.setAttribute(IGNORE_READER_IDLE_ONCE);
456 }
457 } else {
458 resetStatus(session);
459 }
460 }
461 } else {
462 handlePingTimeout(session);
463 }
464 } else if (status == IdleStatus.READER_IDLE) {
465 if (session.removeAttribute(IGNORE_READER_IDLE_ONCE) == null) {
466 if (session.containsAttribute(WAITING_FOR_RESPONSE)) {
467 handlePingTimeout(session);
468 }
469 }
470 }
471
472 if (forwardEvent) {
473 nextFilter.sessionIdle(session, status);
474 }
475 }
476
477 private void handlePingTimeout(IoSession session) throws Exception {
478 resetStatus(session);
479 KeepAliveRequestTimeoutHandler handler = getRequestTimeoutHandler();
480 if (handler == KeepAliveRequestTimeoutHandler.DEAF_SPEAKER) {
481 return;
482 }
483
484 handler.keepAliveRequestTimedOut(this, session);
485 }
486
487 private void markStatus(IoSession session) {
488 session.getConfig().setIdleTime(interestedIdleStatus, 0);
489 session.getConfig().setReaderIdleTime(getRequestTimeout());
490 session.setAttribute(WAITING_FOR_RESPONSE);
491 }
492
493 private void resetStatus(IoSession session) {
494 session.getConfig().setReaderIdleTime(0);
495 session.getConfig().setWriterIdleTime(0);
496 session.getConfig().setIdleTime(interestedIdleStatus, getRequestInterval());
497 session.removeAttribute(WAITING_FOR_RESPONSE);
498 }
499
500 private boolean isKeepAliveMessage(IoSession session, Object message) {
501 return messageFactory.isRequest(session, message) || messageFactory.isResponse(session, message);
502 }
503 }