1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.RejectedExecutionHandler;
31 import java.util.concurrent.SynchronousQueue;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.apache.mina.core.session.AttributeKey;
38 import org.apache.mina.core.session.DummySession;
39 import org.apache.mina.core.session.IoEvent;
40 import org.apache.mina.core.session.IoSession;
41 import org.apache.mina.util.CircularQueue;
42
43
44
45
46
47
48
49
50
51
52 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
53
54 private static final IoSession EXIT_SIGNAL = new DummySession();
55
56 private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
57 private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
58
59 private final Set<Worker> workers = new HashSet<Worker>();
60
61 private volatile int corePoolSize;
62 private volatile int maximumPoolSize;
63 private volatile int largestPoolSize;
64 private final AtomicInteger idleWorkers = new AtomicInteger();
65
66 private long completedTaskCount;
67 private volatile boolean shutdown;
68
69 private final IoEventQueueHandler queueHandler;
70
71 public OrderedThreadPoolExecutor() {
72 this(16);
73 }
74
75 public OrderedThreadPoolExecutor(int maximumPoolSize) {
76 this(0, maximumPoolSize);
77 }
78
79 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
80 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
81 }
82
83 public OrderedThreadPoolExecutor(
84 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
85 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
86 }
87
88 public OrderedThreadPoolExecutor(
89 int corePoolSize, int maximumPoolSize,
90 long keepAliveTime, TimeUnit unit,
91 IoEventQueueHandler queueHandler) {
92 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
93 }
94
95 public OrderedThreadPoolExecutor(
96 int corePoolSize, int maximumPoolSize,
97 long keepAliveTime, TimeUnit unit,
98 ThreadFactory threadFactory) {
99 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
100 }
101
102 public OrderedThreadPoolExecutor(
103 int corePoolSize, int maximumPoolSize,
104 long keepAliveTime, TimeUnit unit,
105 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
106 super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
107 if (corePoolSize < 0) {
108 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
109 }
110
111 if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
112 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
113 }
114
115 if (queueHandler == null) {
116 queueHandler = IoEventQueueHandler.NOOP;
117 }
118
119 this.corePoolSize = corePoolSize;
120 this.maximumPoolSize = maximumPoolSize;
121 this.queueHandler = queueHandler;
122 }
123
124 public IoEventQueueHandler getQueueHandler() {
125 return queueHandler;
126 }
127
128 @Override
129 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
130
131 }
132
133 private void addWorker() {
134 synchronized (workers) {
135 if (workers.size() >= maximumPoolSize) {
136 return;
137 }
138
139 Worker worker = new Worker();
140 Thread thread = getThreadFactory().newThread(worker);
141 idleWorkers.incrementAndGet();
142 thread.start();
143 workers.add(worker);
144
145 if (workers.size() > largestPoolSize) {
146 largestPoolSize = workers.size();
147 }
148 }
149 }
150
151 private void addWorkerIfNecessary() {
152 if (idleWorkers.get() == 0) {
153 synchronized (workers) {
154 if (workers.isEmpty() || idleWorkers.get() == 0) {
155 addWorker();
156 }
157 }
158 }
159 }
160
161 private void removeWorker() {
162 synchronized (workers) {
163 if (workers.size() <= corePoolSize) {
164 return;
165 }
166 waitingSessions.offer(EXIT_SIGNAL);
167 }
168 }
169
170 @Override
171 public int getMaximumPoolSize() {
172 return maximumPoolSize;
173 }
174
175 @Override
176 public void setMaximumPoolSize(int maximumPoolSize) {
177 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
178 throw new IllegalArgumentException("maximumPoolSize: "
179 + maximumPoolSize);
180 }
181
182 synchronized (workers) {
183 this.maximumPoolSize = maximumPoolSize;
184 int difference = workers.size() - maximumPoolSize;
185 while (difference > 0) {
186 removeWorker();
187 --difference;
188 }
189 }
190 }
191
192 @Override
193 public boolean awaitTermination(long timeout, TimeUnit unit)
194 throws InterruptedException {
195
196 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
197
198 synchronized (workers) {
199 while (!isTerminated()) {
200 long waitTime = deadline - System.currentTimeMillis();
201 if (waitTime <= 0) {
202 break;
203 }
204
205 workers.wait(waitTime);
206 }
207 }
208 return isTerminated();
209 }
210
211 @Override
212 public boolean isShutdown() {
213 return shutdown;
214 }
215
216 @Override
217 public boolean isTerminated() {
218 if (!shutdown) {
219 return false;
220 }
221
222 synchronized (workers) {
223 return workers.isEmpty();
224 }
225 }
226
227 @Override
228 public void shutdown() {
229 if (shutdown) {
230 return;
231 }
232
233 shutdown = true;
234
235 synchronized (workers) {
236 for (int i = workers.size(); i > 0; i --) {
237 waitingSessions.offer(EXIT_SIGNAL);
238 }
239 }
240 }
241
242 @Override
243 public List<Runnable> shutdownNow() {
244 shutdown();
245
246 List<Runnable> answer = new ArrayList<Runnable>();
247 IoSession session;
248 while ((session = waitingSessions.poll()) != null) {
249 if (session == EXIT_SIGNAL) {
250 waitingSessions.offer(EXIT_SIGNAL);
251 Thread.yield();
252 continue;
253 }
254
255 SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
256 synchronized (buf.queue) {
257 for (Runnable task: buf.queue) {
258 getQueueHandler().polled(this, (IoEvent) task);
259 answer.add(task);
260 }
261 buf.queue.clear();
262 }
263 }
264
265 return answer;
266 }
267
268 @Override
269 public void execute(Runnable task) {
270 if (shutdown) {
271 rejectTask(task);
272 }
273
274 checkTaskType(task);
275
276 IoEvent e = (IoEvent) task;
277 IoSession s = e.getSession();
278 SessionBuffer buf = getSessionBuffer(s);
279 Queue<Runnable> queue = buf.queue;
280 boolean offerSession;
281 boolean offerEvent = queueHandler.accept(this, e);
282 if (offerEvent) {
283 synchronized (queue) {
284 queue.offer(e);
285 if (buf.processingCompleted) {
286 buf.processingCompleted = false;
287 offerSession = true;
288 } else {
289 offerSession = false;
290 }
291 }
292 } else {
293 offerSession = false;
294 }
295
296 if (offerSession) {
297 waitingSessions.offer(s);
298 }
299
300 addWorkerIfNecessary();
301
302 if (offerEvent) {
303 queueHandler.offered(this, e);
304 }
305 }
306
307 private void rejectTask(Runnable task) {
308 getRejectedExecutionHandler().rejectedExecution(task, this);
309 }
310
311 private void checkTaskType(Runnable task) {
312 if (!(task instanceof IoEvent)) {
313 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
314 }
315 }
316
317 @Override
318 public int getActiveCount() {
319 synchronized (workers) {
320 return workers.size() - idleWorkers.get();
321 }
322 }
323
324 @Override
325 public long getCompletedTaskCount() {
326 synchronized (workers) {
327 long answer = completedTaskCount;
328 for (Worker w: workers) {
329 answer += w.completedTaskCount;
330 }
331
332 return answer;
333 }
334 }
335
336 @Override
337 public int getLargestPoolSize() {
338 return largestPoolSize;
339 }
340
341 @Override
342 public int getPoolSize() {
343 synchronized (workers) {
344 return workers.size();
345 }
346 }
347
348 @Override
349 public long getTaskCount() {
350 return getCompletedTaskCount();
351 }
352
353 @Override
354 public boolean isTerminating() {
355 synchronized (workers) {
356 return isShutdown() && !isTerminated();
357 }
358 }
359
360 @Override
361 public int prestartAllCoreThreads() {
362 int answer = 0;
363 synchronized (workers) {
364 for (int i = corePoolSize - workers.size() ; i > 0; i --) {
365 addWorker();
366 answer ++;
367 }
368 }
369 return answer;
370 }
371
372 @Override
373 public boolean prestartCoreThread() {
374 synchronized (workers) {
375 if (workers.size() < corePoolSize) {
376 addWorker();
377 return true;
378 } else {
379 return false;
380 }
381 }
382 }
383
384 @Override
385 public BlockingQueue<Runnable> getQueue() {
386 throw new UnsupportedOperationException();
387 }
388
389 @Override
390 public void purge() {
391
392 }
393
394 @Override
395 public boolean remove(Runnable task) {
396 checkTaskType(task);
397 IoEvent e = (IoEvent) task;
398 IoSession s = e.getSession();
399 SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
400 if (buffer == null) {
401 return false;
402 }
403
404 boolean removed;
405 synchronized (buffer.queue) {
406 removed = buffer.queue.remove(task);
407 }
408
409 if (removed) {
410 getQueueHandler().polled(this, e);
411 }
412
413 return removed;
414 }
415
416 @Override
417 public int getCorePoolSize() {
418 return corePoolSize;
419 }
420
421 @Override
422 public void setCorePoolSize(int corePoolSize) {
423 if (corePoolSize < 0) {
424 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
425 }
426 if (corePoolSize > maximumPoolSize) {
427 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
428 }
429
430 synchronized (workers) {
431 if (this.corePoolSize > corePoolSize) {
432 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
433 removeWorker();
434 }
435 }
436 this.corePoolSize = corePoolSize;
437 }
438 }
439
440 private SessionBuffer getSessionBuffer(IoSession session) {
441 SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER);
442 if (buffer == null) {
443 buffer = new SessionBuffer();
444 SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer);
445 if (oldBuffer != null) {
446 buffer = oldBuffer;
447 }
448 }
449 return buffer;
450 }
451
452 private static class SessionBuffer {
453 private final Queue<Runnable> queue = new CircularQueue<Runnable>();
454 private boolean processingCompleted = true;
455 }
456
457 private class Worker implements Runnable {
458
459 private volatile long completedTaskCount;
460 private Thread thread;
461
462 public void run() {
463 thread = Thread.currentThread();
464
465 try {
466 for (;;) {
467 IoSession session = fetchSession();
468
469 idleWorkers.decrementAndGet();
470
471 if (session == null) {
472 synchronized (workers) {
473 if (workers.size() > corePoolSize) {
474
475 workers.remove(this);
476 break;
477 }
478 }
479 }
480
481 if (session == EXIT_SIGNAL) {
482 break;
483 }
484
485 try {
486 if (session != null) {
487 runTasks(getSessionBuffer(session));
488 }
489 } finally {
490 idleWorkers.incrementAndGet();
491 }
492 }
493 } finally {
494 synchronized (workers) {
495 workers.remove(this);
496 OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
497 workers.notifyAll();
498 }
499 }
500 }
501
502 private IoSession fetchSession() {
503 IoSession session = null;
504 long currentTime = System.currentTimeMillis();
505 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
506 for (;;) {
507 try {
508 long waitTime = deadline - currentTime;
509 if (waitTime <= 0) {
510 break;
511 }
512
513 try {
514 session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
515 break;
516 } finally {
517 if (session == null) {
518 currentTime = System.currentTimeMillis();
519 }
520 }
521 } catch (InterruptedException e) {
522
523 continue;
524 }
525 }
526 return session;
527 }
528
529 private void runTasks(SessionBuffer buf) {
530 for (;;) {
531 Runnable task;
532 synchronized (buf.queue) {
533 task = buf.queue.poll();
534
535 if (task == null) {
536 buf.processingCompleted = true;
537 break;
538 }
539 }
540
541 queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
542
543 runTask(task);
544 }
545 }
546
547 private void runTask(Runnable task) {
548 beforeExecute(thread, task);
549 boolean ran = false;
550 try {
551 task.run();
552 ran = true;
553 afterExecute(task, null);
554 completedTaskCount ++;
555 } catch (RuntimeException e) {
556 if (!ran) {
557 afterExecute(task, e);
558 }
559 throw e;
560 }
561 }
562 }
563 }