1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.EnumSet;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.mina.core.filterchain.IoFilterAdapter;
33 import org.apache.mina.core.filterchain.IoFilterChain;
34 import org.apache.mina.core.filterchain.IoFilterEvent;
35 import org.apache.mina.core.session.IdleStatus;
36 import org.apache.mina.core.session.IoEventType;
37 import org.apache.mina.core.session.IoSession;
38 import org.apache.mina.core.write.WriteRequest;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public class ExecutorFilter extends IoFilterAdapter {
116
117 private final EnumSet<IoEventType> eventTypes;
118 private final Executor executor;
119 private final boolean createdExecutor;
120
121
122
123
124
125 public ExecutorFilter() {
126 this(16, (IoEventType[]) null);
127 }
128
129
130
131
132
133 public ExecutorFilter(int maximumPoolSize) {
134 this(0, maximumPoolSize, (IoEventType[]) null);
135 }
136
137
138
139
140
141 public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
142 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS, (IoEventType[]) null);
143 }
144
145
146
147
148
149 public ExecutorFilter(
150 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
151 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, (IoEventType[]) null);
152 }
153
154
155
156
157
158 public ExecutorFilter(
159 int corePoolSize, int maximumPoolSize,
160 long keepAliveTime, TimeUnit unit,
161 IoEventQueueHandler queueHandler) {
162 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, (IoEventType[]) null);
163 }
164
165
166
167
168
169 public ExecutorFilter(
170 int corePoolSize, int maximumPoolSize,
171 long keepAliveTime, TimeUnit unit,
172 ThreadFactory threadFactory) {
173 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, (IoEventType[]) null);
174 }
175
176
177
178
179
180 public ExecutorFilter(
181 int corePoolSize, int maximumPoolSize,
182 long keepAliveTime, TimeUnit unit,
183 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
184 this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, (IoEventType[]) null);
185 }
186
187
188
189
190
191 public ExecutorFilter(IoEventType... eventTypes) {
192 this(16, eventTypes);
193 }
194
195
196
197
198
199 public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
200 this(0, maximumPoolSize, eventTypes);
201 }
202
203
204
205
206
207 public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
208 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS, eventTypes);
209 }
210
211
212
213
214
215 public ExecutorFilter(
216 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventType... eventTypes) {
217 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventTypes);
218 }
219
220
221
222
223
224 public ExecutorFilter(
225 int corePoolSize, int maximumPoolSize,
226 long keepAliveTime, TimeUnit unit,
227 IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
228 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, eventTypes);
229 }
230
231
232
233
234
235 public ExecutorFilter(
236 int corePoolSize, int maximumPoolSize,
237 long keepAliveTime, TimeUnit unit,
238 ThreadFactory threadFactory, IoEventType... eventTypes) {
239 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, eventTypes);
240 }
241
242
243
244
245
246 public ExecutorFilter(
247 int corePoolSize, int maximumPoolSize,
248 long keepAliveTime, TimeUnit unit,
249 ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
250 this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, eventTypes);
251 }
252
253
254
255
256 public ExecutorFilter(Executor executor) {
257 this(executor, false, (IoEventType[]) null);
258 }
259
260
261
262
263 public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
264 this(executor, false, eventTypes);
265 }
266
267 private ExecutorFilter(Executor executor, boolean createdExecutor, IoEventType... eventTypes) {
268 if (executor == null) {
269 throw new NullPointerException("executor");
270 }
271 if (eventTypes == null || eventTypes.length == 0) {
272 eventTypes = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
273 IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT,
274 IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
275 IoEventType.SESSION_OPENED, };
276 }
277
278 for (IoEventType t : eventTypes) {
279 if (t == IoEventType.SESSION_CREATED) {
280 throw new IllegalArgumentException(IoEventType.SESSION_CREATED
281 + " is not allowed.");
282 }
283 }
284
285 this.executor = executor;
286 this.createdExecutor = createdExecutor;
287
288 Collection<IoEventType> eventTypeCollection = new ArrayList<IoEventType>(
289 eventTypes.length);
290 Collections.addAll(eventTypeCollection, eventTypes);
291 this.eventTypes = EnumSet.copyOf(eventTypeCollection);
292 }
293
294
295
296
297
298 @Override
299 public void destroy() {
300 if (createdExecutor) {
301 ((ExecutorService) executor).shutdown();
302 }
303 }
304
305
306
307
308 public final Executor getExecutor() {
309 return executor;
310 }
311
312
313
314
315 protected void fireEvent(IoFilterEvent event) {
316 getExecutor().execute(event);
317 }
318
319 @Override
320 public void onPreAdd(IoFilterChain parent, String name,
321 NextFilter nextFilter) throws Exception {
322 if (parent.contains(this)) {
323 throw new IllegalArgumentException(
324 "You can't add the same filter instance more than once. Create another instance and add it.");
325 }
326 }
327
328 @Override
329 public final void sessionCreated(NextFilter nextFilter, IoSession session) {
330 nextFilter.sessionCreated(session);
331 }
332
333 @Override
334 public final void sessionOpened(NextFilter nextFilter, IoSession session) {
335 if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
336 fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
337 session, null));
338 } else {
339 nextFilter.sessionOpened(session);
340 }
341 }
342
343 @Override
344 public final void sessionClosed(NextFilter nextFilter, IoSession session) {
345 if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
346 fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
347 session, null));
348 } else {
349 nextFilter.sessionClosed(session);
350 }
351 }
352
353 @Override
354 public final void sessionIdle(NextFilter nextFilter, IoSession session,
355 IdleStatus status) {
356 if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
357 fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
358 session, status));
359 } else {
360 nextFilter.sessionIdle(session, status);
361 }
362 }
363
364 @Override
365 public final void exceptionCaught(NextFilter nextFilter, IoSession session,
366 Throwable cause) {
367 if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
368 fireEvent(new IoFilterEvent(nextFilter,
369 IoEventType.EXCEPTION_CAUGHT, session, cause));
370 } else {
371 nextFilter.exceptionCaught(session, cause);
372 }
373 }
374
375 @Override
376 public final void messageReceived(NextFilter nextFilter, IoSession session,
377 Object message) {
378 if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
379 fireEvent(new IoFilterEvent(nextFilter,
380 IoEventType.MESSAGE_RECEIVED, session, message));
381 } else {
382 nextFilter.messageReceived(session, message);
383 }
384 }
385
386 @Override
387 public final void messageSent(NextFilter nextFilter, IoSession session,
388 WriteRequest writeRequest) {
389 if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
390 fireEvent(new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
391 session, writeRequest));
392 } else {
393 nextFilter.messageSent(session, writeRequest);
394 }
395 }
396
397 @Override
398 public final void filterWrite(NextFilter nextFilter, IoSession session,
399 WriteRequest writeRequest) {
400 if (eventTypes.contains(IoEventType.WRITE)) {
401 fireEvent(new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
402 writeRequest));
403 } else {
404 nextFilter.filterWrite(session, writeRequest);
405 }
406 }
407
408 @Override
409 public final void filterClose(NextFilter nextFilter, IoSession session)
410 throws Exception {
411 if (eventTypes.contains(IoEventType.CLOSE)) {
412 fireEvent(new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
413 null));
414 } else {
415 nextFilter.filterClose(session);
416 }
417 }
418 }