1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.EnumSet;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.filterchain.IoFilterEvent;
32 import org.apache.mina.core.session.IdleStatus;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public class ExecutorFilter extends IoFilterAdapter {
113
114 private EnumSet<IoEventType> eventTypes;
115
116
117 private Executor executor;
118
119
120 private boolean manageableExecutor;
121
122
123 private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125
126 private static final int BASE_THREAD_NUMBER = 0;
127
128
129 private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131
132
133
134
135
136 private static final boolean MANAGEABLE_EXECUTOR = true;
137
138 private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139
140
141 private static final IoEventTypeml#IoEventType">IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
142 IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
143 IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
144
145
146
147
148
149
150
151 public ExecutorFilter() {
152
153 Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
154 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
155
156
157 init(newExecutor, MANAGEABLE_EXECUTOR);
158 }
159
160
161
162
163
164
165
166
167
168 public ExecutorFilter(int maximumPoolSize) {
169
170 Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
171 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
172
173
174 init(newExecutor, MANAGEABLE_EXECUTOR);
175 }
176
177
178
179
180
181
182
183
184
185
186 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
187
188 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
189 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
190
191
192 init(newExecutor, MANAGEABLE_EXECUTOR);
193 }
194
195
196
197
198
199
200
201
202
203
204 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
205
206 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
207 Executors.defaultThreadFactory(), null);
208
209
210 init(newExecutor, MANAGEABLE_EXECUTOR);
211 }
212
213
214
215
216
217
218
219
220
221
222
223 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224 IoEventQueueHandler queueHandler) {
225
226 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
227 Executors.defaultThreadFactory(), queueHandler);
228
229
230 init(newExecutor, MANAGEABLE_EXECUTOR);
231 }
232
233
234
235
236
237
238
239
240
241
242
243 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
244 ThreadFactory threadFactory) {
245
246 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
247 null);
248
249
250 init(newExecutor, MANAGEABLE_EXECUTOR);
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
265 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
266
267 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
268 threadFactory, queueHandler);
269
270
271 init(newExecutor, MANAGEABLE_EXECUTOR);
272 }
273
274
275
276
277
278
279
280 public ExecutorFilter(IoEventType... eventTypes) {
281
282 Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
283 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
284
285
286 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
287 }
288
289
290
291
292
293
294
295
296 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
297
298 Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
299 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
300
301
302 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
303 }
304
305
306
307
308
309
310
311
312
313 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
314
315 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
316 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
317
318
319 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
320 }
321
322
323
324
325
326
327
328
329
330
331
332 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
333 IoEventType... eventTypes) {
334
335 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
336 Executors.defaultThreadFactory(), null);
337
338
339 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
340 }
341
342
343
344
345
346
347
348
349
350
351
352
353 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
354 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
355
356 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
357 Executors.defaultThreadFactory(), queueHandler);
358
359
360 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
361 }
362
363
364
365
366
367
368
369
370
371
372
373
374 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
375 ThreadFactory threadFactory, IoEventType... eventTypes) {
376
377 Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
378 null);
379
380
381 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
397 ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
398
399 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
400 threadFactory, queueHandler);
401
402
403 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
404 }
405
406
407
408
409
410
411 public ExecutorFilter(Executor executor) {
412
413 init(executor, NOT_MANAGEABLE_EXECUTOR);
414 }
415
416
417
418
419
420
421
422 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
423
424 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
425 }
426
427
428
429
430
431
432
433
434
435
436
437
438 private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
439 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
440
441 return new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
442 threadFactory, queueHandler);
443 }
444
445
446
447
448
449
450
451 private void initEventTypes(IoEventType... eventTypes) {
452 if ((eventTypes == null) || (eventTypes.length == 0)) {
453 eventTypes = DEFAULT_EVENT_SET;
454 }
455
456
457 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
458
459
460 if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
461 this.eventTypes = null;
462 throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
463 }
464 }
465
466
467
468
469
470
471
472
473
474 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
475 if (executor == null) {
476 throw new IllegalArgumentException("executor");
477 }
478
479 initEventTypes(eventTypes);
480 this.executor = executor;
481 this.manageableExecutor = manageableExecutor;
482 }
483
484
485
486
487
488 @Override
489 public void destroy() {
490 if (manageableExecutor) {
491 ((ExecutorService) executor).shutdown();
492 }
493 }
494
495
496
497
498 public final Executor getExecutor() {
499 return executor;
500 }
501
502
503
504
505
506
507 protected void fireEvent(IoFilterEvent event) {
508 executor.execute(event);
509 }
510
511
512
513
514 @Override
515 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
516 if (parent.contains(this)) {
517 throw new IllegalArgumentException(
518 "You can't add the same filter instance more than once. Create another instance and add it.");
519 }
520 }
521
522
523
524
525 @Override
526 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
527 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
528 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
529 fireEvent(event);
530 } else {
531 nextFilter.sessionOpened(session);
532 }
533 }
534
535
536
537
538 @Override
539 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
540 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
541 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
542 fireEvent(event);
543 } else {
544 nextFilter.sessionClosed(session);
545 }
546 }
547
548
549
550
551 @Override
552 public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
553 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
554 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
555 fireEvent(event);
556 } else {
557 nextFilter.sessionIdle(session, status);
558 }
559 }
560
561
562
563
564 @Override
565 public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
566 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
567 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
568 fireEvent(event);
569 } else {
570 nextFilter.exceptionCaught(session, cause);
571 }
572 }
573
574
575
576
577 @Override
578 public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
579 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
580 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
581 fireEvent(event);
582 } else {
583 nextFilter.messageReceived(session, message);
584 }
585 }
586
587
588
589
590 @Override
591 public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
592 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
593 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
594 fireEvent(event);
595 } else {
596 nextFilter.messageSent(session, writeRequest);
597 }
598 }
599
600
601
602
603 @Override
604 public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
605 if (eventTypes.contains(IoEventType.WRITE)) {
606 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
607 fireEvent(event);
608 } else {
609 nextFilter.filterWrite(session, writeRequest);
610 }
611 }
612
613
614
615
616 @Override
617 public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
618 if (eventTypes.contains(IoEventType.CLOSE)) {
619 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
620 fireEvent(event);
621 } else {
622 nextFilter.filterClose(session);
623 }
624 }
625 }