View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}.
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter creates a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
59   * maintained like the following:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specified other {@link Executor} instance in the constructor,
67   * the order of events are not maintained at all.  This means more than one event
68   * handler methods can be invoked at the same time with mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107  * 
108  * @see OrderedThreadPoolExecutor
109  * @see UnorderedThreadPoolExecutor
110  * @org.apache.xbean.XBean
111  */
112 public class ExecutorFilter extends IoFilterAdapter {
113     /** The list of handled events */
114     private EnumSet<IoEventType> eventTypes;
115     
116     /** The associated executor */
117     private Executor executor;
118     
119     /** A flag set if the executor can be managed */ 
120     private boolean manageableExecutor;
121     
122     /** The default pool size */
123     private static final int DEFAULT_MAX_POOL_SIZE = 16;
124     
125     /** The number of thread to create at startup */
126     private static final int BASE_THREAD_NUMBER = 0;
127     
128     /** The default KeepAlive time, in seconds */
129     private static final long DEFAULT_KEEPALIVE_TIME = 30;
130     
131     /** 
132      * A set of flags used to tell if the Executor has been created 
133      * in the constructor or passed as an argument. In the second case, 
134      * the executor state can be managed.
135      **/
136     private static final boolean MANAGEABLE_EXECUTOR = true;
137     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
138     
139     /** A list of default EventTypes to be handled by the executor */
140     private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
141         IoEventType.EXCEPTION_CAUGHT,
142         IoEventType.MESSAGE_RECEIVED, 
143         IoEventType.MESSAGE_SENT,
144         IoEventType.SESSION_CLOSED, 
145         IoEventType.SESSION_IDLE,
146         IoEventType.SESSION_OPENED
147     };
148 
149     /**
150      * (Convenience constructor) Creates a new instance with a new
151      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
152      * maximum of 16 threads in the pool. All the event will be handled 
153      * by this default executor.
154      */
155     public ExecutorFilter() {
156         // Create a new default Executor
157         Executor executor = createDefaultExecutor(
158             BASE_THREAD_NUMBER,
159             DEFAULT_MAX_POOL_SIZE,
160             DEFAULT_KEEPALIVE_TIME,
161             TimeUnit.SECONDS,
162             Executors.defaultThreadFactory(),
163             null);
164         
165         // Initialize the filter
166         init(executor, MANAGEABLE_EXECUTOR);
167     }
168     
169     /**
170      * (Convenience constructor) Creates a new instance with a new
171      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
172      * a maximum of threads in the pool is given. All the event will be handled 
173      * by this default executor.
174      * 
175      * @param maximumPoolSize The maximum pool size
176      */
177     public ExecutorFilter(int maximumPoolSize) {
178         // Create a new default Executor
179         Executor executor = createDefaultExecutor(
180             BASE_THREAD_NUMBER,
181             maximumPoolSize,
182             DEFAULT_KEEPALIVE_TIME,
183             TimeUnit.SECONDS,
184             Executors.defaultThreadFactory(),
185             null);
186         
187         // Initialize the filter
188         init(executor, MANAGEABLE_EXECUTOR);
189     }
190     
191     /**
192      * (Convenience constructor) Creates a new instance with a new
193      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
194      * maximum of threads the pool can contain. All the event will be handled 
195      * by this default executor.
196      *
197      * @param corePoolSize The initial pool size
198      * @param maximumPoolSize The maximum pool size
199      */
200     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
201         // Create a new default Executor
202         Executor executor = createDefaultExecutor(
203             corePoolSize,
204             maximumPoolSize,
205             DEFAULT_KEEPALIVE_TIME,
206             TimeUnit.SECONDS,
207             Executors.defaultThreadFactory(),
208             null);
209         
210         // Initialize the filter
211         init(executor, MANAGEABLE_EXECUTOR);
212     }
213     
214     /**
215      * (Convenience constructor) Creates a new instance with a new
216      * {@link OrderedThreadPoolExecutor}.
217      * 
218      * @param corePoolSize The initial pool size
219      * @param maximumPoolSize The maximum pool size
220      * @param keepAliveTime Default duration for a thread
221      * @param unit Time unit used for the keepAlive value
222      */
223     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
224             TimeUnit unit) {
225         // Create a new default Executor
226         Executor executor = createDefaultExecutor(
227             corePoolSize,
228             maximumPoolSize,
229             keepAliveTime,
230             unit,
231             Executors.defaultThreadFactory(),
232             null);
233         
234         // Initialize the filter
235         init(executor, MANAGEABLE_EXECUTOR);
236     }
237 
238     /**
239      * (Convenience constructor) Creates a new instance with a new
240      * {@link OrderedThreadPoolExecutor}.
241      * 
242      * @param corePoolSize The initial pool size
243      * @param maximumPoolSize The maximum pool size
244      * @param keepAliveTime Default duration for a thread
245      * @param unit Time unit used for the keepAlive value
246      * @param queueHandler The queue used to store events
247      */
248     public ExecutorFilter(
249             int corePoolSize, int maximumPoolSize, 
250             long keepAliveTime, TimeUnit unit,
251             IoEventQueueHandler queueHandler) {
252         // Create a new default Executor
253         Executor executor = createDefaultExecutor(
254             corePoolSize,
255             maximumPoolSize,
256             keepAliveTime,
257             unit,
258             Executors.defaultThreadFactory(),
259             queueHandler);
260         
261         // Initialize the filter
262         init(executor, MANAGEABLE_EXECUTOR);
263     }
264 
265     /**
266      * (Convenience constructor) Creates a new instance with a new
267      * {@link OrderedThreadPoolExecutor}.
268      * 
269      * @param corePoolSize The initial pool size
270      * @param maximumPoolSize The maximum pool size
271      * @param keepAliveTime Default duration for a thread
272      * @param unit Time unit used for the keepAlive value
273      * @param threadFactory The factory used to create threads
274      */
275     public ExecutorFilter(
276             int corePoolSize, int maximumPoolSize, 
277             long keepAliveTime, TimeUnit unit,
278             ThreadFactory threadFactory) {
279         // Create a new default Executor
280         Executor executor = createDefaultExecutor(
281             corePoolSize,
282             maximumPoolSize,
283             keepAliveTime,
284             unit,
285             threadFactory,
286             null);
287         
288         // Initialize the filter
289         init(executor, MANAGEABLE_EXECUTOR);
290     }
291 
292     /**
293      * (Convenience constructor) Creates a new instance with a new
294      * {@link OrderedThreadPoolExecutor}.
295      * 
296      * @param corePoolSize The initial pool size
297      * @param maximumPoolSize The maximum pool size
298      * @param keepAliveTime Default duration for a thread
299      * @param unit Time unit used for the keepAlive value
300      * @param threadFactory The factory used to create threads
301      * @param queueHandler The queue used to store events
302      */
303     public ExecutorFilter(
304             int corePoolSize, int maximumPoolSize, 
305             long keepAliveTime, TimeUnit unit,
306             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
307         // Create a new default Executor
308         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
309         
310         // Initialize the filter
311         init(executor, MANAGEABLE_EXECUTOR);
312     }
313 
314     /**
315      * (Convenience constructor) Creates a new instance with a new
316      * {@link OrderedThreadPoolExecutor}.
317      * 
318      * @param eventTypes The event for which the executor will be used
319      */
320     public ExecutorFilter(IoEventType... eventTypes) {
321         // Create a new default Executor
322         Executor executor = createDefaultExecutor(
323             BASE_THREAD_NUMBER,
324             DEFAULT_MAX_POOL_SIZE,
325             DEFAULT_KEEPALIVE_TIME,
326             TimeUnit.SECONDS,
327             Executors.defaultThreadFactory(),
328             null);
329         
330         // Initialize the filter
331         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
332     }
333     
334     /**
335      * (Convenience constructor) Creates a new instance with a new
336      * {@link OrderedThreadPoolExecutor}.
337      * 
338      * @param maximumPoolSize The maximum pool size
339      * @param eventTypes The event for which the executor will be used
340      */
341     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
342         // Create a new default Executor
343         Executor executor = createDefaultExecutor(
344             BASE_THREAD_NUMBER,
345             maximumPoolSize,
346             DEFAULT_KEEPALIVE_TIME,
347             TimeUnit.SECONDS,
348             Executors.defaultThreadFactory(),
349             null);
350         
351         // Initialize the filter
352         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
353     }
354     
355     /**
356      * (Convenience constructor) Creates a new instance with a new
357      * {@link OrderedThreadPoolExecutor}.
358      * 
359      * @param corePoolSize The initial pool size
360      * @param maximumPoolSize The maximum pool size
361      * @param eventTypes The event for which the executor will be used
362      */
363     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
364         // Create a new default Executor
365         Executor executor = createDefaultExecutor(
366             corePoolSize,
367             maximumPoolSize,
368             DEFAULT_KEEPALIVE_TIME,
369             TimeUnit.SECONDS,
370             Executors.defaultThreadFactory(),
371             null);
372         
373         // Initialize the filter
374         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
375     }
376     
377     /**
378      * (Convenience constructor) Creates a new instance with a new
379      * {@link OrderedThreadPoolExecutor}.
380      * 
381      * @param corePoolSize The initial pool size
382      * @param maximumPoolSize The maximum pool size
383      * @param keepAliveTime Default duration for a thread
384      * @param unit Time unit used for the keepAlive value
385      * @param eventTypes The event for which the executor will be used
386      */
387     public ExecutorFilter(
388             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
389             IoEventType... eventTypes) {
390         // Create a new default Executor
391         Executor executor = createDefaultExecutor(
392             corePoolSize,
393             maximumPoolSize,
394             keepAliveTime,
395             unit,
396             Executors.defaultThreadFactory(),
397             null);
398         
399         // Initialize the filter
400         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
401     }
402     
403     /**
404      * (Convenience constructor) Creates a new instance with a new
405      * {@link OrderedThreadPoolExecutor}.
406      * 
407      * @param corePoolSize The initial pool size
408      * @param maximumPoolSize The maximum pool size
409      * @param keepAliveTime Default duration for a thread
410      * @param unit Time unit used for the keepAlive value
411      * @param queueHandler The queue used to store events
412      * @param eventTypes The event for which the executor will be used
413      */
414     public ExecutorFilter(
415             int corePoolSize, int maximumPoolSize, 
416             long keepAliveTime, TimeUnit unit,
417             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
418         // Create a new default Executor
419         Executor executor = createDefaultExecutor(
420             corePoolSize,
421             maximumPoolSize,
422             keepAliveTime,
423             unit,
424             Executors.defaultThreadFactory(),
425             queueHandler);
426         
427         // Initialize the filter
428         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
429     }
430 
431     /**
432      * (Convenience constructor) Creates a new instance with a new
433      * {@link OrderedThreadPoolExecutor}.
434      * 
435      * @param corePoolSize The initial pool size
436      * @param maximumPoolSize The maximum pool size
437      * @param keepAliveTime Default duration for a thread
438      * @param unit Time unit used for the keepAlive value
439      * @param threadFactory The factory used to create threads
440      * @param eventTypes The event for which the executor will be used
441      */
442     public ExecutorFilter(
443             int corePoolSize, int maximumPoolSize, 
444             long keepAliveTime, TimeUnit unit,
445             ThreadFactory threadFactory, IoEventType... eventTypes) {
446         // Create a new default Executor
447         Executor executor = createDefaultExecutor(
448             corePoolSize,
449             maximumPoolSize,
450             keepAliveTime,
451             unit,
452             threadFactory,
453             null);
454         
455         // Initialize the filter
456         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
457     }
458 
459     /**
460      * (Convenience constructor) Creates a new instance with a new
461      * {@link OrderedThreadPoolExecutor}.
462      * 
463      * @param corePoolSize The initial pool size
464      * @param maximumPoolSize The maximum pool size
465      * @param keepAliveTime Default duration for a thread
466      * @param unit Time unit used for the keepAlive value
467      * @param threadFactory The factory used to create threads
468      * @param queueHandler The queue used to store events
469      * @param eventTypes The event for which the executor will be used
470      */
471     public ExecutorFilter(
472             int corePoolSize, int maximumPoolSize, 
473             long keepAliveTime, TimeUnit unit,
474             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, 
475             IoEventType... eventTypes) {
476         // Create a new default Executor
477         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
478             keepAliveTime, unit, threadFactory, queueHandler);
479         
480         // Initialize the filter
481         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
482     }
483     
484     /**
485      * Creates a new instance with the specified {@link Executor}.
486      * 
487      * @param executor the user's managed Executor to use in this filter
488      */
489     public ExecutorFilter(Executor executor) {
490         // Initialize the filter
491         init(executor, NOT_MANAGEABLE_EXECUTOR);
492     }
493 
494     /**
495      * Creates a new instance with the specified {@link Executor}.
496      * 
497      * @param executor the user's managed Executor to use in this filter
498      * @param eventTypes The event for which the executor will be used
499      */
500     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
501         // Initialize the filter
502         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
503     }
504     
505     /**
506      * Create an OrderedThreadPool executor.
507      *
508      * @param corePoolSize The initial pool sizePoolSize
509      * @param maximumPoolSize The maximum pool size
510      * @param keepAliveTime Default duration for a thread
511      * @param unit Time unit used for the keepAlive value
512      * @param threadFactory The factory used to create threads
513      * @param queueHandler The queue used to store events
514      * @return An instance of the created Executor
515      */
516     private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
517         TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
518         // Create a new Executor
519         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
520             keepAliveTime, unit, threadFactory, queueHandler);
521         
522         return executor;
523     }
524     
525     /**
526      * Create an EnumSet from an array of EventTypes, and set the associated
527      * eventTypes field.
528      *
529      * @param eventTypes The array of handled events
530      */
531     private void initEventTypes(IoEventType... eventTypes) {
532         if ((eventTypes == null) || (eventTypes.length == 0)) {
533             eventTypes = DEFAULT_EVENT_SET;
534         }
535 
536         // Copy the list of handled events in the event set
537         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
538         
539         // Check that we don't have the SESSION_CREATED event in the set
540         if (this.eventTypes.contains( IoEventType.SESSION_CREATED )) {
541             this.eventTypes = null;
542             throw new IllegalArgumentException(IoEventType.SESSION_CREATED
543                 + " is not allowed.");
544         }
545     }
546 
547     /**
548      * Creates a new instance of ExecutorFilter. This private constructor is called by all
549      * the public constructor.
550      *
551      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
552      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
553      * @param eventTypes The lit of event which are handled by the executor
554      * @param
555      */
556     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
557         if (executor == null) {
558             throw new IllegalArgumentException("executor");
559         }
560 
561         initEventTypes(eventTypes);
562         this.executor = executor;
563         this.manageableExecutor = manageableExecutor;
564     }
565     
566     /**
567      * Shuts down the underlying executor if this filter hase been created via
568      * a convenience constructor.
569      */
570     @Override
571     public void destroy() {
572         if (manageableExecutor) {
573             ((ExecutorService) executor).shutdown();
574         }
575     }
576 
577     /**
578      * Returns the underlying {@link Executor} instance this filter uses.
579      * 
580      * @return The underlying {@link Executor}
581      */
582     public final Executor getExecutor() {
583         return executor;
584     }
585 
586     /**
587      * Fires the specified event through the underlying executor.
588      * 
589      * @param event The filtered event
590      */
591     protected void fireEvent(IoFilterEvent event) {
592         executor.execute(event);
593     }
594 
595     /**
596      * {@inheritDoc}
597      */
598     @Override
599     public void onPreAdd(IoFilterChain parent, String name,
600             NextFilter nextFilter) throws Exception {
601         if (parent.contains(this)) {
602             throw new IllegalArgumentException(
603                     "You can't add the same filter instance more than once.  Create another instance and add it.");
604         }
605     }
606 
607     /**
608      * {@inheritDoc}
609      */
610     @Override
611     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
612         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
613             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
614                 session, null); 
615             fireEvent(event);
616         } else {
617             nextFilter.sessionOpened(session);
618         }
619     }
620 
621     /**
622      * {@inheritDoc}
623      */
624     @Override
625     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
626         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
627             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
628                 session, null); 
629             fireEvent(event);
630         } else {
631             nextFilter.sessionClosed(session);
632         }
633     }
634 
635     /**
636      * {@inheritDoc}
637      */
638     @Override
639     public final void sessionIdle(NextFilter nextFilter, IoSession session,
640             IdleStatus status) {
641         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
642             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
643                 session, status); 
644             fireEvent(event);
645         } else {
646             nextFilter.sessionIdle(session, status);
647         }
648     }
649 
650     /**
651      * {@inheritDoc}
652      */
653     @Override
654     public final void exceptionCaught(NextFilter nextFilter, IoSession session,
655             Throwable cause) {
656         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
657             IoFilterEvent event = new IoFilterEvent(nextFilter,
658                 IoEventType.EXCEPTION_CAUGHT, session, cause); 
659             fireEvent(event);
660         } else {
661             nextFilter.exceptionCaught(session, cause);
662         }
663     }
664 
665     /**
666      * {@inheritDoc}
667      */
668     @Override
669     public final void messageReceived(NextFilter nextFilter, IoSession session,
670             Object message) {
671         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
672             IoFilterEvent event = new IoFilterEvent(nextFilter,
673                 IoEventType.MESSAGE_RECEIVED, session, message); 
674             fireEvent(event);
675         } else {
676             nextFilter.messageReceived(session, message);
677         }
678     }
679 
680     /**
681      * {@inheritDoc}
682      */
683     @Override
684     public final void messageSent(NextFilter nextFilter, IoSession session,
685             WriteRequest writeRequest) {
686         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
687             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
688                 session, writeRequest); 
689             fireEvent(event);
690         } else {
691             nextFilter.messageSent(session, writeRequest);
692         }
693     }
694 
695     /**
696      * {@inheritDoc}
697      */
698     @Override
699     public final void filterWrite(NextFilter nextFilter, IoSession session,
700             WriteRequest writeRequest) {
701         if (eventTypes.contains(IoEventType.WRITE)) {
702             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
703                 writeRequest); 
704             fireEvent(event);
705         } else {
706             nextFilter.filterWrite(session, writeRequest);
707         }
708     }
709 
710     /**
711      * {@inheritDoc}
712      */
713     @Override
714     public final void filterClose(NextFilter nextFilter, IoSession session)
715             throws Exception {
716         if (eventTypes.contains(IoEventType.CLOSE)) {
717             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
718                 null); 
719             fireEvent(event);
720         } else {
721             nextFilter.filterClose(session);
722         }
723     }
724 }