001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.executor; 021 022import java.util.EnumSet; 023import java.util.concurrent.Executor; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.mina.core.filterchain.IoFilterAdapter; 030import org.apache.mina.core.filterchain.IoFilterChain; 031import org.apache.mina.core.filterchain.IoFilterEvent; 032import org.apache.mina.core.session.IdleStatus; 033import org.apache.mina.core.session.IoEventType; 034import org.apache.mina.core.session.IoSession; 035import org.apache.mina.core.write.WriteRequest; 036 037/** 038 * A filter that forwards I/O events to {@link Executor} to enforce a certain 039 * thread model while allowing the events per session to be processed 040 * simultaneously. You can apply various thread model by inserting this filter 041 * to a {@link IoFilterChain}. 042 * 043 * <h2>Life Cycle Management</h2> 044 * 045 * Please note that this filter doesn't manage the life cycle of the {@link Executor}. 046 * If you created this filter using {@link #ExecutorFilter(Executor)} or similar 047 * constructor that accepts an {@link Executor} that you've instantiated, you have 048 * full control and responsibility of managing its life cycle (e.g. calling 049 * {@link ExecutorService#shutdown()}. 050 * <p> 051 * If you created this filter using convenience constructors like 052 * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling 053 * {@link #destroy()} explicitly. 054 * 055 * <h2>Event Ordering</h2> 056 * 057 * All convenience constructors of this filter creates a new 058 * {@link OrderedThreadPoolExecutor} instance. Therefore, the order of event is 059 * maintained like the following: 060 * <ul> 061 * <li>All event handler methods are called exclusively. 062 * (e.g. messageReceived and messageSent can't be invoked at the same time.)</li> 063 * <li>The event order is never mixed up. 064 * (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li> 065 * </ul> 066 * However, if you specified other {@link Executor} instance in the constructor, 067 * the order of events are not maintained at all. This means more than one event 068 * handler methods can be invoked at the same time with mixed order. For example, 069 * let's assume that messageReceived, messageSent, and sessionClosed events are 070 * fired. 071 * <ul> 072 * <li>All event handler methods can be called simultaneously. 073 * (e.g. messageReceived and messageSent can be invoked at the same time.)</li> 074 * <li>The event order can be mixed up. 075 * (e.g. sessionClosed or messageSent can be invoked before messageReceived 076 * is invoked.)</li> 077 * </ul> 078 * If you need to maintain the order of events per session, please specify an 079 * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors. 080 * 081 * <h2>Selective Filtering</h2> 082 * 083 * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>, 084 * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the 085 * underlying executor, which is most common setting. 086 * <p> 087 * If you want to submit only a certain set of event types, you can specify them 088 * in the constructor. For example, you could configure a thread pool for 089 * write operation for the maximum performance: 090 * <pre><code> 091 * IoService service = ...; 092 * DefaultIoFilterChainBuilder chain = service.getFilterChain(); 093 * 094 * chain.addLast("codec", new ProtocolCodecFilter(...)); 095 * // Use one thread pool for most events. 096 * chain.addLast("executor1", new ExecutorFilter()); 097 * // and another dedicated thread pool for 'filterWrite' events. 098 * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE)); 099 * </code></pre> 100 * 101 * <h2>Preventing {@link OutOfMemoryError}</h2> 102 * 103 * Please refer to {@link IoEventQueueThrottle}, which is specified as 104 * a parameter of the convenience constructors. 105 * 106 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 107 * 108 * @see OrderedThreadPoolExecutor 109 * @see UnorderedThreadPoolExecutor 110 * @org.apache.xbean.XBean 111 */ 112public class ExecutorFilter extends IoFilterAdapter { 113 /** The list of handled events */ 114 private EnumSet<IoEventType> eventTypes; 115 116 /** The associated executor */ 117 private Executor executor; 118 119 /** A flag set if the executor can be managed */ 120 private boolean manageableExecutor; 121 122 /** The default pool size */ 123 private static final int DEFAULT_MAX_POOL_SIZE = 16; 124 125 /** The number of thread to create at startup */ 126 private static final int BASE_THREAD_NUMBER = 0; 127 128 /** The default KeepAlive time, in seconds */ 129 private static final long DEFAULT_KEEPALIVE_TIME = 30; 130 131 /** 132 * A set of flags used to tell if the Executor has been created 133 * in the constructor or passed as an argument. In the second case, 134 * the executor state can be managed. 135 **/ 136 private static final boolean MANAGEABLE_EXECUTOR = true; 137 138 private static final boolean NOT_MANAGEABLE_EXECUTOR = false; 139 140 /** A list of default EventTypes to be handled by the executor */ 141 private static final IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT, 142 IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED, 143 IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED }; 144 145 /** 146 * (Convenience constructor) Creates a new instance with a new 147 * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 148 * maximum of 16 threads in the pool. All the event will be handled 149 * by this default executor. 150 */ 151 public ExecutorFilter() { 152 // Create a new default Executor 153 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME, 154 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 155 156 // Initialize the filter 157 init(executor, MANAGEABLE_EXECUTOR); 158 } 159 160 /** 161 * (Convenience constructor) Creates a new instance with a new 162 * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 163 * a maximum of threads in the pool is given. All the event will be handled 164 * by this default executor. 165 * 166 * @param maximumPoolSize The maximum pool size 167 */ 168 public ExecutorFilter(int maximumPoolSize) { 169 // Create a new default Executor 170 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME, 171 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 172 173 // Initialize the filter 174 init(executor, MANAGEABLE_EXECUTOR); 175 } 176 177 /** 178 * (Convenience constructor) Creates a new instance with a new 179 * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a 180 * maximum of threads the pool can contain. All the event will be handled 181 * by this default executor. 182 * 183 * @param corePoolSize The initial pool size 184 * @param maximumPoolSize The maximum pool size 185 */ 186 public ExecutorFilter(int corePoolSize, int maximumPoolSize) { 187 // Create a new default Executor 188 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME, 189 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 190 191 // Initialize the filter 192 init(executor, MANAGEABLE_EXECUTOR); 193 } 194 195 /** 196 * (Convenience constructor) Creates a new instance with a new 197 * {@link OrderedThreadPoolExecutor}. 198 * 199 * @param corePoolSize The initial pool size 200 * @param maximumPoolSize The maximum pool size 201 * @param keepAliveTime Default duration for a thread 202 * @param unit Time unit used for the keepAlive value 203 */ 204 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { 205 // Create a new default Executor 206 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 207 Executors.defaultThreadFactory(), null); 208 209 // Initialize the filter 210 init(executor, MANAGEABLE_EXECUTOR); 211 } 212 213 /** 214 * (Convenience constructor) Creates a new instance with a new 215 * {@link OrderedThreadPoolExecutor}. 216 * 217 * @param corePoolSize The initial pool size 218 * @param maximumPoolSize The maximum pool size 219 * @param keepAliveTime Default duration for a thread 220 * @param unit Time unit used for the keepAlive value 221 * @param queueHandler The queue used to store events 222 */ 223 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 224 IoEventQueueHandler queueHandler) { 225 // Create a new default Executor 226 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 227 Executors.defaultThreadFactory(), queueHandler); 228 229 // Initialize the filter 230 init(executor, MANAGEABLE_EXECUTOR); 231 } 232 233 /** 234 * (Convenience constructor) Creates a new instance with a new 235 * {@link OrderedThreadPoolExecutor}. 236 * 237 * @param corePoolSize The initial pool size 238 * @param maximumPoolSize The maximum pool size 239 * @param keepAliveTime Default duration for a thread 240 * @param unit Time unit used for the keepAlive value 241 * @param threadFactory The factory used to create threads 242 */ 243 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 244 ThreadFactory threadFactory) { 245 // Create a new default Executor 246 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, 247 null); 248 249 // Initialize the filter 250 init(executor, MANAGEABLE_EXECUTOR); 251 } 252 253 /** 254 * (Convenience constructor) Creates a new instance with a new 255 * {@link OrderedThreadPoolExecutor}. 256 * 257 * @param corePoolSize The initial pool size 258 * @param maximumPoolSize The maximum pool size 259 * @param keepAliveTime Default duration for a thread 260 * @param unit Time unit used for the keepAlive value 261 * @param threadFactory The factory used to create threads 262 * @param queueHandler The queue used to store events 263 */ 264 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 265 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { 266 // Create a new default Executor 267 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 268 threadFactory, queueHandler); 269 270 // Initialize the filter 271 init(executor, MANAGEABLE_EXECUTOR); 272 } 273 274 /** 275 * (Convenience constructor) Creates a new instance with a new 276 * {@link OrderedThreadPoolExecutor}. 277 * 278 * @param eventTypes The event for which the executor will be used 279 */ 280 public ExecutorFilter(IoEventType... eventTypes) { 281 // Create a new default Executor 282 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEPALIVE_TIME, 283 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 284 285 // Initialize the filter 286 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 287 } 288 289 /** 290 * (Convenience constructor) Creates a new instance with a new 291 * {@link OrderedThreadPoolExecutor}. 292 * 293 * @param maximumPoolSize The maximum pool size 294 * @param eventTypes The event for which the executor will be used 295 */ 296 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) { 297 // Create a new default Executor 298 Executor executor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize, DEFAULT_KEEPALIVE_TIME, 299 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 300 301 // Initialize the filter 302 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 303 } 304 305 /** 306 * (Convenience constructor) Creates a new instance with a new 307 * {@link OrderedThreadPoolExecutor}. 308 * 309 * @param corePoolSize The initial pool size 310 * @param maximumPoolSize The maximum pool size 311 * @param eventTypes The event for which the executor will be used 312 */ 313 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) { 314 // Create a new default Executor 315 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, DEFAULT_KEEPALIVE_TIME, 316 TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); 317 318 // Initialize the filter 319 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 320 } 321 322 /** 323 * (Convenience constructor) Creates a new instance with a new 324 * {@link OrderedThreadPoolExecutor}. 325 * 326 * @param corePoolSize The initial pool size 327 * @param maximumPoolSize The maximum pool size 328 * @param keepAliveTime Default duration for a thread 329 * @param unit Time unit used for the keepAlive value 330 * @param eventTypes The event for which the executor will be used 331 */ 332 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 333 IoEventType... eventTypes) { 334 // Create a new default Executor 335 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 336 Executors.defaultThreadFactory(), null); 337 338 // Initialize the filter 339 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 340 } 341 342 /** 343 * (Convenience constructor) Creates a new instance with a new 344 * {@link OrderedThreadPoolExecutor}. 345 * 346 * @param corePoolSize The initial pool size 347 * @param maximumPoolSize The maximum pool size 348 * @param keepAliveTime Default duration for a thread 349 * @param unit Time unit used for the keepAlive value 350 * @param queueHandler The queue used to store events 351 * @param eventTypes The event for which the executor will be used 352 */ 353 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 354 IoEventQueueHandler queueHandler, IoEventType... eventTypes) { 355 // Create a new default Executor 356 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 357 Executors.defaultThreadFactory(), queueHandler); 358 359 // Initialize the filter 360 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 361 } 362 363 /** 364 * (Convenience constructor) Creates a new instance with a new 365 * {@link OrderedThreadPoolExecutor}. 366 * 367 * @param corePoolSize The initial pool size 368 * @param maximumPoolSize The maximum pool size 369 * @param keepAliveTime Default duration for a thread 370 * @param unit Time unit used for the keepAlive value 371 * @param threadFactory The factory used to create threads 372 * @param eventTypes The event for which the executor will be used 373 */ 374 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 375 ThreadFactory threadFactory, IoEventType... eventTypes) { 376 // Create a new default Executor 377 Executor executor = createDefaultExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, 378 null); 379 380 // Initialize the filter 381 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 382 } 383 384 /** 385 * (Convenience constructor) Creates a new instance with a new 386 * {@link OrderedThreadPoolExecutor}. 387 * 388 * @param corePoolSize The initial pool size 389 * @param maximumPoolSize The maximum pool size 390 * @param keepAliveTime Default duration for a thread 391 * @param unit Time unit used for the keepAlive value 392 * @param threadFactory The factory used to create threads 393 * @param queueHandler The queue used to store events 394 * @param eventTypes The event for which the executor will be used 395 */ 396 public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 397 ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) { 398 // Create a new default Executor 399 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 400 threadFactory, queueHandler); 401 402 // Initialize the filter 403 init(executor, MANAGEABLE_EXECUTOR, eventTypes); 404 } 405 406 /** 407 * Creates a new instance with the specified {@link Executor}. 408 * 409 * @param executor the user's managed Executor to use in this filter 410 */ 411 public ExecutorFilter(Executor executor) { 412 // Initialize the filter 413 init(executor, NOT_MANAGEABLE_EXECUTOR); 414 } 415 416 /** 417 * Creates a new instance with the specified {@link Executor}. 418 * 419 * @param executor the user's managed Executor to use in this filter 420 * @param eventTypes The event for which the executor will be used 421 */ 422 public ExecutorFilter(Executor executor, IoEventType... eventTypes) { 423 // Initialize the filter 424 init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes); 425 } 426 427 /** 428 * Create an OrderedThreadPool executor. 429 * 430 * @param corePoolSize The initial pool sizePoolSize 431 * @param maximumPoolSize The maximum pool size 432 * @param keepAliveTime Default duration for a thread 433 * @param unit Time unit used for the keepAlive value 434 * @param threadFactory The factory used to create threads 435 * @param queueHandler The queue used to store events 436 * @return An instance of the created Executor 437 */ 438 private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 439 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { 440 // Create a new Executor 441 Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 442 threadFactory, queueHandler); 443 444 return executor; 445 } 446 447 /** 448 * Create an EnumSet from an array of EventTypes, and set the associated 449 * eventTypes field. 450 * 451 * @param eventTypes The array of handled events 452 */ 453 private void initEventTypes(IoEventType... eventTypes) { 454 if ((eventTypes == null) || (eventTypes.length == 0)) { 455 eventTypes = DEFAULT_EVENT_SET; 456 } 457 458 // Copy the list of handled events in the event set 459 this.eventTypes = EnumSet.of(eventTypes[0], eventTypes); 460 461 // Check that we don't have the SESSION_CREATED event in the set 462 if (this.eventTypes.contains(IoEventType.SESSION_CREATED)) { 463 this.eventTypes = null; 464 throw new IllegalArgumentException(IoEventType.SESSION_CREATED + " is not allowed."); 465 } 466 } 467 468 /** 469 * Creates a new instance of ExecutorFilter. This private constructor is called by all 470 * the public constructor. 471 * 472 * @param executor The underlying {@link Executor} in charge of managing the Thread pool. 473 * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not 474 * @param eventTypes The lit of event which are handled by the executor 475 */ 476 private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) { 477 if (executor == null) { 478 throw new IllegalArgumentException("executor"); 479 } 480 481 initEventTypes(eventTypes); 482 this.executor = executor; 483 this.manageableExecutor = manageableExecutor; 484 } 485 486 /** 487 * Shuts down the underlying executor if this filter hase been created via 488 * a convenience constructor. 489 */ 490 @Override 491 public void destroy() { 492 if (manageableExecutor) { 493 ((ExecutorService) executor).shutdown(); 494 } 495 } 496 497 /** 498 * @return the underlying {@link Executor} instance this filter uses. 499 */ 500 public final Executor getExecutor() { 501 return executor; 502 } 503 504 /** 505 * Fires the specified event through the underlying executor. 506 * 507 * @param event The filtered event 508 */ 509 protected void fireEvent(IoFilterEvent event) { 510 executor.execute(event); 511 } 512 513 /** 514 * {@inheritDoc} 515 */ 516 @Override 517 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { 518 if (parent.contains(this)) { 519 throw new IllegalArgumentException( 520 "You can't add the same filter instance more than once. Create another instance and add it."); 521 } 522 } 523 524 /** 525 * {@inheritDoc} 526 */ 527 @Override 528 public final void sessionOpened(NextFilter nextFilter, IoSession session) { 529 if (eventTypes.contains(IoEventType.SESSION_OPENED)) { 530 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null); 531 fireEvent(event); 532 } else { 533 nextFilter.sessionOpened(session); 534 } 535 } 536 537 /** 538 * {@inheritDoc} 539 */ 540 @Override 541 public final void sessionClosed(NextFilter nextFilter, IoSession session) { 542 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) { 543 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null); 544 fireEvent(event); 545 } else { 546 nextFilter.sessionClosed(session); 547 } 548 } 549 550 /** 551 * {@inheritDoc} 552 */ 553 @Override 554 public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) { 555 if (eventTypes.contains(IoEventType.SESSION_IDLE)) { 556 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status); 557 fireEvent(event); 558 } else { 559 nextFilter.sessionIdle(session, status); 560 } 561 } 562 563 /** 564 * {@inheritDoc} 565 */ 566 @Override 567 public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) { 568 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) { 569 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause); 570 fireEvent(event); 571 } else { 572 nextFilter.exceptionCaught(session, cause); 573 } 574 } 575 576 /** 577 * {@inheritDoc} 578 */ 579 @Override 580 public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) { 581 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) { 582 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message); 583 fireEvent(event); 584 } else { 585 nextFilter.messageReceived(session, message); 586 } 587 } 588 589 /** 590 * {@inheritDoc} 591 */ 592 @Override 593 public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) { 594 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) { 595 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest); 596 fireEvent(event); 597 } else { 598 nextFilter.messageSent(session, writeRequest); 599 } 600 } 601 602 /** 603 * {@inheritDoc} 604 */ 605 @Override 606 public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) { 607 if (eventTypes.contains(IoEventType.WRITE)) { 608 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest); 609 fireEvent(event); 610 } else { 611 nextFilter.filterWrite(session, writeRequest); 612 } 613 } 614 615 /** 616 * {@inheritDoc} 617 */ 618 @Override 619 public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception { 620 if (eventTypes.contains(IoEventType.CLOSE)) { 621 IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null); 622 fireEvent(event); 623 } else { 624 nextFilter.filterClose(session); 625 } 626 } 627}