1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.EnumSet;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.filterchain.IoFilterEvent;
32 import org.apache.mina.core.session.IdleStatus;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public class ExecutorFilter extends IoFilterAdapter {
113
114 private EnumSet<IoEventType> eventTypes;
115
116
117 private Executor executor;
118
119
120 private boolean manageableExecutor;
121
122
123 private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125
126 private static final int BASE_THREAD_NUMBER = 0;
127
128
129 private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131
132
133
134
135
136 private static final boolean MANAGEABLE_EXECUTOR = true;
137 private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
138
139
140 private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
141 IoEventType.EXCEPTION_CAUGHT,
142 IoEventType.MESSAGE_RECEIVED,
143 IoEventType.MESSAGE_SENT,
144 IoEventType.SESSION_CLOSED,
145 IoEventType.SESSION_IDLE,
146 IoEventType.SESSION_OPENED
147 };
148
149
150
151
152
153
154
155 public ExecutorFilter() {
156
157 Executor executor = createDefaultExecutor(
158 BASE_THREAD_NUMBER,
159 DEFAULT_MAX_POOL_SIZE,
160 DEFAULT_KEEPALIVE_TIME,
161 TimeUnit.SECONDS,
162 Executors.defaultThreadFactory(),
163 null);
164
165
166 init(executor, MANAGEABLE_EXECUTOR);
167 }
168
169
170
171
172
173
174
175
176
177 public ExecutorFilter(int maximumPoolSize) {
178
179 Executor executor = createDefaultExecutor(
180 BASE_THREAD_NUMBER,
181 maximumPoolSize,
182 DEFAULT_KEEPALIVE_TIME,
183 TimeUnit.SECONDS,
184 Executors.defaultThreadFactory(),
185 null);
186
187
188 init(executor, MANAGEABLE_EXECUTOR);
189 }
190
191
192
193
194
195
196
197
198
199
200 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
201
202 Executor executor = createDefaultExecutor(
203 corePoolSize,
204 maximumPoolSize,
205 DEFAULT_KEEPALIVE_TIME,
206 TimeUnit.SECONDS,
207 Executors.defaultThreadFactory(),
208 null);
209
210
211 init(executor, MANAGEABLE_EXECUTOR);
212 }
213
214
215
216
217
218
219
220
221
222
223 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime,
224 TimeUnit unit) {
225
226 Executor executor = createDefaultExecutor(
227 corePoolSize,
228 maximumPoolSize,
229 keepAliveTime,
230 unit,
231 Executors.defaultThreadFactory(),
232 null);
233
234
235 init(executor, MANAGEABLE_EXECUTOR);
236 }
237
238
239
240
241
242
243
244
245
246
247
248 public ExecutorFilter(
249 int corePoolSize, int maximumPoolSize,
250 long keepAliveTime, TimeUnit unit,
251 IoEventQueueHandler queueHandler) {
252
253 Executor executor = createDefaultExecutor(
254 corePoolSize,
255 maximumPoolSize,
256 keepAliveTime,
257 unit,
258 Executors.defaultThreadFactory(),
259 queueHandler);
260
261
262 init(executor, MANAGEABLE_EXECUTOR);
263 }
264
265
266
267
268
269
270
271
272
273
274
275 public ExecutorFilter(
276 int corePoolSize, int maximumPoolSize,
277 long keepAliveTime, TimeUnit unit,
278 ThreadFactory threadFactory) {
279
280 Executor executor = createDefaultExecutor(
281 corePoolSize,
282 maximumPoolSize,
283 keepAliveTime,
284 unit,
285 threadFactory,
286 null);
287
288
289 init(executor, MANAGEABLE_EXECUTOR);
290 }
291
292
293
294
295
296
297
298
299
300
301
302
303 public ExecutorFilter(
304 int corePoolSize, int maximumPoolSize,
305 long keepAliveTime, TimeUnit unit,
306 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
307
308 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
309
310
311 init(executor, MANAGEABLE_EXECUTOR);
312 }
313
314
315
316
317
318
319
320 public ExecutorFilter(IoEventType... eventTypes) {
321
322 Executor executor = createDefaultExecutor(
323 BASE_THREAD_NUMBER,
324 DEFAULT_MAX_POOL_SIZE,
325 DEFAULT_KEEPALIVE_TIME,
326 TimeUnit.SECONDS,
327 Executors.defaultThreadFactory(),
328 null);
329
330
331 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
332 }
333
334
335
336
337
338
339
340
341 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
342
343 Executor executor = createDefaultExecutor(
344 BASE_THREAD_NUMBER,
345 maximumPoolSize,
346 DEFAULT_KEEPALIVE_TIME,
347 TimeUnit.SECONDS,
348 Executors.defaultThreadFactory(),
349 null);
350
351
352 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
353 }
354
355
356
357
358
359
360
361
362
363 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
364
365 Executor executor = createDefaultExecutor(
366 corePoolSize,
367 maximumPoolSize,
368 DEFAULT_KEEPALIVE_TIME,
369 TimeUnit.SECONDS,
370 Executors.defaultThreadFactory(),
371 null);
372
373
374 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
375 }
376
377
378
379
380
381
382
383
384
385
386
387 public ExecutorFilter(
388 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
389 IoEventType... eventTypes) {
390
391 Executor executor = createDefaultExecutor(
392 corePoolSize,
393 maximumPoolSize,
394 keepAliveTime,
395 unit,
396 Executors.defaultThreadFactory(),
397 null);
398
399
400 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
401 }
402
403
404
405
406
407
408
409
410
411
412
413
414 public ExecutorFilter(
415 int corePoolSize, int maximumPoolSize,
416 long keepAliveTime, TimeUnit unit,
417 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
418
419 Executor executor = createDefaultExecutor(
420 corePoolSize,
421 maximumPoolSize,
422 keepAliveTime,
423 unit,
424 Executors.defaultThreadFactory(),
425 queueHandler);
426
427
428 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442 public ExecutorFilter(
443 int corePoolSize, int maximumPoolSize,
444 long keepAliveTime, TimeUnit unit,
445 ThreadFactory threadFactory, IoEventType... eventTypes) {
446
447 Executor executor = createDefaultExecutor(
448 corePoolSize,
449 maximumPoolSize,
450 keepAliveTime,
451 unit,
452 threadFactory,
453 null);
454
455
456 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
457 }
458
459
460
461
462
463
464
465
466
467
468
469
470
471 public ExecutorFilter(
472 int corePoolSize, int maximumPoolSize,
473 long keepAliveTime, TimeUnit unit,
474 ThreadFactory threadFactory, IoEventQueueHandler queueHandler,
475 IoEventType... eventTypes) {
476
477 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
478 keepAliveTime, unit, threadFactory, queueHandler);
479
480
481 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
482 }
483
484
485
486
487
488
489 public ExecutorFilter(Executor executor) {
490
491 init(executor, NOT_MANAGEABLE_EXECUTOR);
492 }
493
494
495
496
497
498
499
500 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
501
502 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
503 }
504
505
506
507
508
509
510
511
512
513
514
515
516 private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
517 TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
518
519 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
520 keepAliveTime, unit, threadFactory, queueHandler);
521
522 return executor;
523 }
524
525
526
527
528
529
530
531 private void initEventTypes(IoEventType... eventTypes) {
532 if ((eventTypes == null) || (eventTypes.length == 0)) {
533 eventTypes = DEFAULT_EVENT_SET;
534 }
535
536
537 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
538
539
540 if (this.eventTypes.contains( IoEventType.SESSION_CREATED )) {
541 this.eventTypes = null;
542 throw new IllegalArgumentException(IoEventType.SESSION_CREATED
543 + " is not allowed.");
544 }
545 }
546
547
548
549
550
551
552
553
554
555
556 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
557 if (executor == null) {
558 throw new IllegalArgumentException("executor");
559 }
560
561 initEventTypes(eventTypes);
562 this.executor = executor;
563 this.manageableExecutor = manageableExecutor;
564 }
565
566
567
568
569
570 @Override
571 public void destroy() {
572 if (manageableExecutor) {
573 ((ExecutorService) executor).shutdown();
574 }
575 }
576
577
578
579
580
581
582 public final Executor getExecutor() {
583 return executor;
584 }
585
586
587
588
589
590
591 protected void fireEvent(IoFilterEvent event) {
592 executor.execute(event);
593 }
594
595
596
597
598 @Override
599 public void onPreAdd(IoFilterChain parent, String name,
600 NextFilter nextFilter) throws Exception {
601 if (parent.contains(this)) {
602 throw new IllegalArgumentException(
603 "You can't add the same filter instance more than once. Create another instance and add it.");
604 }
605 }
606
607
608
609
610 @Override
611 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
612 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
613 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
614 session, null);
615 fireEvent(event);
616 } else {
617 nextFilter.sessionOpened(session);
618 }
619 }
620
621
622
623
624 @Override
625 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
626 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
627 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
628 session, null);
629 fireEvent(event);
630 } else {
631 nextFilter.sessionClosed(session);
632 }
633 }
634
635
636
637
638 @Override
639 public final void sessionIdle(NextFilter nextFilter, IoSession session,
640 IdleStatus status) {
641 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
642 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
643 session, status);
644 fireEvent(event);
645 } else {
646 nextFilter.sessionIdle(session, status);
647 }
648 }
649
650
651
652
653 @Override
654 public final void exceptionCaught(NextFilter nextFilter, IoSession session,
655 Throwable cause) {
656 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
657 IoFilterEvent event = new IoFilterEvent(nextFilter,
658 IoEventType.EXCEPTION_CAUGHT, session, cause);
659 fireEvent(event);
660 } else {
661 nextFilter.exceptionCaught(session, cause);
662 }
663 }
664
665
666
667
668 @Override
669 public final void messageReceived(NextFilter nextFilter, IoSession session,
670 Object message) {
671 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
672 IoFilterEvent event = new IoFilterEvent(nextFilter,
673 IoEventType.MESSAGE_RECEIVED, session, message);
674 fireEvent(event);
675 } else {
676 nextFilter.messageReceived(session, message);
677 }
678 }
679
680
681
682
683 @Override
684 public final void messageSent(NextFilter nextFilter, IoSession session,
685 WriteRequest writeRequest) {
686 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
687 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
688 session, writeRequest);
689 fireEvent(event);
690 } else {
691 nextFilter.messageSent(session, writeRequest);
692 }
693 }
694
695
696
697
698 @Override
699 public final void filterWrite(NextFilter nextFilter, IoSession session,
700 WriteRequest writeRequest) {
701 if (eventTypes.contains(IoEventType.WRITE)) {
702 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
703 writeRequest);
704 fireEvent(event);
705 } else {
706 nextFilter.filterWrite(session, writeRequest);
707 }
708 }
709
710
711
712
713 @Override
714 public final void filterClose(NextFilter nextFilter, IoSession session)
715 throws Exception {
716 if (eventTypes.contains(IoEventType.CLOSE)) {
717 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
718 null);
719 fireEvent(event);
720 } else {
721 nextFilter.filterClose(session);
722 }
723 }
724 }