1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.EnumSet;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.filterchain.IoFilterEvent;
32 import org.apache.mina.core.session.IdleStatus;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public class ExecutorFilter extends IoFilterAdapter {
113
114 private EnumSet<IoEventType> eventTypes;
115
116
117 private Executor executor;
118
119
120 private boolean manageableExecutor;
121
122
123 private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125
126 private static final int BASE_THREAD_NUMBER = 0;
127
128
129 private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131
132
133
134
135
136 private static final boolean MANAGEABLE_EXECUTOR = true;
137 private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
138
139
140 private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
141 IoEventType.EXCEPTION_CAUGHT,
142 IoEventType.MESSAGE_RECEIVED,
143 IoEventType.MESSAGE_SENT,
144 IoEventType.SESSION_CLOSED,
145 IoEventType.SESSION_IDLE,
146 IoEventType.SESSION_OPENED
147 };
148
149
150
151
152
153
154
155
156 public ExecutorFilter() {
157
158 Executor executor = createDefaultExecutor(
159 BASE_THREAD_NUMBER,
160 DEFAULT_MAX_POOL_SIZE,
161 DEFAULT_KEEPALIVE_TIME,
162 TimeUnit.SECONDS,
163 Executors.defaultThreadFactory(),
164 null);
165
166
167 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
168 }
169
170
171
172
173
174
175
176
177
178 public ExecutorFilter(int maximumPoolSize) {
179
180 Executor executor = createDefaultExecutor(
181 BASE_THREAD_NUMBER,
182 maximumPoolSize,
183 DEFAULT_KEEPALIVE_TIME,
184 TimeUnit.SECONDS,
185 Executors.defaultThreadFactory(),
186 null);
187
188
189 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
190 }
191
192
193
194
195
196
197
198
199
200
201 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
202
203 Executor executor = createDefaultExecutor(
204 corePoolSize,
205 maximumPoolSize,
206 DEFAULT_KEEPALIVE_TIME,
207 TimeUnit.SECONDS,
208 Executors.defaultThreadFactory(),
209 null);
210
211
212 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
213 }
214
215
216
217
218
219
220
221
222
223
224 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime,
225 TimeUnit unit) {
226
227 Executor executor = createDefaultExecutor(
228 corePoolSize,
229 maximumPoolSize,
230 keepAliveTime,
231 unit,
232 Executors.defaultThreadFactory(),
233 null);
234
235
236 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
237 }
238
239
240
241
242
243
244
245
246
247
248
249 public ExecutorFilter(
250 int corePoolSize, int maximumPoolSize,
251 long keepAliveTime, TimeUnit unit,
252 IoEventQueueHandler queueHandler) {
253
254 Executor executor = createDefaultExecutor(
255 corePoolSize,
256 maximumPoolSize,
257 keepAliveTime,
258 unit,
259 Executors.defaultThreadFactory(),
260 queueHandler);
261
262
263 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
264 }
265
266
267
268
269
270
271
272
273
274
275
276 public ExecutorFilter(
277 int corePoolSize, int maximumPoolSize,
278 long keepAliveTime, TimeUnit unit,
279 ThreadFactory threadFactory) {
280
281 Executor executor = createDefaultExecutor(
282 corePoolSize,
283 maximumPoolSize,
284 keepAliveTime,
285 unit,
286 threadFactory,
287 null);
288
289
290 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
291 }
292
293
294
295
296
297
298
299
300
301
302
303
304 public ExecutorFilter(
305 int corePoolSize, int maximumPoolSize,
306 long keepAliveTime, TimeUnit unit,
307 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
308
309 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
310
311
312 init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
313 }
314
315
316
317
318
319
320
321 public ExecutorFilter(IoEventType... eventTypes) {
322
323 Executor executor = createDefaultExecutor(
324 BASE_THREAD_NUMBER,
325 DEFAULT_MAX_POOL_SIZE,
326 DEFAULT_KEEPALIVE_TIME,
327 TimeUnit.SECONDS,
328 Executors.defaultThreadFactory(),
329 null);
330
331
332 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
333 }
334
335
336
337
338
339
340
341
342 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
343
344 Executor executor = createDefaultExecutor(
345 BASE_THREAD_NUMBER,
346 maximumPoolSize,
347 DEFAULT_KEEPALIVE_TIME,
348 TimeUnit.SECONDS,
349 Executors.defaultThreadFactory(),
350 null);
351
352
353 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
354 }
355
356
357
358
359
360
361
362
363
364 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
365
366 Executor executor = createDefaultExecutor(
367 corePoolSize,
368 maximumPoolSize,
369 DEFAULT_KEEPALIVE_TIME,
370 TimeUnit.SECONDS,
371 Executors.defaultThreadFactory(),
372 null);
373
374
375 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
376 }
377
378
379
380
381
382
383
384
385
386
387
388 public ExecutorFilter(
389 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
390 IoEventType... eventTypes) {
391
392 Executor executor = createDefaultExecutor(
393 corePoolSize,
394 maximumPoolSize,
395 keepAliveTime,
396 unit,
397 Executors.defaultThreadFactory(),
398 null);
399
400
401 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
402 }
403
404
405
406
407
408
409
410
411
412
413
414
415 public ExecutorFilter(
416 int corePoolSize, int maximumPoolSize,
417 long keepAliveTime, TimeUnit unit,
418 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
419
420 Executor executor = createDefaultExecutor(
421 corePoolSize,
422 maximumPoolSize,
423 keepAliveTime,
424 unit,
425 Executors.defaultThreadFactory(),
426 queueHandler);
427
428
429 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
430 }
431
432
433
434
435
436
437
438
439
440
441
442
443 public ExecutorFilter(
444 int corePoolSize, int maximumPoolSize,
445 long keepAliveTime, TimeUnit unit,
446 ThreadFactory threadFactory, IoEventType... eventTypes) {
447
448 Executor executor = createDefaultExecutor(
449 corePoolSize,
450 maximumPoolSize,
451 keepAliveTime,
452 unit,
453 threadFactory,
454 null);
455
456
457 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471
472 public ExecutorFilter(
473 int corePoolSize, int maximumPoolSize,
474 long keepAliveTime, TimeUnit unit,
475 ThreadFactory threadFactory, IoEventQueueHandler queueHandler,
476 IoEventType... eventTypes) {
477
478 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
479 keepAliveTime, unit, threadFactory, queueHandler);
480
481
482 init(executor, MANAGEABLE_EXECUTOR, eventTypes);
483 }
484
485
486
487
488
489
490 public ExecutorFilter(Executor executor) {
491
492 init(executor, NOT_MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
493 }
494
495
496
497
498
499
500
501 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
502
503 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
504 }
505
506
507
508
509
510
511
512
513
514
515
516
517 private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
518 TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
519
520 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
521 keepAliveTime, unit, threadFactory, queueHandler);
522
523 return executor;
524 }
525
526
527
528
529
530
531
532 private void initEventTypes(IoEventType... eventTypes) {
533 if ((eventTypes == null) || (eventTypes.length == 0)) {
534 eventTypes = DEFAULT_EVENT_SET;
535 }
536
537
538 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
539
540
541 if (this.eventTypes.contains( IoEventType.SESSION_CREATED )) {
542 this.eventTypes = null;
543 throw new IllegalArgumentException(IoEventType.SESSION_CREATED
544 + " is not allowed.");
545 }
546 }
547
548
549
550
551
552
553
554
555
556
557 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
558 if (executor == null) {
559 throw new NullPointerException("executor");
560 }
561
562 initEventTypes(eventTypes);
563 this.executor = executor;
564 this.manageableExecutor = manageableExecutor;
565 }
566
567
568
569
570
571 @Override
572 public void destroy() {
573 if (manageableExecutor) {
574 ((ExecutorService) executor).shutdown();
575 }
576 }
577
578
579
580
581
582
583 public final Executor getExecutor() {
584 return executor;
585 }
586
587
588
589
590 protected void fireEvent(IoFilterEvent event) {
591 getExecutor().execute(event);
592 }
593
594
595
596
597
598
599
600
601
602
603
604
605 @Override
606 public void onPreAdd(IoFilterChain parent, String name,
607 NextFilter nextFilter) throws Exception {
608 if (parent.contains(this)) {
609 throw new IllegalArgumentException(
610 "You can't add the same filter instance more than once. Create another instance and add it.");
611 }
612 }
613
614
615
616
617 @Override
618 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
619 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
620 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
621 session, null);
622 fireEvent(event);
623 } else {
624 nextFilter.sessionOpened(session);
625 }
626 }
627
628
629
630
631 @Override
632 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
633 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
634 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
635 session, null);
636 fireEvent(event);
637 } else {
638 nextFilter.sessionClosed(session);
639 }
640 }
641
642
643
644
645 @Override
646 public final void sessionIdle(NextFilter nextFilter, IoSession session,
647 IdleStatus status) {
648 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
649 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
650 session, status);
651 fireEvent(event);
652 } else {
653 nextFilter.sessionIdle(session, status);
654 }
655 }
656
657
658
659
660 @Override
661 public final void exceptionCaught(NextFilter nextFilter, IoSession session,
662 Throwable cause) {
663 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
664 IoFilterEvent event = new IoFilterEvent(nextFilter,
665 IoEventType.EXCEPTION_CAUGHT, session, cause);
666 fireEvent(event);
667 } else {
668 nextFilter.exceptionCaught(session, cause);
669 }
670 }
671
672
673
674
675 @Override
676 public final void messageReceived(NextFilter nextFilter, IoSession session,
677 Object message) {
678 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
679 IoFilterEvent event = new IoFilterEvent(nextFilter,
680 IoEventType.MESSAGE_RECEIVED, session, message);
681 fireEvent(event);
682 } else {
683 nextFilter.messageReceived(session, message);
684 }
685 }
686
687
688
689
690 @Override
691 public final void messageSent(NextFilter nextFilter, IoSession session,
692 WriteRequest writeRequest) {
693 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
694 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
695 session, writeRequest);
696 fireEvent(event);
697 } else {
698 nextFilter.messageSent(session, writeRequest);
699 }
700 }
701
702
703
704
705 @Override
706 public final void filterWrite(NextFilter nextFilter, IoSession session,
707 WriteRequest writeRequest) {
708 if (eventTypes.contains(IoEventType.WRITE)) {
709 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
710 writeRequest);
711 fireEvent(event);
712 } else {
713 nextFilter.filterWrite(session, writeRequest);
714 }
715 }
716
717
718
719
720 @Override
721 public final void filterClose(NextFilter nextFilter, IoSession session)
722 throws Exception {
723 if (eventTypes.contains(IoEventType.CLOSE)) {
724 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
725 null);
726 fireEvent(event);
727 } else {
728 nextFilter.filterClose(session);
729 }
730 }
731 }