View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.EnumSet;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.mina.core.filterchain.IoFilterAdapter;
33  import org.apache.mina.core.filterchain.IoFilterChain;
34  import org.apache.mina.core.filterchain.IoFilterEvent;
35  import org.apache.mina.core.session.IdleStatus;
36  import org.apache.mina.core.session.IoEventType;
37  import org.apache.mina.core.session.IoSession;
38  import org.apache.mina.core.write.WriteRequest;
39  
40  /**
41   * A filter that forwards I/O events to {@link Executor} to enforce a certain
42   * thread model while allowing the events per session to be processed
43   * simultaneously. You can apply various thread model by inserting this filter
44   * to a {@link IoFilterChain}.
45   * 
46   * <h2>Life Cycle Management</h2>
47   * 
48   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
49   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
50   * constructor that accepts an {@link Executor} that you've instantiated, you have
51   * full control and responsibility of managing its life cycle (e.g. calling
52   * {@link ExecutorService#shutdown()}.
53   * <p> 
54   * If you created this filter using convenience constructors like
55   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
56   * {@link #destroy()} explicitly.
57   * 
58   * <h2>Event Ordering</h2>
59   * 
60   * All convenience constructors of this filter creates a new
61   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
62   * maintained like the following:
63   * <ul>
64   * <li>All event handler methods are called exclusively.
65   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
66   * <li>The event order is never mixed up.
67   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
68   * </ul>
69   * However, if you specified other {@link Executor} instance in the constructor,
70   * the order of events are not maintained at all.  This means more than one event
71   * handler methods can be invoked at the same time with mixed order.  For example,
72   * let's assume that messageReceived, messageSent, and sessionClosed events are
73   * fired.
74   * <ul>
75   * <li>All event handler methods can be called simultaneously.
76   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
77   * <li>The event order can be mixed up.
78   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
79   *           is invoked.)</li>
80   * </ul>
81   * If you need to maintain the order of events per session, please specify an
82   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
83   * 
84   * <h2>Selective Filtering</h2>
85   * 
86   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
87   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
88   * underlying executor, which is most common setting.
89   * <p>
90   * If you want to submit only a certain set of event types, you can specify them
91   * in the constructor.  For example, you could configure a thread pool for
92   * write operation for the maximum performance:
93   * <pre><code>
94   * IoService service = ...;
95   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
96   * 
97   * chain.addLast("codec", new ProtocolCodecFilter(...));
98   * // Use one thread pool for most events.
99   * chain.addLast("executor1", new ExecutorFilter());
100  * // and another dedicated thread pool for 'filterWrite' events.
101  * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
102  * </code></pre>
103  * 
104  * <h2>Preventing {@link OutOfMemoryError}</h2>
105  * 
106  * Please refer to {@link IoEventQueueThrottle}, which is specified as
107  * a parameter of the convenience constructors.
108  * 
109  * @author The Apache MINA Project (dev@mina.apache.org)
110  * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
111  * 
112  * @see OrderedThreadPoolExecutor
113  * @see UnorderedThreadPoolExecutor
114  */
115 public class ExecutorFilter extends IoFilterAdapter {
116 
117     private final EnumSet<IoEventType> eventTypes;
118     private final Executor executor;
119     private final boolean createdExecutor;
120 
121     /**
122      * (Convenience constructor) Creates a new instance with a new
123      * {@link OrderedThreadPoolExecutor}.
124      */
125     public ExecutorFilter() {
126         this(16, (IoEventType[]) null);
127     }
128     
129     /**
130      * (Convenience constructor) Creates a new instance with a new
131      * {@link OrderedThreadPoolExecutor}.
132      */
133     public ExecutorFilter(int maximumPoolSize) {
134         this(0, maximumPoolSize, (IoEventType[]) null);
135     }
136     
137     /**
138      * (Convenience constructor) Creates a new instance with a new
139      * {@link OrderedThreadPoolExecutor}.
140      */
141     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
142         this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS, (IoEventType[]) null);
143     }
144     
145     /**
146      * (Convenience constructor) Creates a new instance with a new
147      * {@link OrderedThreadPoolExecutor}.
148      */
149     public ExecutorFilter(
150             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
151         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, (IoEventType[]) null);
152     }
153 
154     /**
155      * (Convenience constructor) Creates a new instance with a new
156      * {@link OrderedThreadPoolExecutor}.
157      */
158     public ExecutorFilter(
159             int corePoolSize, int maximumPoolSize, 
160             long keepAliveTime, TimeUnit unit,
161             IoEventQueueHandler queueHandler) {
162         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, (IoEventType[]) null);
163     }
164 
165     /**
166      * (Convenience constructor) Creates a new instance with a new
167      * {@link OrderedThreadPoolExecutor}.
168      */
169     public ExecutorFilter(
170             int corePoolSize, int maximumPoolSize, 
171             long keepAliveTime, TimeUnit unit,
172             ThreadFactory threadFactory) {
173         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, (IoEventType[]) null);
174     }
175 
176     /**
177      * (Convenience constructor) Creates a new instance with a new
178      * {@link OrderedThreadPoolExecutor}.
179      */
180     public ExecutorFilter(
181             int corePoolSize, int maximumPoolSize, 
182             long keepAliveTime, TimeUnit unit,
183             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
184         this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, (IoEventType[]) null);
185     }
186 
187     /**
188      * (Convenience constructor) Creates a new instance with a new
189      * {@link OrderedThreadPoolExecutor}.
190      */
191     public ExecutorFilter(IoEventType... eventTypes) {
192         this(16, eventTypes);
193     }
194     
195     /**
196      * (Convenience constructor) Creates a new instance with a new
197      * {@link OrderedThreadPoolExecutor}.
198      */
199     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
200         this(0, maximumPoolSize, eventTypes);
201     }
202     
203     /**
204      * (Convenience constructor) Creates a new instance with a new
205      * {@link OrderedThreadPoolExecutor}.
206      */
207     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
208         this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS, eventTypes);
209     }
210     
211     /**
212      * (Convenience constructor) Creates a new instance with a new
213      * {@link OrderedThreadPoolExecutor}.
214      */
215     public ExecutorFilter(
216             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventType... eventTypes) {
217         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventTypes);
218     }
219     
220     /**
221      * (Convenience constructor) Creates a new instance with a new
222      * {@link OrderedThreadPoolExecutor}.
223      */
224     public ExecutorFilter(
225             int corePoolSize, int maximumPoolSize, 
226             long keepAliveTime, TimeUnit unit,
227             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
228         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, eventTypes);
229     }
230 
231     /**
232      * (Convenience constructor) Creates a new instance with a new
233      * {@link OrderedThreadPoolExecutor}.
234      */
235     public ExecutorFilter(
236             int corePoolSize, int maximumPoolSize, 
237             long keepAliveTime, TimeUnit unit,
238             ThreadFactory threadFactory, IoEventType... eventTypes) {
239         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, eventTypes);
240     }
241 
242     /**
243      * (Convenience constructor) Creates a new instance with a new
244      * {@link OrderedThreadPoolExecutor}.
245      */
246     public ExecutorFilter(
247             int corePoolSize, int maximumPoolSize, 
248             long keepAliveTime, TimeUnit unit,
249             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
250         this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, eventTypes);
251     }
252     
253     /**
254      * Creates a new instance with the specified {@link Executor}.
255      */
256     public ExecutorFilter(Executor executor) {
257         this(executor, false, (IoEventType[]) null);
258     }
259 
260     /**
261      * Creates a new instance with the specified {@link Executor}.
262      */
263     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
264         this(executor, false, eventTypes);
265     }
266 
267     private ExecutorFilter(Executor executor, boolean createdExecutor, IoEventType... eventTypes) {
268         if (executor == null) {
269             throw new NullPointerException("executor");
270         }
271         if (eventTypes == null || eventTypes.length == 0) {
272             eventTypes = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
273                     IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT,
274                     IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
275                     IoEventType.SESSION_OPENED, };
276         }
277 
278         for (IoEventType t : eventTypes) {
279             if (t == IoEventType.SESSION_CREATED) {
280                 throw new IllegalArgumentException(IoEventType.SESSION_CREATED
281                         + " is not allowed.");
282             }
283         }
284 
285         this.executor = executor;
286         this.createdExecutor = createdExecutor;
287 
288         Collection<IoEventType> eventTypeCollection = new ArrayList<IoEventType>(
289                 eventTypes.length);
290         Collections.addAll(eventTypeCollection, eventTypes);
291         this.eventTypes = EnumSet.copyOf(eventTypeCollection);
292     }
293     
294     /**
295      * Shuts down the underlying executor if this filter is creates via
296      * a convenience constructor.
297      */
298     @Override
299     public void destroy() {
300         if (createdExecutor) {
301             ((ExecutorService) executor).shutdown();
302         }
303     }
304 
305     /**
306      * Returns the underlying {@link Executor} instance this filter uses.
307      */
308     public final Executor getExecutor() {
309         return executor;
310     }
311 
312     /**
313      * Fires the specified event through the underlying executor.
314      */
315     protected void fireEvent(IoFilterEvent event) {
316         getExecutor().execute(event);
317     }
318 
319     @Override
320     public void onPreAdd(IoFilterChain parent, String name,
321             NextFilter nextFilter) throws Exception {
322         if (parent.contains(this)) {
323             throw new IllegalArgumentException(
324                     "You can't add the same filter instance more than once.  Create another instance and add it.");
325         }
326     }
327 
328     @Override
329     public final void sessionCreated(NextFilter nextFilter, IoSession session) {
330         nextFilter.sessionCreated(session);
331     }
332 
333     @Override
334     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
335         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
336             fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
337                     session, null));
338         } else {
339             nextFilter.sessionOpened(session);
340         }
341     }
342 
343     @Override
344     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
345         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
346             fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
347                     session, null));
348         } else {
349             nextFilter.sessionClosed(session);
350         }
351     }
352 
353     @Override
354     public final void sessionIdle(NextFilter nextFilter, IoSession session,
355             IdleStatus status) {
356         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
357             fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
358                     session, status));
359         } else {
360             nextFilter.sessionIdle(session, status);
361         }
362     }
363 
364     @Override
365     public final void exceptionCaught(NextFilter nextFilter, IoSession session,
366             Throwable cause) {
367         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
368             fireEvent(new IoFilterEvent(nextFilter,
369                     IoEventType.EXCEPTION_CAUGHT, session, cause));
370         } else {
371             nextFilter.exceptionCaught(session, cause);
372         }
373     }
374 
375     @Override
376     public final void messageReceived(NextFilter nextFilter, IoSession session,
377             Object message) {
378         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
379             fireEvent(new IoFilterEvent(nextFilter,
380                     IoEventType.MESSAGE_RECEIVED, session, message));
381         } else {
382             nextFilter.messageReceived(session, message);
383         }
384     }
385 
386     @Override
387     public final void messageSent(NextFilter nextFilter, IoSession session,
388             WriteRequest writeRequest) {
389         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
390             fireEvent(new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
391                     session, writeRequest));
392         } else {
393             nextFilter.messageSent(session, writeRequest);
394         }
395     }
396 
397     @Override
398     public final void filterWrite(NextFilter nextFilter, IoSession session,
399             WriteRequest writeRequest) {
400         if (eventTypes.contains(IoEventType.WRITE)) {
401             fireEvent(new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
402                     writeRequest));
403         } else {
404             nextFilter.filterWrite(session, writeRequest);
405         }
406     }
407 
408     @Override
409     public final void filterClose(NextFilter nextFilter, IoSession session)
410             throws Exception {
411         if (eventTypes.contains(IoEventType.CLOSE)) {
412             fireEvent(new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
413                     null));
414         } else {
415             nextFilter.filterClose(session);
416         }
417     }
418 }