View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}.
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter creates a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
59   * maintained like the following:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specified other {@link Executor} instance in the constructor,
67   * the order of events are not maintained at all.  This means more than one event
68   * handler methods can be invoked at the same time with mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author The Apache MINA Project (dev@mina.apache.org)
107  * 
108  * @see OrderedThreadPoolExecutor
109  * @see UnorderedThreadPoolExecutor
110  * @org.apache.xbean.XBean
111  */
112 public class ExecutorFilter extends IoFilterAdapter {
113     /** The list of handled events */
114     private EnumSet<IoEventType> eventTypes;
115     
116     /** The associated executor */
117     private Executor executor;
118     
119     /** A flag set if the executor can be managed */ 
120     private boolean manageableExecutor;
121     
122     /** The default pool size */
123     private static final int DEFAULT_MAX_POOL_SIZE = 16;
124     
125     /** The number of thread to create at startup */
126     private static final int BASE_THREAD_NUMBER = 0;
127     
128     /** The default KeepAlive time, in seconds */
129     private static final long DEFAULT_KEEPALIVE_TIME = 30;
130     
131     /** 
132      * A set of flags used to tell if the Executor has been created 
133      * in the constructor or passed as an argument. In the second case, 
134      * the executor state can be managed.
135      **/
136     private static final boolean MANAGEABLE_EXECUTOR = true;
137     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
138     
139     /** A list of default EventTypes to be handled by the executor */
140     private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
141         IoEventType.EXCEPTION_CAUGHT,
142         IoEventType.MESSAGE_RECEIVED, 
143         IoEventType.MESSAGE_SENT,
144         IoEventType.SESSION_CLOSED, 
145         IoEventType.SESSION_IDLE,
146         IoEventType.SESSION_OPENED
147     };
148     
149 
150     /**
151      * (Convenience constructor) Creates a new instance with a new
152      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
153      * maximum of 16 threads in the pool. All the event will be handled 
154      * by this default executor.
155      */
156     public ExecutorFilter() {
157         // Create a new default Executor
158         Executor executor = createDefaultExecutor(
159             BASE_THREAD_NUMBER,
160             DEFAULT_MAX_POOL_SIZE,
161             DEFAULT_KEEPALIVE_TIME,
162             TimeUnit.SECONDS,
163             Executors.defaultThreadFactory(),
164             null);
165         
166         // Initialize the filter
167         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
168     }
169     
170     /**
171      * (Convenience constructor) Creates a new instance with a new
172      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
173      * a maximum of threads in the pool is given. All the event will be handled 
174      * by this default executor.
175      * 
176      * @param maximumPoolSize The maximum pool size
177      */
178     public ExecutorFilter(int maximumPoolSize) {
179         // Create a new default Executor
180         Executor executor = createDefaultExecutor(
181             BASE_THREAD_NUMBER,
182             maximumPoolSize,
183             DEFAULT_KEEPALIVE_TIME,
184             TimeUnit.SECONDS,
185             Executors.defaultThreadFactory(),
186             null);
187         
188         // Initialize the filter
189         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
190     }
191     
192     /**
193      * (Convenience constructor) Creates a new instance with a new
194      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
195      * maximum of threads the pool can contain. All the event will be handled 
196      * by this default executor.
197      *
198      * @param corePoolSize The initial pool size
199      * @param maximumPoolSize The maximum pool size
200      */
201     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
202         // Create a new default Executor
203         Executor executor = createDefaultExecutor(
204             corePoolSize,
205             maximumPoolSize,
206             DEFAULT_KEEPALIVE_TIME,
207             TimeUnit.SECONDS,
208             Executors.defaultThreadFactory(),
209             null);
210         
211         // Initialize the filter
212         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
213     }
214     
215     /**
216      * (Convenience constructor) Creates a new instance with a new
217      * {@link OrderedThreadPoolExecutor}.
218      * 
219      * @param corePoolSize The initial pool size
220      * @param maximumPoolSize The maximum pool size
221      * @param keepAliveTime Default duration for a thread
222      * @param unit Time unit used for the keepAlive value
223      */
224     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
225             TimeUnit unit) {
226         // Create a new default Executor
227         Executor executor = createDefaultExecutor(
228             corePoolSize,
229             maximumPoolSize,
230             keepAliveTime,
231             unit,
232             Executors.defaultThreadFactory(),
233             null);
234         
235         // Initialize the filter
236         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
237     }
238 
239     /**
240      * (Convenience constructor) Creates a new instance with a new
241      * {@link OrderedThreadPoolExecutor}.
242      * 
243      * @param corePoolSize The initial pool size
244      * @param maximumPoolSize The maximum pool size
245      * @param keepAliveTime Default duration for a thread
246      * @param unit Time unit used for the keepAlive value
247      * @param queueHandler The queue used to store events
248      */
249     public ExecutorFilter(
250             int corePoolSize, int maximumPoolSize, 
251             long keepAliveTime, TimeUnit unit,
252             IoEventQueueHandler queueHandler) {
253         // Create a new default Executor
254         Executor executor = createDefaultExecutor(
255             corePoolSize,
256             maximumPoolSize,
257             keepAliveTime,
258             unit,
259             Executors.defaultThreadFactory(),
260             queueHandler);
261         
262         // Initialize the filter
263         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
264     }
265 
266     /**
267      * (Convenience constructor) Creates a new instance with a new
268      * {@link OrderedThreadPoolExecutor}.
269      * 
270      * @param corePoolSize The initial pool size
271      * @param maximumPoolSize The maximum pool size
272      * @param keepAliveTime Default duration for a thread
273      * @param unit Time unit used for the keepAlive value
274      * @param threadFactory The factory used to create threads
275      */
276     public ExecutorFilter(
277             int corePoolSize, int maximumPoolSize, 
278             long keepAliveTime, TimeUnit unit,
279             ThreadFactory threadFactory) {
280         // Create a new default Executor
281         Executor executor = createDefaultExecutor(
282             corePoolSize,
283             maximumPoolSize,
284             keepAliveTime,
285             unit,
286             threadFactory,
287             null);
288         
289         // Initialize the filter
290         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
291     }
292 
293     /**
294      * (Convenience constructor) Creates a new instance with a new
295      * {@link OrderedThreadPoolExecutor}.
296      * 
297      * @param corePoolSize The initial pool size
298      * @param maximumPoolSize The maximum pool size
299      * @param keepAliveTime Default duration for a thread
300      * @param unit Time unit used for the keepAlive value
301      * @param threadFactory The factory used to create threads
302      * @param queueHandler The queue used to store events
303      */
304     public ExecutorFilter(
305             int corePoolSize, int maximumPoolSize, 
306             long keepAliveTime, TimeUnit unit,
307             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
308         // Create a new default Executor
309         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
310         
311         // Initialize the filter
312         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
313     }
314 
315     /**
316      * (Convenience constructor) Creates a new instance with a new
317      * {@link OrderedThreadPoolExecutor}.
318      * 
319      * @param eventTypes The event for which the executor will be used
320      */
321     public ExecutorFilter(IoEventType... eventTypes) {
322         // Create a new default Executor
323         Executor executor = createDefaultExecutor(
324             BASE_THREAD_NUMBER,
325             DEFAULT_MAX_POOL_SIZE,
326             DEFAULT_KEEPALIVE_TIME,
327             TimeUnit.SECONDS,
328             Executors.defaultThreadFactory(),
329             null);
330         
331         // Initialize the filter
332         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
333     }
334     
335     /**
336      * (Convenience constructor) Creates a new instance with a new
337      * {@link OrderedThreadPoolExecutor}.
338      * 
339      * @param maximumPoolSize The maximum pool size
340      * @param eventTypes The event for which the executor will be used
341      */
342     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
343         // Create a new default Executor
344         Executor executor = createDefaultExecutor(
345             BASE_THREAD_NUMBER,
346             maximumPoolSize,
347             DEFAULT_KEEPALIVE_TIME,
348             TimeUnit.SECONDS,
349             Executors.defaultThreadFactory(),
350             null);
351         
352         // Initialize the filter
353         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
354     }
355     
356     /**
357      * (Convenience constructor) Creates a new instance with a new
358      * {@link OrderedThreadPoolExecutor}.
359      * 
360      * @param corePoolSize The initial pool size
361      * @param maximumPoolSize The maximum pool size
362      * @param eventTypes The event for which the executor will be used
363      */
364     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
365         // Create a new default Executor
366         Executor executor = createDefaultExecutor(
367             corePoolSize,
368             maximumPoolSize,
369             DEFAULT_KEEPALIVE_TIME,
370             TimeUnit.SECONDS,
371             Executors.defaultThreadFactory(),
372             null);
373         
374         // Initialize the filter
375         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
376     }
377     
378     /**
379      * (Convenience constructor) Creates a new instance with a new
380      * {@link OrderedThreadPoolExecutor}.
381      * 
382      * @param corePoolSize The initial pool size
383      * @param maximumPoolSize The maximum pool size
384      * @param keepAliveTime Default duration for a thread
385      * @param unit Time unit used for the keepAlive value
386      * @param eventTypes The event for which the executor will be used
387      */
388     public ExecutorFilter(
389             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
390             IoEventType... eventTypes) {
391         // Create a new default Executor
392         Executor executor = createDefaultExecutor(
393             corePoolSize,
394             maximumPoolSize,
395             keepAliveTime,
396             unit,
397             Executors.defaultThreadFactory(),
398             null);
399         
400         // Initialize the filter
401         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
402     }
403     
404     /**
405      * (Convenience constructor) Creates a new instance with a new
406      * {@link OrderedThreadPoolExecutor}.
407      * 
408      * @param corePoolSize The initial pool size
409      * @param maximumPoolSize The maximum pool size
410      * @param keepAliveTime Default duration for a thread
411      * @param unit Time unit used for the keepAlive value
412      * @param queueHandler The queue used to store events
413      * @param eventTypes The event for which the executor will be used
414      */
415     public ExecutorFilter(
416             int corePoolSize, int maximumPoolSize, 
417             long keepAliveTime, TimeUnit unit,
418             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
419         // Create a new default Executor
420         Executor executor = createDefaultExecutor(
421             corePoolSize,
422             maximumPoolSize,
423             keepAliveTime,
424             unit,
425             Executors.defaultThreadFactory(),
426             queueHandler);
427         
428         // Initialize the filter
429         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
430     }
431 
432     /**
433      * (Convenience constructor) Creates a new instance with a new
434      * {@link OrderedThreadPoolExecutor}.
435      * 
436      * @param corePoolSize The initial pool size
437      * @param maximumPoolSize The maximum pool size
438      * @param keepAliveTime Default duration for a thread
439      * @param unit Time unit used for the keepAlive value
440      * @param threadFactory The factory used to create threads
441      * @param eventTypes The event for which the executor will be used
442      */
443     public ExecutorFilter(
444             int corePoolSize, int maximumPoolSize, 
445             long keepAliveTime, TimeUnit unit,
446             ThreadFactory threadFactory, IoEventType... eventTypes) {
447         // Create a new default Executor
448         Executor executor = createDefaultExecutor(
449             corePoolSize,
450             maximumPoolSize,
451             keepAliveTime,
452             unit,
453             threadFactory,
454             null);
455         
456         // Initialize the filter
457         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
458     }
459 
460     /**
461      * (Convenience constructor) Creates a new instance with a new
462      * {@link OrderedThreadPoolExecutor}.
463      * 
464      * @param corePoolSize The initial pool size
465      * @param maximumPoolSize The maximum pool size
466      * @param keepAliveTime Default duration for a thread
467      * @param unit Time unit used for the keepAlive value
468      * @param threadFactory The factory used to create threads
469      * @param queueHandler The queue used to store events
470      * @param eventTypes The event for which the executor will be used
471      */
472     public ExecutorFilter(
473             int corePoolSize, int maximumPoolSize, 
474             long keepAliveTime, TimeUnit unit,
475             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, 
476             IoEventType... eventTypes) {
477         // Create a new default Executor
478         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
479             keepAliveTime, unit, threadFactory, queueHandler);
480         
481         // Initialize the filter
482         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
483     }
484     
485     /**
486      * Creates a new instance with the specified {@link Executor}.
487      * 
488      * @param executor the user's managed Executor to use in this filter
489      */
490     public ExecutorFilter(Executor executor) {
491         // Initialize the filter
492         init(executor, NOT_MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
493     }
494 
495     /**
496      * Creates a new instance with the specified {@link Executor}.
497      * 
498      * @param executor the user's managed Executor to use in this filter
499      * @param eventTypes The event for which the executor will be used
500      */
501     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
502         // Initialize the filter
503         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
504     }
505     
506     /**
507      * Create an OrderedThreadPool executor.
508      *
509      * @param corePoolSize The initial pool sizePoolSize
510      * @param maximumPoolSize The maximum pool size
511      * @param keepAliveTime Default duration for a thread
512      * @param unit Time unit used for the keepAlive value
513      * @param threadFactory The factory used to create threads
514      * @param queueHandler The queue used to store events
515      * @return An instance of the created Executor
516      */
517     private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
518         TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
519         // Create a new Executor
520         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
521             keepAliveTime, unit, threadFactory, queueHandler);
522         
523         return executor;
524     }
525     
526     /**
527      * Create an EnumSet from an array of EventTypes, and set the associated
528      * eventTypes field.
529      *
530      * @param eventTypes The array of handled events
531      */
532     private void initEventTypes(IoEventType... eventTypes) {
533         if ((eventTypes == null) || (eventTypes.length == 0)) {
534             eventTypes = DEFAULT_EVENT_SET;
535         }
536 
537         // Copy the list of handled events in the event set
538         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
539         
540         // Check that we don't have the SESSION_CREATED event in the set
541         if (this.eventTypes.contains( IoEventType.SESSION_CREATED )) {
542             this.eventTypes = null;
543             throw new IllegalArgumentException(IoEventType.SESSION_CREATED
544                 + " is not allowed.");
545         }
546     }
547 
548     /**
549      * Creates a new instance of ExecutorFilter. This private constructor is called by all
550      * the public constructor.
551      *
552      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
553      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
554      * @param eventTypes The lit of event which are handled by the executor
555      * @param
556      */
557     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
558         if (executor == null) {
559             throw new NullPointerException("executor");
560         }
561 
562         initEventTypes(eventTypes);
563         this.executor = executor;
564         this.manageableExecutor = manageableExecutor;
565     }
566     
567     /**
568      * Shuts down the underlying executor if this filter hase been created via
569      * a convenience constructor.
570      */
571     @Override
572     public void destroy() {
573         if (manageableExecutor) {
574             ((ExecutorService) executor).shutdown();
575         }
576     }
577 
578     /**
579      * Returns the underlying {@link Executor} instance this filter uses.
580      * 
581      * @return The underlying {@link Executor}
582      */
583     public final Executor getExecutor() {
584         return executor;
585     }
586 
587     /**
588      * Fires the specified event through the underlying executor.
589      */
590     protected void fireEvent(IoFilterEvent event) {
591         getExecutor().execute(event);
592     }
593 
594     /**
595      * A trigger fired when adding this filter in a chain. As this filter can be
596      * added only once in a chain, if the chain already contains the same filter,
597      * and exception will be thrown.
598      * 
599      * @param parent The chain in which we want to inject this filter
600      * @param name The Fitler's name
601      * @param nextFilter The next filter in the chain
602      * 
603      * @throws IllegalArgumentException If the filter is already present in the chain
604      */
605     @Override
606     public void onPreAdd(IoFilterChain parent, String name,
607             NextFilter nextFilter) throws Exception {
608         if (parent.contains(this)) {
609             throw new IllegalArgumentException(
610                     "You can't add the same filter instance more than once.  Create another instance and add it.");
611         }
612     }
613 
614     /**
615      * {@inheritDoc}
616      */
617     @Override
618     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
619         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
620             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
621                 session, null); 
622             fireEvent(event);
623         } else {
624             nextFilter.sessionOpened(session);
625         }
626     }
627 
628     /**
629      * {@inheritDoc}
630      */
631     @Override
632     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
633         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
634             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
635                 session, null); 
636             fireEvent(event);
637         } else {
638             nextFilter.sessionClosed(session);
639         }
640     }
641 
642     /**
643      * {@inheritDoc}
644      */
645     @Override
646     public final void sessionIdle(NextFilter nextFilter, IoSession session,
647             IdleStatus status) {
648         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
649             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
650                 session, status); 
651             fireEvent(event);
652         } else {
653             nextFilter.sessionIdle(session, status);
654         }
655     }
656 
657     /**
658      * {@inheritDoc}
659      */
660     @Override
661     public final void exceptionCaught(NextFilter nextFilter, IoSession session,
662             Throwable cause) {
663         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
664             IoFilterEvent event = new IoFilterEvent(nextFilter,
665                 IoEventType.EXCEPTION_CAUGHT, session, cause); 
666             fireEvent(event);
667         } else {
668             nextFilter.exceptionCaught(session, cause);
669         }
670     }
671 
672     /**
673      * {@inheritDoc}
674      */
675     @Override
676     public final void messageReceived(NextFilter nextFilter, IoSession session,
677             Object message) {
678         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
679             IoFilterEvent event = new IoFilterEvent(nextFilter,
680                 IoEventType.MESSAGE_RECEIVED, session, message); 
681             fireEvent(event);
682         } else {
683             nextFilter.messageReceived(session, message);
684         }
685     }
686 
687     /**
688      * {@inheritDoc}
689      */
690     @Override
691     public final void messageSent(NextFilter nextFilter, IoSession session,
692             WriteRequest writeRequest) {
693         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
694             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
695                 session, writeRequest); 
696             fireEvent(event);
697         } else {
698             nextFilter.messageSent(session, writeRequest);
699         }
700     }
701 
702     /**
703      * {@inheritDoc}
704      */
705     @Override
706     public final void filterWrite(NextFilter nextFilter, IoSession session,
707             WriteRequest writeRequest) {
708         if (eventTypes.contains(IoEventType.WRITE)) {
709             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
710                 writeRequest); 
711             fireEvent(event);
712         } else {
713             nextFilter.filterWrite(session, writeRequest);
714         }
715     }
716 
717     /**
718      * {@inheritDoc}
719      */
720     @Override
721     public final void filterClose(NextFilter nextFilter, IoSession session)
722             throws Exception {
723         if (eventTypes.contains(IoEventType.CLOSE)) {
724             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
725                 null); 
726             fireEvent(event);
727         } else {
728             nextFilter.filterClose(session);
729         }
730     }
731 }