View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}.
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter creates a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
59   * maintained like the following:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specified other {@link Executor} instance in the constructor,
67   * the order of events are not maintained at all.  This means more than one event
68   * handler methods can be invoked at the same time with mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107  * 
108  * @see OrderedThreadPoolExecutor
109  * @see UnorderedThreadPoolExecutor
110  * @org.apache.xbean.XBean
111  */
112 public class ExecutorFilter extends IoFilterAdapter {
113     /** The list of handled events */
114     private EnumSet<IoEventType> eventTypes;
115 
116     /** The associated executor */
117     private Executor executor;
118 
119     /** A flag set if the executor can be managed */
120     private boolean manageableExecutor;
121 
122     /** The default pool size */
123     private static final int DEFAULT_MAX_POOL_SIZE = 16;
124 
125     /** The number of thread to create at startup */
126     private static final int BASE_THREAD_NUMBER = 0;
127 
128     /** The default KeepAlive time, in seconds */
129     private static final long DEFAULT_KEEPALIVE_TIME = 30;
130 
131     /** 
132      * A set of flags used to tell if the Executor has been created 
133      * in the constructor or passed as an argument. In the second case, 
134      * the executor state can be managed.
135      **/
136     private static final boolean MANAGEABLE_EXECUTOR = true;
137 
138     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139 
140     /** A list of default EventTypes to be handled by the executor */
141     private static final IoEventTypeml#IoEventType">IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { 
142         IoEventType.EXCEPTION_CAUGHT, 
143         IoEventType.MESSAGE_RECEIVED, 
144         IoEventType.MESSAGE_SENT, 
145         IoEventType.SESSION_CLOSED,
146         IoEventType.SESSION_IDLE, 
147         IoEventType.SESSION_OPENED };
148 
149     /**
150      * (Convenience constructor) Creates a new instance with a new
151      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
152      * maximum of 16 threads in the pool. All the event will be handled 
153      * by this default executor.
154      */
155     public ExecutorFilter() {
156         // Create a new default Executor
157         Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, 
158             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
159 
160         // Initialise the filter
161         init(newExecutor, MANAGEABLE_EXECUTOR);
162     }
163 
164     /**
165      * (Convenience constructor) Creates a new instance with a new
166      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
167      * a maximum of threads in the pool is given. All the event will be handled 
168      * by this default executor.
169      * 
170      * @param maximumPoolSize The maximum pool size
171      */
172     public ExecutorFilter(int maximumPoolSize) {
173         // Create a new default Executor
174         Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, maximumPoolSize, 
175             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
176 
177         // Initialise the filter
178         init(newExecutor, MANAGEABLE_EXECUTOR);
179     }
180 
181     /**
182      * (Convenience constructor) Creates a new instance with a new
183      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
184      * maximum of threads the pool can contain. All the event will be handled 
185      * by this default executor.
186      *
187      * @param corePoolSize The initial pool size
188      * @param maximumPoolSize The maximum pool size
189      */
190     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
191         // Create a new default Executor
192         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
193             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
194 
195         // Initialise the filter
196         init(newExecutor, MANAGEABLE_EXECUTOR);
197     }
198 
199     /**
200      * (Convenience constructor) Creates a new instance with a new
201      * {@link OrderedThreadPoolExecutor}.
202      * 
203      * @param corePoolSize The initial pool size
204      * @param maximumPoolSize The maximum pool size
205      * @param keepAliveTime Default duration for a thread
206      * @param unit Time unit used for the keepAlive value
207      */
208     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
209         // Create a new default Executor
210         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
211             keepAliveTime, unit, Executors.defaultThreadFactory(), null);
212 
213         // Initialise the filter
214         init(newExecutor, MANAGEABLE_EXECUTOR);
215     }
216 
217     /**
218      * (Convenience constructor) Creates a new instance with a new
219      * {@link OrderedThreadPoolExecutor}.
220      * 
221      * @param corePoolSize The initial pool size
222      * @param maximumPoolSize The maximum pool size
223      * @param keepAliveTime Default duration for a thread
224      * @param unit Time unit used for the keepAlive value
225      * @param queueHandler The queue used to store events
226      */
227     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
228             IoEventQueueHandler queueHandler) {
229         // Create a new default Executor
230         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
231             keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
232 
233         // Initialise the filter
234         init(newExecutor, MANAGEABLE_EXECUTOR);
235     }
236 
237     /**
238      * (Convenience constructor) Creates a new instance with a new
239      * {@link OrderedThreadPoolExecutor}.
240      * 
241      * @param corePoolSize The initial pool size
242      * @param maximumPoolSize The maximum pool size
243      * @param keepAliveTime Default duration for a thread
244      * @param unit Time unit used for the keepAlive value
245      * @param threadFactory The factory used to create threads
246      */
247     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
248             ThreadFactory threadFactory) {
249         // Create a new default Executor
250         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
251             keepAliveTime, unit, threadFactory, null);
252 
253         // Initialise the filter
254         init(newExecutor, MANAGEABLE_EXECUTOR);
255     }
256 
257     /**
258      * (Convenience constructor) Creates a new instance with a new
259      * {@link OrderedThreadPoolExecutor}.
260      * 
261      * @param corePoolSize The initial pool size
262      * @param maximumPoolSize The maximum pool size
263      * @param keepAliveTime Default duration for a thread
264      * @param unit Time unit used for the keepAlive value
265      * @param threadFactory The factory used to create threads
266      * @param queueHandler The queue used to store events
267      */
268     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
269             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
270         // Create a new default Executor
271         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
272             keepAliveTime, unit, threadFactory, queueHandler);
273 
274         // Initialise the filter
275         init(newExecutor, MANAGEABLE_EXECUTOR);
276     }
277 
278     /**
279      * (Convenience constructor) Creates a new instance with a new
280      * {@link OrderedThreadPoolExecutor}.
281      * 
282      * @param eventTypes The event for which the executor will be used
283      */
284     public ExecutorFilter(IoEventType... eventTypes) {
285         // Create a new default Executor
286         Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, 
287             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
288 
289         // Initialise the filter
290         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
291     }
292 
293     /**
294      * (Convenience constructor) Creates a new instance with a new
295      * {@link OrderedThreadPoolExecutor}.
296      * 
297      * @param maximumPoolSize The maximum pool size
298      * @param eventTypes The event for which the executor will be used
299      */
300     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
301         // Create a new default Executor
302         Executor newExecutor = new OrderedThreadPoolExecutor(BASE_THREAD_NUMBER, maximumPoolSize, 
303             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
304 
305         // Initialise the filter
306         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
307     }
308 
309     /**
310      * (Convenience constructor) Creates a new instance with a new
311      * {@link OrderedThreadPoolExecutor}.
312      * 
313      * @param corePoolSize The initial pool size
314      * @param maximumPoolSize The maximum pool size
315      * @param eventTypes The event for which the executor will be used
316      */
317     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
318         // Create a new default Executor
319         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
320             DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
321 
322         // Initialise the filter
323         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
324     }
325 
326     /**
327      * (Convenience constructor) Creates a new instance with a new
328      * {@link OrderedThreadPoolExecutor}.
329      * 
330      * @param corePoolSize The initial pool size
331      * @param maximumPoolSize The maximum pool size
332      * @param keepAliveTime Default duration for a thread
333      * @param unit Time unit used for the keepAlive value
334      * @param eventTypes The event for which the executor will be used
335      */
336     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
337             IoEventType... eventTypes) {
338         // Create a new default Executor
339         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
340             keepAliveTime, unit, Executors.defaultThreadFactory(), null);
341 
342         // Initialise the filter
343         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
344     }
345 
346     /**
347      * (Convenience constructor) Creates a new instance with a new
348      * {@link OrderedThreadPoolExecutor}.
349      * 
350      * @param corePoolSize The initial pool size
351      * @param maximumPoolSize The maximum pool size
352      * @param keepAliveTime Default duration for a thread
353      * @param unit Time unit used for the keepAlive value
354      * @param queueHandler The queue used to store events
355      * @param eventTypes The event for which the executor will be used
356      */
357     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
358             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
359         // Create a new default Executor
360         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
361             keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
362 
363         // Initialise the filter
364         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
365     }
366 
367     /**
368      * (Convenience constructor) Creates a new instance with a new
369      * {@link OrderedThreadPoolExecutor}.
370      * 
371      * @param corePoolSize The initial pool size
372      * @param maximumPoolSize The maximum pool size
373      * @param keepAliveTime Default duration for a thread
374      * @param unit Time unit used for the keepAlive value
375      * @param threadFactory The factory used to create threads
376      * @param eventTypes The event for which the executor will be used
377      */
378     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
379             ThreadFactory threadFactory, IoEventType... eventTypes) {
380         // Create a new default Executor
381         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
382             keepAliveTime, unit, threadFactory, null);
383 
384         // Initialise the filter
385         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
386     }
387 
388     /**
389      * (Convenience constructor) Creates a new instance with a new
390      * {@link OrderedThreadPoolExecutor}.
391      * 
392      * @param corePoolSize The initial pool size
393      * @param maximumPoolSize The maximum pool size
394      * @param keepAliveTime Default duration for a thread
395      * @param unit Time unit used for the keepAlive value
396      * @param threadFactory The factory used to create threads
397      * @param queueHandler The queue used to store events
398      * @param eventTypes The event for which the executor will be used
399      */
400     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
401             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
402         // Create a new default Executor
403         Executor newExecutor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
404                 threadFactory, queueHandler);
405 
406         // Initialise the filter
407         init(newExecutor, MANAGEABLE_EXECUTOR, eventTypes);
408     }
409 
410     /**
411      * Creates a new instance with the specified {@link Executor}.
412      * 
413      * @param executor the user's managed Executor to use in this filter
414      */
415     public ExecutorFilter(Executor executor) {
416         // Initialise the filter
417         init(executor, NOT_MANAGEABLE_EXECUTOR);
418     }
419 
420     /**
421      * Creates a new instance with the specified {@link Executor}.
422      * 
423      * @param executor the user's managed Executor to use in this filter
424      * @param eventTypes The event for which the executor will be used
425      */
426     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
427         // Initialise the filter
428         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
429     }
430 
431     /**
432      * Create an EnumSet from an array of EventTypes, and set the associated
433      * eventTypes field.
434      *
435      * @param eventTypes The array of handled events
436      */
437     private void initEventTypes(IoEventType... eventTypes) {
438         if ((eventTypes == null) || (eventTypes.length == 0)) {
439             eventTypes = DEFAULT_EVENT_SET;
440         }
441 
442         // Copy the list of handled events in the event set
443         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
444 
445         // Check that we don't have the SESSION_CREATED event in the set
446         if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
447             this.eventTypes = null;
448             throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
449         }
450     }
451 
452     /**
453      * Creates a new instance of ExecutorFilter. This private constructor is called by all
454      * the public constructor.
455      *
456      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
457      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
458      * @param eventTypes The lit of event which are handled by the executor
459      */
460     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
461         if (executor == null) {
462             throw new IllegalArgumentException("executor");
463         }
464 
465         initEventTypes(eventTypes);
466         this.executor = executor;
467         this.manageableExecutor = manageableExecutor;
468     }
469 
470     /**
471      * Shuts down the underlying executor if this filter hase been created via
472      * a convenience constructor.
473      */
474     @Override
475     public void destroy() {
476         if (manageableExecutor) {
477             ((ExecutorService) executor).shutdown();
478         }
479     }
480 
481     /**
482      * @return the underlying {@link Executor} instance this filter uses.
483      */
484     public final Executor getExecutor() {
485         return executor;
486     }
487 
488     /**
489      * Fires the specified event through the underlying executor.
490      * 
491      * @param event The filtered event
492      */
493     protected void fireEvent(IoFilterEvent event) {
494         executor.execute(event);
495     }
496 
497     /**
498      * {@inheritDoc}
499      */
500     @Override
501     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
502         if (parent.contains(this)) {
503             throw new IllegalArgumentException(
504                     "You can't add the same filter instance more than once.  Create another instance and add it.");
505         }
506     }
507 
508     /**
509      * {@inheritDoc}
510      */
511     @Override
512     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
513         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
514             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
515             fireEvent(event);
516         } else {
517             nextFilter.sessionOpened(session);
518         }
519     }
520 
521     /**
522      * {@inheritDoc}
523      */
524     @Override
525     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
526         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
527             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
528             fireEvent(event);
529         } else {
530             nextFilter.sessionClosed(session);
531         }
532     }
533 
534     /**
535      * {@inheritDoc}
536      */
537     @Override
538     public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
539         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
540             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
541             fireEvent(event);
542         } else {
543             nextFilter.sessionIdle(session, status);
544         }
545     }
546 
547     /**
548      * {@inheritDoc}
549      */
550     @Override
551     public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
552         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
553             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
554             fireEvent(event);
555         } else {
556             nextFilter.exceptionCaught(session, cause);
557         }
558     }
559 
560     /**
561      * {@inheritDoc}
562      */
563     @Override
564     public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
565         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
566             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
567             fireEvent(event);
568         } else {
569             nextFilter.messageReceived(session, message);
570         }
571     }
572 
573     /**
574      * {@inheritDoc}
575      */
576     @Override
577     public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
578         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
579             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
580             fireEvent(event);
581         } else {
582             nextFilter.messageSent(session, writeRequest);
583         }
584     }
585 
586     /**
587      * {@inheritDoc}
588      */
589     @Override
590     public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
591         if (eventTypes.contains(IoEventType.WRITE)) {
592             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
593             fireEvent(event);
594         } else {
595             nextFilter.filterWrite(session, writeRequest);
596         }
597     }
598 
599     /**
600      * {@inheritDoc}
601      */
602     @Override
603     public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
604         if (eventTypes.contains(IoEventType.CLOSE)) {
605             IoFilterEventn/IoFilterEvent.html#IoFilterEvent">IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
606             fireEvent(event);
607         } else {
608             nextFilter.filterClose(session);
609         }
610     }
611 }