View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}).
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter create a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of events is
59   * maintained as follows:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specify another {@link Executor} instance in the constructor,
67   * the order of events is not maintained at all.  This means more than one event
68   * handler method can be invoked at the same time, in mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107  * 
108  * @see OrderedThreadPoolExecutor
109  * @see UnorderedThreadPoolExecutor
110  * @org.apache.xbean.XBean
111  */
112 public class ExecutorFilter extends IoFilterAdapter {
113     /** The list of handled events */
114     private EnumSet<IoEventType> eventTypes;
115 
116     /** The associated executor */
117     private Executor executor;
118 
119     /** A flag set if the executor can be managed */
120     private boolean manageableExecutor;
121 
122     /** The default pool size */
123     private static final int DEFAULT_MAX_POOL_SIZE = 16;
124 
125     /** The number of thread to create at startup */
126     private static final int BASE_THREAD_NUMBER = 0;
127 
128     /** The default KeepAlive time, in seconds */
129     private static final long DEFAULT_KEEPALIVE_TIME = 30;
130 
131     /** 
132      * A set of flags used to tell if the Executor has been created 
133      * in the constructor or passed as an argument. In the second case, 
134      * the executor state can be managed.
135      **/
136     private static final boolean MANAGEABLE_EXECUTOR = true;
137 
138     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139 
140     /** A list of default EventTypes to be handled by the executor */
141     private static final IoEventTypeml#IoEventType">IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
142             IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
143             IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
144 
145     /**
146      * (Convenience constructor) Creates a new instance with a new
147      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
148      * maximum of 16 threads in the pool. All the event will be handled 
149      * by this default executor.
150      */
151     public ExecutorFilter() {
152         // Create a new default Executor
153         Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
154                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
155 
156         // Initialize the filter
157         init(newExecutor, MANAGEABLE_EXECUTOR);
158     }
159 
160     /**
161      * (Convenience constructor) Creates a new instance with a new
162      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
163      * a maximum of threads in the pool is given. All the event will be handled 
164      * by this default executor.
165      * 
166      * @param maximumPoolSize The maximum pool size
167      */
168     public ExecutorFilter(int maximumPoolSize) {
169         // Create a new default Executor
170         Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
171                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
172 
173         // Initialize the filter
174         init(newExecutor, MANAGEABLE_EXECUTOR);
175     }
176 
177     /**
178      * (Convenience constructor) Creates a new instance with a new
179      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
180      * maximum of threads the pool can contain. All the event will be handled 
181      * by this default executor.
182      *
183      * @param corePoolSize The initial pool size
184      * @param maximumPoolSize The maximum pool size
185      */
186     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
187         // Create a new default Executor
188         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
189                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
190 
191         // Initialize the filter
192         init(newExecutor, MANAGEABLE_EXECUTOR);
193     }
194 
195     /**
196      * (Convenience constructor) Creates a new instance with a new
197      * {@link OrderedThreadPoolExecutor}.
198      * 
199      * @param corePoolSize The initial pool size
200      * @param maximumPoolSize The maximum pool size
201      * @param keepAliveTime Default duration for a thread
202      * @param unit Time unit used for the keepAlive value
203      */
204     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
205         // Create a new default Executor
206         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
207                 Executors.defaultThreadFactory(), null);
208 
209         // Initialize the filter
210         init(newExecutor, MANAGEABLE_EXECUTOR);
211     }
212 
213     /**
214      * (Convenience constructor) Creates a new instance with a new
215      * {@link OrderedThreadPoolExecutor}.
216      * 
217      * @param corePoolSize The initial pool size
218      * @param maximumPoolSize The maximum pool size
219      * @param keepAliveTime Default duration for a thread
220      * @param unit Time unit used for the keepAlive value
221      * @param queueHandler The queue used to store events
222      */
223     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224             IoEventQueueHandler queueHandler) {
225         // Create a new default Executor
226         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
227                 Executors.defaultThreadFactory(), queueHandler);
228 
229         // Initialize the filter
230         init(newExecutor, MANAGEABLE_EXECUTOR);
231     }
232 
233     /**
234      * (Convenience constructor) Creates a new instance with a new
235      * {@link OrderedThreadPoolExecutor}.
236      * 
237      * @param corePoolSize The initial pool size
238      * @param maximumPoolSize The maximum pool size
239      * @param keepAliveTime Default duration for a thread
240      * @param unit Time unit used for the keepAlive value
241      * @param threadFactory The factory used to create threads
242      */
243     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
244             ThreadFactory threadFactory) {
245         // Create a new default Executor
246         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
247                 null);
248 
249         // Initialize the filter
250         init(newExecutor, MANAGEABLE_EXECUTOR);
251     }
252 
253     /**
254      * (Convenience constructor) Creates a new instance with a new
255      * {@link OrderedThreadPoolExecutor}.
256      * 
257      * @param corePoolSize The initial pool size
258      * @param maximumPoolSize The maximum pool size
259      * @param keepAliveTime Default duration for a thread
260      * @param unit Time unit used for the keepAlive value
261      * @param threadFactory The factory used to create threads
262      * @param queueHandler The queue used to store events
263      */
264     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
265             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
266         // Create a new default Executor
267         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
268                 threadFactory, queueHandler);
269 
270         // Initialize the filter
271         init(newExecutor, MANAGEABLE_EXECUTOR);
272     }
273 
274     /**
275      * (Convenience constructor) Creates a new instance with a new
276      * {@link OrderedThreadPoolExecutor}.
277      * 
278      * @param eventTypes The event for which the executor will be used
279      */
280     public ExecutorFilter(IoEventType... eventTypes) {
281         // Create a new default Executor
282         Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
283                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
284 
285         // Initialize the filter
286         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
287     }
288 
289     /**
290      * (Convenience constructor) Creates a new instance with a new
291      * {@link OrderedThreadPoolExecutor}.
292      * 
293      * @param maximumPoolSize The maximum pool size
294      * @param eventTypes The event for which the executor will be used
295      */
296     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
297         // Create a new default Executor
298         Executor newExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
299                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
300 
301         // Initialize the filter
302         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
303     }
304 
305     /**
306      * (Convenience constructor) Creates a new instance with a new
307      * {@link OrderedThreadPoolExecutor}.
308      * 
309      * @param corePoolSize The initial pool size
310      * @param maximumPoolSize The maximum pool size
311      * @param eventTypes The event for which the executor will be used
312      */
313     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
314         // Create a new default Executor
315         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
316                 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
317 
318         // Initialize the filter
319         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
320     }
321 
322     /**
323      * (Convenience constructor) Creates a new instance with a new
324      * {@link OrderedThreadPoolExecutor}.
325      * 
326      * @param corePoolSize The initial pool size
327      * @param maximumPoolSize The maximum pool size
328      * @param keepAliveTime Default duration for a thread
329      * @param unit Time unit used for the keepAlive value
330      * @param eventTypes The event for which the executor will be used
331      */
332     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
333             IoEventType... eventTypes) {
334         // Create a new default Executor
335         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
336                 Executors.defaultThreadFactory(), null);
337 
338         // Initialize the filter
339         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
340     }
341 
342     /**
343      * (Convenience constructor) Creates a new instance with a new
344      * {@link OrderedThreadPoolExecutor}.
345      * 
346      * @param corePoolSize The initial pool size
347      * @param maximumPoolSize The maximum pool size
348      * @param keepAliveTime Default duration for a thread
349      * @param unit Time unit used for the keepAlive value
350      * @param queueHandler The queue used to store events
351      * @param eventTypes The event for which the executor will be used
352      */
353     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
354             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
355         // Create a new default Executor
356         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
357                 Executors.defaultThreadFactory(), queueHandler);
358 
359         // Initialize the filter
360         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
361     }
362 
363     /**
364      * (Convenience constructor) Creates a new instance with a new
365      * {@link OrderedThreadPoolExecutor}.
366      * 
367      * @param corePoolSize The initial pool size
368      * @param maximumPoolSize The maximum pool size
369      * @param keepAliveTime Default duration for a thread
370      * @param unit Time unit used for the keepAlive value
371      * @param threadFactory The factory used to create threads
372      * @param eventTypes The event for which the executor will be used
373      */
374     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
375             ThreadFactory threadFactory, IoEventType... eventTypes) {
376         // Create a new default Executor
377         Executor newExecutor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
378                 null);
379 
380         // Initialize the filter
381         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
382     }
383 
384     /**
385      * (Convenience constructor) Creates a new instance with a new
386      * {@link OrderedThreadPoolExecutor}.
387      * 
388      * @param corePoolSize The initial pool size
389      * @param maximumPoolSize The maximum pool size
390      * @param keepAliveTime Default duration for a thread
391      * @param unit Time unit used for the keepAlive value
392      * @param threadFactory The factory used to create threads
393      * @param queueHandler The queue used to store events
394      * @param eventTypes The event for which the executor will be used
395      */
396     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
397             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
398         // Create a new default Executor
399         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
400                 threadFactory, queueHandler);
401 
402         // Initialize the filter
403         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
404     }
405 
406     /**
407      * Creates a new instance with the specified {@link Executor}.
408      * 
409      * @param executor the user's managed Executor to use in this filter
410      */
411     public ExecutorFilter(Executor executor) {
412         // Initialize the filter
413         init(executor, NOT_MANAGEABLE_EXECUTOR);
414     }
415 
416     /**
417      * Creates a new instance with the specified {@link Executor}.
418      * 
419      * @param executor the user's managed Executor to use in this filter
420      * @param eventTypes The event for which the executor will be used
421      */
422     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
423         // Initialize the filter
424         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
425     }
426 
427     /**
428      * Create an OrderedThreadPool executor.
429      *
430      * @param corePoolSize The initial pool sizePoolSize
431      * @param maximumPoolSize The maximum pool size
432      * @param keepAliveTime Default duration for a thread
433      * @param unit Time unit used for the keepAlive value
434      * @param threadFactory The factory used to create threads
435      * @param queueHandler The queue used to store events
436      * @return An instance of the created Executor
437      */
438     private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
439             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
440         // Create a new Executor
441         return new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
442                 threadFactory, queueHandler);
443     }
444 
445     /**
446      * Create an EnumSet from an array of EventTypes, and set the associated
447      * eventTypes field.
448      *
449      * @param eventTypes The array of handled events
450      */
451     private void initEventTypes(IoEventType... eventTypes) {
452         if ((eventTypes == null) || (eventTypes.length == 0)) {
453             eventTypes = DEFAULT_EVENT_SET;
454         }
455 
456         // Copy the list of handled events in the event set
457         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
458 
459         // Check that we don't have the SESSION_CREATED event in the set
460         if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
461             this.eventTypes = null;
462             throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
463         }
464     }
465 
466     /**
467      * Creates a new instance of ExecutorFilter. This private constructor is called by all
468      * the public constructor.
469      *
470      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
471      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
472      * @param eventTypes The lit of event which are handled by the executor
473      */
474     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
475         if (executor == null) {
476             throw new IllegalArgumentException("executor");
477         }
478 
479         initEventTypes(eventTypes);
480         this.executor = executor;
481         this.manageableExecutor = manageableExecutor;
482     }
483 
484     /**
485      * Shuts down the underlying executor if this filter hase been created via
486      * a convenience constructor.
487      */
488     @Override
489     public void destroy() {
490         if (manageableExecutor) {
491             ((ExecutorService) executor).shutdown();
492         }
493     }
494 
495     /**
496      * @return the underlying {@link Executor} instance this filter uses.
497      */
498     public final Executor getExecutor() {
499         return executor;
500     }
501 
502     /**
503      * Fires the specified event through the underlying executor.
504      * 
505      * @param event The filtered event
506      */
507     protected void fireEvent(IoFilterEvent event) {
508         executor.execute(event);
509     }
510 
511     /**
512      * {@inheritDoc}
513      */
514     @Override
515     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
516         if (parent.contains(this)) {
517             throw new IllegalArgumentException(
518                     "You can't add the same filter instance more than once.  Create another instance and add it.");
519         }
520     }
521 
522     /**
523      * {@inheritDoc}
524      */
525     @Override
526     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
527         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
528             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
529             fireEvent(event);
530         } else {
531             nextFilter.sessionOpened(session);
532         }
533     }
534 
535     /**
536      * {@inheritDoc}
537      */
538     @Override
539     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
540         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
541             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
542             fireEvent(event);
543         } else {
544             nextFilter.sessionClosed(session);
545         }
546     }
547 
548     /**
549      * {@inheritDoc}
550      */
551     @Override
552     public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
553         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
554             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
555             fireEvent(event);
556         } else {
557             nextFilter.sessionIdle(session, status);
558         }
559     }
560 
561     /**
562      * {@inheritDoc}
563      */
564     @Override
565     public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
566         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
567             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
568             fireEvent(event);
569         } else {
570             nextFilter.exceptionCaught(session, cause);
571         }
572     }
573 
574     /**
575      * {@inheritDoc}
576      */
577     @Override
578     public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
579         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
580             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
581             fireEvent(event);
582         } else {
583             nextFilter.messageReceived(session, message);
584         }
585     }
586 
587     /**
588      * {@inheritDoc}
589      */
590     @Override
591     public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
592         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
593             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
594             fireEvent(event);
595         } else {
596             nextFilter.messageSent(session, writeRequest);
597         }
598     }
599 
600     /**
601      * {@inheritDoc}
602      */
603     @Override
604     public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
605         if (eventTypes.contains(IoEventType.WRITE)) {
606             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
607             fireEvent(event);
608         } else {
609             nextFilter.filterWrite(session, writeRequest);
610         }
611     }
612 
613     /**
614      * {@inheritDoc}
615      */
616     @Override
617     public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
618         if (eventTypes.contains(IoEventType.CLOSE)) {
619             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
620             fireEvent(event);
621         } else {
622             nextFilter.filterClose(session);
623         }
624     }
625 }