001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.executor;
021
022import java.util.EnumSet;
023import java.util.concurrent.Executor;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.mina.core.filterchain.IoFilterAdapter;
030import org.apache.mina.core.filterchain.IoFilterChain;
031import org.apache.mina.core.filterchain.IoFilterEvent;
032import org.apache.mina.core.session.IdleStatus;
033import org.apache.mina.core.session.IoEventType;
034import org.apache.mina.core.session.IoSession;
035import org.apache.mina.core.write.WriteRequest;
036
037/**
038 * A filter that forwards I/O events to {@link Executor} to enforce a certain
039 * thread model while allowing the events per session to be processed
040 * simultaneously. You can apply various thread model by inserting this filter
041 * to a {@link IoFilterChain}.
042 * 
043 * <h2>Life Cycle Management</h2>
044 * 
045 * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
046 * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
047 * constructor that accepts an {@link Executor} that you've instantiated, you have
048 * full control and responsibility of managing its life cycle (e.g. calling
049 * {@link ExecutorService#shutdown()}.
050 * <p> 
051 * If you created this filter using convenience constructors like
052 * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
053 * {@link #destroy()} explicitly.
054 * 
055 * <h2>Event Ordering</h2>
056 * 
057 * All convenience constructors of this filter creates a new
058 * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
059 * maintained like the following:
060 * <ul>
061 * <li>All event handler methods are called exclusively.
062 *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
063 * <li>The event order is never mixed up.
064 *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
065 * </ul>
066 * However, if you specified other {@link Executor} instance in the constructor,
067 * the order of events are not maintained at all.  This means more than one event
068 * handler methods can be invoked at the same time with mixed order.  For example,
069 * let's assume that messageReceived, messageSent, and sessionClosed events are
070 * fired.
071 * <ul>
072 * <li>All event handler methods can be called simultaneously.
073 *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
074 * <li>The event order can be mixed up.
075 *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
076 *           is invoked.)</li>
077 * </ul>
078 * If you need to maintain the order of events per session, please specify an
079 * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
080 * 
081 * <h2>Selective Filtering</h2>
082 * 
083 * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
084 * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
085 * underlying executor, which is most common setting.
086 * <p>
087 * If you want to submit only a certain set of event types, you can specify them
088 * in the constructor.  For example, you could configure a thread pool for
089 * write operation for the maximum performance:
090 * <pre><code>
091 * IoService service = ...;
092 * DefaultIoFilterChainBuilder chain = service.getFilterChain();
093 * 
094 * chain.addLast("codec", new ProtocolCodecFilter(...));
095 * // Use one thread pool for most events.
096 * chain.addLast("executor1", new ExecutorFilter());
097 * // and another dedicated thread pool for 'filterWrite' events.
098 * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
099 * </code></pre>
100 * 
101 * <h2>Preventing {@link OutOfMemoryError}</h2>
102 * 
103 * Please refer to {@link IoEventQueueThrottle}, which is specified as
104 * a parameter of the convenience constructors.
105 * 
106 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
107 * 
108 * @see OrderedThreadPoolExecutor
109 * @see UnorderedThreadPoolExecutor
110 * @org.apache.xbean.XBean
111 */
112public class ExecutorFilter extends IoFilterAdapter {
113    /** The list of handled events */
114    private EnumSet<IoEventType> eventTypes;
115
116    /** The associated executor */
117    private Executor executor;
118
119    /** A flag set if the executor can be managed */
120    private boolean manageableExecutor;
121
122    /** The default pool size */
123    private static final int DEFAULT_MAX_POOL_SIZE = 16;
124
125    /** The number of thread to create at startup */
126    private static final int BASE_THREAD_NUMBER = 0;
127
128    /** The default KeepAlive time, in seconds */
129    private static final long DEFAULT_KEEPALIVE_TIME = 30;
130
131    /** 
132     * A set of flags used to tell if the Executor has been created 
133     * in the constructor or passed as an argument. In the second case, 
134     * the executor state can be managed.
135     **/
136    private static final boolean MANAGEABLE_EXECUTOR = true;
137
138    private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139
140    /** A list of default EventTypes to be handled by the executor */
141    private static final IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
142            IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED,
143            IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED };
144
145    /**
146     * (Convenience constructor) Creates a new instance with a new
147     * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
148     * maximum of 16 threads in the pool. All the event will be handled 
149     * by this default executor.
150     */
151    public ExecutorFilter() {
152        // Create a new default Executor
153        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
154                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
155
156        // Initialize the filter
157        init(executor, MANAGEABLE_EXECUTOR);
158    }
159
160    /**
161     * (Convenience constructor) Creates a new instance with a new
162     * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
163     * a maximum of threads in the pool is given. All the event will be handled 
164     * by this default executor.
165     * 
166     * @param maximumPoolSize The maximum pool size
167     */
168    public ExecutorFilter(int maximumPoolSize) {
169        // Create a new default Executor
170        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
171                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
172
173        // Initialize the filter
174        init(executor, MANAGEABLE_EXECUTOR);
175    }
176
177    /**
178     * (Convenience constructor) Creates a new instance with a new
179     * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
180     * maximum of threads the pool can contain. All the event will be handled 
181     * by this default executor.
182     *
183     * @param corePoolSize The initial pool size
184     * @param maximumPoolSize The maximum pool size
185     */
186    public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
187        // Create a new default Executor
188        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
189                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
190
191        // Initialize the filter
192        init(executor, MANAGEABLE_EXECUTOR);
193    }
194
195    /**
196     * (Convenience constructor) Creates a new instance with a new
197     * {@link OrderedThreadPoolExecutor}.
198     * 
199     * @param corePoolSize The initial pool size
200     * @param maximumPoolSize The maximum pool size
201     * @param keepAliveTime Default duration for a thread
202     * @param unit Time unit used for the keepAlive value
203     */
204    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
205        // Create a new default Executor
206        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
207                Executors.defaultThreadFactory(), null);
208
209        // Initialize the filter
210        init(executor, MANAGEABLE_EXECUTOR);
211    }
212
213    /**
214     * (Convenience constructor) Creates a new instance with a new
215     * {@link OrderedThreadPoolExecutor}.
216     * 
217     * @param corePoolSize The initial pool size
218     * @param maximumPoolSize The maximum pool size
219     * @param keepAliveTime Default duration for a thread
220     * @param unit Time unit used for the keepAlive value
221     * @param queueHandler The queue used to store events
222     */
223    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224            IoEventQueueHandler queueHandler) {
225        // Create a new default Executor
226        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
227                Executors.defaultThreadFactory(), queueHandler);
228
229        // Initialize the filter
230        init(executor, MANAGEABLE_EXECUTOR);
231    }
232
233    /**
234     * (Convenience constructor) Creates a new instance with a new
235     * {@link OrderedThreadPoolExecutor}.
236     * 
237     * @param corePoolSize The initial pool size
238     * @param maximumPoolSize The maximum pool size
239     * @param keepAliveTime Default duration for a thread
240     * @param unit Time unit used for the keepAlive value
241     * @param threadFactory The factory used to create threads
242     */
243    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
244            ThreadFactory threadFactory) {
245        // Create a new default Executor
246        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
247                null);
248
249        // Initialize the filter
250        init(executor, MANAGEABLE_EXECUTOR);
251    }
252
253    /**
254     * (Convenience constructor) Creates a new instance with a new
255     * {@link OrderedThreadPoolExecutor}.
256     * 
257     * @param corePoolSize The initial pool size
258     * @param maximumPoolSize The maximum pool size
259     * @param keepAliveTime Default duration for a thread
260     * @param unit Time unit used for the keepAlive value
261     * @param threadFactory The factory used to create threads
262     * @param queueHandler The queue used to store events
263     */
264    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
265            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
266        // Create a new default Executor
267        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
268                threadFactory, queueHandler);
269
270        // Initialize the filter
271        init(executor, MANAGEABLE_EXECUTOR);
272    }
273
274    /**
275     * (Convenience constructor) Creates a new instance with a new
276     * {@link OrderedThreadPoolExecutor}.
277     * 
278     * @param eventTypes The event for which the executor will be used
279     */
280    public ExecutorFilter(IoEventType... eventTypes) {
281        // Create a new default Executor
282        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME,
283                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
284
285        // Initialize the filter
286        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
287    }
288
289    /**
290     * (Convenience constructor) Creates a new instance with a new
291     * {@link OrderedThreadPoolExecutor}.
292     * 
293     * @param maximumPoolSize The maximum pool size
294     * @param eventTypes The event for which the executor will be used
295     */
296    public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
297        // Create a new default Executor
298        Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
299                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
300
301        // Initialize the filter
302        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
303    }
304
305    /**
306     * (Convenience constructor) Creates a new instance with a new
307     * {@link OrderedThreadPoolExecutor}.
308     * 
309     * @param corePoolSize The initial pool size
310     * @param maximumPoolSize The maximum pool size
311     * @param eventTypes The event for which the executor will be used
312     */
313    public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
314        // Create a new default Executor
315        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME,
316                TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
317
318        // Initialize the filter
319        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
320    }
321
322    /**
323     * (Convenience constructor) Creates a new instance with a new
324     * {@link OrderedThreadPoolExecutor}.
325     * 
326     * @param corePoolSize The initial pool size
327     * @param maximumPoolSize The maximum pool size
328     * @param keepAliveTime Default duration for a thread
329     * @param unit Time unit used for the keepAlive value
330     * @param eventTypes The event for which the executor will be used
331     */
332    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
333            IoEventType... eventTypes) {
334        // Create a new default Executor
335        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
336                Executors.defaultThreadFactory(), null);
337
338        // Initialize the filter
339        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
340    }
341
342    /**
343     * (Convenience constructor) Creates a new instance with a new
344     * {@link OrderedThreadPoolExecutor}.
345     * 
346     * @param corePoolSize The initial pool size
347     * @param maximumPoolSize The maximum pool size
348     * @param keepAliveTime Default duration for a thread
349     * @param unit Time unit used for the keepAlive value
350     * @param queueHandler The queue used to store events
351     * @param eventTypes The event for which the executor will be used
352     */
353    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
354            IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
355        // Create a new default Executor
356        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
357                Executors.defaultThreadFactory(), queueHandler);
358
359        // Initialize the filter
360        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
361    }
362
363    /**
364     * (Convenience constructor) Creates a new instance with a new
365     * {@link OrderedThreadPoolExecutor}.
366     * 
367     * @param corePoolSize The initial pool size
368     * @param maximumPoolSize The maximum pool size
369     * @param keepAliveTime Default duration for a thread
370     * @param unit Time unit used for the keepAlive value
371     * @param threadFactory The factory used to create threads
372     * @param eventTypes The event for which the executor will be used
373     */
374    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
375            ThreadFactory threadFactory, IoEventType... eventTypes) {
376        // Create a new default Executor
377        Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory,
378                null);
379
380        // Initialize the filter
381        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
382    }
383
384    /**
385     * (Convenience constructor) Creates a new instance with a new
386     * {@link OrderedThreadPoolExecutor}.
387     * 
388     * @param corePoolSize The initial pool size
389     * @param maximumPoolSize The maximum pool size
390     * @param keepAliveTime Default duration for a thread
391     * @param unit Time unit used for the keepAlive value
392     * @param threadFactory The factory used to create threads
393     * @param queueHandler The queue used to store events
394     * @param eventTypes The event for which the executor will be used
395     */
396    public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
397            ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
398        // Create a new default Executor
399        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
400                threadFactory, queueHandler);
401
402        // Initialize the filter
403        init(executor, MANAGEABLE_EXECUTOR, eventTypes);
404    }
405
406    /**
407     * Creates a new instance with the specified {@link Executor}.
408     * 
409     * @param executor the user's managed Executor to use in this filter
410     */
411    public ExecutorFilter(Executor executor) {
412        // Initialize the filter
413        init(executor, NOT_MANAGEABLE_EXECUTOR);
414    }
415
416    /**
417     * Creates a new instance with the specified {@link Executor}.
418     * 
419     * @param executor the user's managed Executor to use in this filter
420     * @param eventTypes The event for which the executor will be used
421     */
422    public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
423        // Initialize the filter
424        init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
425    }
426
427    /**
428     * Create an OrderedThreadPool executor.
429     *
430     * @param corePoolSize The initial pool sizePoolSize
431     * @param maximumPoolSize The maximum pool size
432     * @param keepAliveTime Default duration for a thread
433     * @param unit Time unit used for the keepAlive value
434     * @param threadFactory The factory used to create threads
435     * @param queueHandler The queue used to store events
436     * @return An instance of the created Executor
437     */
438    private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
439            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
440        // Create a new Executor
441        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
442                threadFactory, queueHandler);
443
444        return executor;
445    }
446
447    /**
448     * Create an EnumSet from an array of EventTypes, and set the associated
449     * eventTypes field.
450     *
451     * @param eventTypes The array of handled events
452     */
453    private void initEventTypes(IoEventType... eventTypes) {
454        if ((eventTypes == null) || (eventTypes.length == 0)) {
455            eventTypes = DEFAULT_EVENT_SET;
456        }
457
458        // Copy the list of handled events in the event set
459        this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
460
461        // Check that we don't have the SESSION_CREATED event in the set
462        if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) {
463            this.eventTypes = null;
464            throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed.");
465        }
466    }
467
468    /**
469     * Creates a new instance of ExecutorFilter. This private constructor is called by all
470     * the public constructor.
471     *
472     * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
473     * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
474     * @param eventTypes The lit of event which are handled by the executor
475     */
476    private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
477        if (executor == null) {
478            throw new IllegalArgumentException("executor");
479        }
480
481        initEventTypes(eventTypes);
482        this.executor = executor;
483        this.manageableExecutor = manageableExecutor;
484    }
485
486    /**
487     * Shuts down the underlying executor if this filter hase been created via
488     * a convenience constructor.
489     */
490    @Override
491    public void destroy() {
492        if (manageableExecutor) {
493            ((ExecutorService) executor).shutdown();
494        }
495    }
496
497    /**
498     * @return the underlying {@link Executor} instance this filter uses.
499     */
500    public final Executor getExecutor() {
501        return executor;
502    }
503
504    /**
505     * Fires the specified event through the underlying executor.
506     * 
507     * @param event The filtered event
508     */
509    protected void fireEvent(IoFilterEvent event) {
510        executor.execute(event);
511    }
512
513    /**
514     * {@inheritDoc}
515     */
516    @Override
517    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
518        if (parent.contains(this)) {
519            throw new IllegalArgumentException(
520                    "You can't add the same filter instance more than once.  Create another instance and add it.");
521        }
522    }
523
524    /**
525     * {@inheritDoc}
526     */
527    @Override
528    public final void sessionOpened(NextFilter nextFilter, IoSession session) {
529        if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
530            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null);
531            fireEvent(event);
532        } else {
533            nextFilter.sessionOpened(session);
534        }
535    }
536
537    /**
538     * {@inheritDoc}
539     */
540    @Override
541    public final void sessionClosed(NextFilter nextFilter, IoSession session) {
542        if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
543            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null);
544            fireEvent(event);
545        } else {
546            nextFilter.sessionClosed(session);
547        }
548    }
549
550    /**
551     * {@inheritDoc}
552     */
553    @Override
554    public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
555        if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
556            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status);
557            fireEvent(event);
558        } else {
559            nextFilter.sessionIdle(session, status);
560        }
561    }
562
563    /**
564     * {@inheritDoc}
565     */
566    @Override
567    public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
568        if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
569            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause);
570            fireEvent(event);
571        } else {
572            nextFilter.exceptionCaught(session, cause);
573        }
574    }
575
576    /**
577     * {@inheritDoc}
578     */
579    @Override
580    public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
581        if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
582            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message);
583            fireEvent(event);
584        } else {
585            nextFilter.messageReceived(session, message);
586        }
587    }
588
589    /**
590     * {@inheritDoc}
591     */
592    @Override
593    public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
594        if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
595            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest);
596            fireEvent(event);
597        } else {
598            nextFilter.messageSent(session, writeRequest);
599        }
600    }
601
602    /**
603     * {@inheritDoc}
604     */
605    @Override
606    public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
607        if (eventTypes.contains(IoEventType.WRITE)) {
608            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest);
609            fireEvent(event);
610        } else {
611            nextFilter.filterWrite(session, writeRequest);
612        }
613    }
614
615    /**
616     * {@inheritDoc}
617     */
618    @Override
619    public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
620        if (eventTypes.contains(IoEventType.CLOSE)) {
621            IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null);
622            fireEvent(event);
623        } else {
624            nextFilter.filterClose(session);
625        }
626    }
627}