1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.EnumSet;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.filterchain.IoFilterEvent;
32 import org.apache.mina.core.session.IdleStatus;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public class ExecutorFilter extends IoFilterAdapter {
113
114 private EnumSet<IoEventType> eventTypes;
115
116
117 private Executor executor;
118
119
120 private boolean manageableExecutor;
121
122
123 private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125
126 private static final int BASE_THREAD_NUMBER = 0;
127
128
129 private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131
132
133
134
135
136 private static final boolean MANAGEABLE_EXECUTOR = true;
137
138 private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139
140
141 private static final IoEventTypeml#IoEventType">IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
142 IoEventType.EXCEPTION_CAUGHT,
143 IoEventType.MESSAGE_RECEIVED,
144 IoEventType.MESSAGE_SENT,
145 IoEventType.SESSION_CLOSED,
146 IoEventType.SESSION_IDLE,
147 IoEventType.SESSION_OPENED };
148
149
150
151
152
153
154
155 public ExecutorFilter() {
156
157 Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE,
158 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
159
160
161 init(newExecutor, MANAGEABLE_EXECUTOR);
162 }
163
164
165
166
167
168
169
170
171
172 public ExecutorFilter(int maximumPoolSize) {
173
174 Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, maximumPoolSize,
175 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
176
177
178 init(newExecutor, MANAGEABLE_EXECUTOR);
179 }
180
181
182
183
184
185
186
187
188
189
190 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
191
192 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
193 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
194
195
196 init(newExecutor, MANAGEABLE_EXECUTOR);
197 }
198
199
200
201
202
203
204
205
206
207
208 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
209
210 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
211 keepAliveTime, unit, Executors.defaultThreadFactory(), null);
212
213
214 init(newExecutor, MANAGEABLE_EXECUTOR);
215 }
216
217
218
219
220
221
222
223
224
225
226
227 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
228 IoEventQueueHandler queueHandler) {
229
230 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
231 keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
232
233
234 init(newExecutor, MANAGEABLE_EXECUTOR);
235 }
236
237
238
239
240
241
242
243
244
245
246
247 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
248 ThreadFactory threadFactory) {
249
250 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
251 keepAliveTime, unit, threadFactory, null);
252
253
254 init(newExecutor, MANAGEABLE_EXECUTOR);
255 }
256
257
258
259
260
261
262
263
264
265
266
267
268 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
269 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
270
271 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
272 keepAliveTime, unit, threadFactory, queueHandler);
273
274
275 init(newExecutor, MANAGEABLE_EXECUTOR);
276 }
277
278
279
280
281
282
283
284 public ExecutorFilter(IoEventType... eventTypes) {
285
286 Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE,
287 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
288
289
290 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
291 }
292
293
294
295
296
297
298
299
300 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
301
302 Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, maximumPoolSize,
303 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
304
305
306 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
307 }
308
309
310
311
312
313
314
315
316
317 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
318
319 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
320 DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
321
322
323 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
324 }
325
326
327
328
329
330
331
332
333
334
335
336 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
337 IoEventType... eventTypes) {
338
339 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
340 keepAliveTime, unit, Executors.defaultThreadFactory(), null);
341
342
343 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
358 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
359
360 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
361 keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
362
363
364 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
365 }
366
367
368
369
370
371
372
373
374
375
376
377
378 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
379 ThreadFactory threadFactory, IoEventType... eventTypes) {
380
381 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
382 keepAliveTime, unit, threadFactory, null);
383
384
385 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
386 }
387
388
389
390
391
392
393
394
395
396
397
398
399
400 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
401 ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
402
403 Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
404 threadFactory, queueHandler);
405
406
407 init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
408 }
409
410
411
412
413
414
415 public ExecutorFilter(Executor executor) {
416
417 init(executor, NOT_MANAGEABLE_EXECUTOR);
418 }
419
420
421
422
423
424
425
426 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
427
428 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
429 }
430
431
432
433
434
435
436
437 private void initEventTypes(IoEventType... eventTypes) {
438 if ((eventTypes == null) || (eventTypes.length == 0)) {
439 eventTypes = DEFAULT_EVENT_SET;
440 }
441
442
443 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
444
445
446 if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
447 this.eventTypes = null;
448 throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
449 }
450 }
451
452
453
454
455
456
457
458
459
460 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
461 if (executor == null) {
462 throw new IllegalArgumentException("executor");
463 }
464
465 initEventTypes(eventTypes);
466 this.executor = executor;
467 this.manageableExecutor = manageableExecutor;
468 }
469
470
471
472
473
474 @Override
475 public void destroy() {
476 if (manageableExecutor) {
477 ((ExecutorService) executor).shutdown();
478 }
479 }
480
481
482
483
484 public final Executor getExecutor() {
485 return executor;
486 }
487
488
489
490
491
492
493 protected void fireEvent(IoFilterEvent event) {
494 executor.execute(event);
495 }
496
497
498
499
500 @Override
501 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
502 if (parent.contains(this)) {
503 throw new IllegalArgumentException(
504 "You can't add the same filter instance more than once. Create another instance and add it.");
505 }
506 }
507
508
509
510
511 @Override
512 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
513 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
514 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
515 fireEvent(event);
516 } else {
517 nextFilter.sessionOpened(session);
518 }
519 }
520
521
522
523
524 @Override
525 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
526 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
527 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
528 fireEvent(event);
529 } else {
530 nextFilter.sessionClosed(session);
531 }
532 }
533
534
535
536
537 @Override
538 public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
539 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
540 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
541 fireEvent(event);
542 } else {
543 nextFilter.sessionIdle(session, status);
544 }
545 }
546
547
548
549
550 @Override
551 public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
552 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
553 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
554 fireEvent(event);
555 } else {
556 nextFilter.exceptionCaught(session, cause);
557 }
558 }
559
560
561
562
563 @Override
564 public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
565 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
566 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
567 fireEvent(event);
568 } else {
569 nextFilter.messageReceived(session, message);
570 }
571 }
572
573
574
575
576 @Override
577 public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
578 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
579 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
580 fireEvent(event);
581 } else {
582 nextFilter.messageSent(session, writeRequest);
583 }
584 }
585
586
587
588
589 @Override
590 public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
591 if (eventTypes.contains(IoEventType.WRITE)) {
592 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
593 fireEvent(event);
594 } else {
595 nextFilter.filterWrite(session, writeRequest);
596 }
597 }
598
599
600
601
602 @Override
603 public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
604 if (eventTypes.contains(IoEventType.CLOSE)) {
605 IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
606 fireEvent(event);
607 } else {
608 nextFilter.filterClose(session);
609 }
610 }
611 }