1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.mina.core.session.AttributeKey;
39 import org.apache.mina.core.session.DummySession;
40 import org.apache.mina.core.session.IoEvent;
41 import org.apache.mina.core.session.IoSession;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45
46
47
48
49
50
51
52
53
54 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
55
56 static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
57
58
59 private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
60
61
62 private static final int DEFAULT_MAX_THREAD_POOL = 16;
63
64
65 private static final int DEFAULT_KEEP_ALIVE = 30;
66
67 private static final IoSession EXIT_SIGNAL = new DummySession();
68
69
70 private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
71
72
73 private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
74
75 private final Set<Worker> workers = new HashSet<Worker>();
76
77 private volatile int largestPoolSize;
78 private final AtomicInteger idleWorkers = new AtomicInteger();
79
80 private long completedTaskCount;
81 private volatile boolean shutdown;
82
83 private final IoEventQueueHandler eventQueueHandler;
84
85
86
87
88
89
90
91
92
/**
 * Creates an executor using the default core size, the default maximum size,
 * the default keep-alive time (in seconds), the default thread factory and no
 * explicit event queue handler.
 */
public OrderedThreadPoolExecutor() {
    this(
            DEFAULT_INITIAL_THREAD_POOL_SIZE,
            DEFAULT_MAX_THREAD_POOL,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
97
98
99
100
101
102
103
104
105
106
/**
 * Creates an executor with the given maximum pool size; the core size,
 * keep-alive time (in seconds) and thread factory take their default values.
 *
 * @param maximumPoolSize the maximum number of worker threads
 */
public OrderedThreadPoolExecutor(int maximumPoolSize) {
    this(
            DEFAULT_INITIAL_THREAD_POOL_SIZE,
            maximumPoolSize,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
111
112
113
114
115
116
117
118
119
120
/**
 * Creates an executor with the given core and maximum pool sizes; the
 * keep-alive time (in seconds) and thread factory take their default values.
 *
 * @param corePoolSize the core number of worker threads
 * @param maximumPoolSize the maximum number of worker threads
 */
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
    this(
            corePoolSize,
            maximumPoolSize,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
125
126
127
128
129
130
131
132
133
134
135
/**
 * Creates an executor with the given pool sizes and keep-alive time, using
 * the default thread factory and no explicit event queue handler.
 *
 * @param corePoolSize the core number of worker threads
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime how long an idle worker waits for a session
 * @param unit the unit of {@code keepAliveTime}
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            Executors.defaultThreadFactory(),
            null);
}
141
142
143
144
145
146
147
148
149
150
151
/**
 * Creates an executor with the given pool sizes, keep-alive time and event
 * queue handler, using the default thread factory.
 *
 * @param corePoolSize the core number of worker threads
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime how long an idle worker waits for a session
 * @param unit the unit of {@code keepAliveTime}
 * @param eventQueueHandler the handler consulted when events are queued; may be {@code null}
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        IoEventQueueHandler eventQueueHandler) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            Executors.defaultThreadFactory(),
            eventQueueHandler);
}
159
160
161
162
163
164
165
166
167
168
169
/**
 * Creates an executor with the given pool sizes, keep-alive time and thread
 * factory, and no explicit event queue handler.
 *
 * @param corePoolSize the core number of worker threads
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime how long an idle worker waits for a session
 * @param unit the unit of {@code keepAliveTime}
 * @param threadFactory the factory used to create worker threads
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            threadFactory,
            null);
}
176
177
178
179
180
181
182
183
184
185
186
/**
 * Creates an executor with the given pool sizes, keep-alive time, thread
 * factory and event queue handler.
 *
 * @param corePoolSize the core number of worker threads; must not be negative
 * @param maximumPoolSize the maximum number of worker threads; must be non-zero
 *        and not smaller than {@code corePoolSize}
 * @param keepAliveTime how long an idle worker waits for a session
 * @param unit the unit of {@code keepAliveTime}
 * @param threadFactory the factory used to create worker threads
 * @param eventQueueHandler the handler consulted when events are queued;
 *        {@code null} selects {@link IoEventQueueHandler#NOOP}
 * @throws IllegalArgumentException if the pool sizes are invalid
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
    // Initialise the parent with a minimal configuration (core 0, max 1); the
    // requested sizes are applied below, once they have been validated here.
    super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
            new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());

    if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
        throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
    }

    if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
        throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
    }

    // Order matters: the parent currently has a maximum of 1, and recent JDKs
    // reject a core size larger than the current maximum. Raising the maximum
    // first keeps both calls valid for every accepted (core, max) pair.
    super.setMaximumPoolSize(maximumPoolSize);
    super.setCorePoolSize(corePoolSize);

    this.eventQueueHandler =
            (eventQueueHandler == null) ? IoEventQueueHandler.NOOP : eventQueueHandler;
}
216
217
218
219
220
221 private SessionTasksQueue getSessionTasksQueue(IoSession session) {
222 SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
223
224 if (queue == null) {
225 queue = new SessionTasksQueue();
226 SessionTasksQueue oldQueue =
227 (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
228
229 if (oldQueue != null) {
230 queue = oldQueue;
231 }
232 }
233
234 return queue;
235 }
236
237
238
239
240
241 public IoEventQueueHandler getQueueHandler() {
242 return eventQueueHandler;
243 }
244
245
246
247
248 @Override
249 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
250
251 }
252
253
254
255
256
257 private void addWorker() {
258 synchronized (workers) {
259 if (workers.size() >= super.getMaximumPoolSize()) {
260 return;
261 }
262
263
264 Worker worker = new Worker( workers.size());
265 Thread thread = getThreadFactory().newThread(worker);
266
267
268 idleWorkers.incrementAndGet();
269
270
271 thread.start();
272 workers.add(worker);
273
274 if (workers.size() > largestPoolSize) {
275 largestPoolSize = workers.size();
276 }
277 }
278 }
279
280
281
282
283 private void addWorkerIfNecessary() {
284 if (idleWorkers.get() == 0) {
285 synchronized (workers) {
286 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
287 addWorker();
288 }
289 }
290 }
291 }
292
293 private void removeWorker() {
294 synchronized (workers) {
295 if (workers.size() <= super.getCorePoolSize()) {
296 return;
297 }
298 waitingSessions.offer(EXIT_SIGNAL);
299 }
300 }
301
302
303
304
305 @Override
306 public int getMaximumPoolSize() {
307 return super.getMaximumPoolSize();
308 }
309
310
311
312
313 @Override
314 public void setMaximumPoolSize(int maximumPoolSize) {
315 if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
316 throw new IllegalArgumentException("maximumPoolSize: "
317 + maximumPoolSize);
318 }
319
320 synchronized (workers) {
321 super.setMaximumPoolSize( maximumPoolSize );
322 int difference = workers.size() - maximumPoolSize;
323 while (difference > 0) {
324 removeWorker();
325 --difference;
326 }
327 }
328 }
329
330
331
332
333 @Override
334 public boolean awaitTermination(long timeout, TimeUnit unit)
335 throws InterruptedException {
336
337 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
338
339 synchronized (workers) {
340 while (!isTerminated()) {
341 long waitTime = deadline - System.currentTimeMillis();
342 if (waitTime <= 0) {
343 break;
344 }
345
346 workers.wait(waitTime);
347 }
348 }
349 return isTerminated();
350 }
351
352
353
354
355 @Override
356 public boolean isShutdown() {
357 return shutdown;
358 }
359
360
361
362
363 @Override
364 public boolean isTerminated() {
365 if (!shutdown) {
366 return false;
367 }
368
369 synchronized (workers) {
370 return workers.isEmpty();
371 }
372 }
373
374
375
376
377 @Override
378 public void shutdown() {
379 if (shutdown) {
380 return;
381 }
382
383 shutdown = true;
384
385 synchronized (workers) {
386 for (int i = workers.size(); i > 0; i --) {
387 waitingSessions.offer(EXIT_SIGNAL);
388 }
389 }
390 }
391
392
393
394
395 @Override
396 public List<Runnable> shutdownNow() {
397 shutdown();
398
399 List<Runnable> answer = new ArrayList<Runnable>();
400 IoSession session;
401
402 while ((session = waitingSessions.poll()) != null) {
403 if (session == EXIT_SIGNAL) {
404 waitingSessions.offer(EXIT_SIGNAL);
405 Thread.yield();
406 continue;
407 }
408
409 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
410
411 synchronized (sessionTasksQueue.tasksQueue) {
412
413 for (Runnable task: sessionTasksQueue.tasksQueue) {
414 getQueueHandler().polled(this, (IoEvent) task);
415 answer.add(task);
416 }
417
418 sessionTasksQueue.tasksQueue.clear();
419 }
420 }
421
422 return answer;
423 }
424
425
426
427
428
429 private void print( Queue<Runnable> queue, IoEvent event) {
430 StringBuilder sb = new StringBuilder();
431 sb.append( "Adding event " ).append( event.getType() ).append( " to session " ).append(event.getSession().getId() );
432 boolean first = true;
433 sb.append( "\nQueue : [" );
434 for (Runnable elem:queue) {
435 if ( first ) {
436 first = false;
437 } else {
438 sb.append( ", " );
439 }
440
441 sb.append(((IoEvent)elem).getType()).append(", ");
442 }
443 sb.append( "]\n" );
444 LOGGER.debug( sb.toString() );
445 }
446
447
448
449
450 @Override
451 public void execute(Runnable task) {
452 if (shutdown) {
453 rejectTask(task);
454 }
455
456
457 checkTaskType(task);
458
459 IoEvent event = (IoEvent) task;
460
461
462 IoSession session = event.getSession();
463
464
465 SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
466 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
467
468 boolean offerSession;
469
470
471
472
473 boolean offerEvent = eventQueueHandler.accept(this, event);
474
475 if (offerEvent) {
476
477 synchronized (tasksQueue) {
478
479 tasksQueue.offer(event);
480
481 if (sessionTasksQueue.processingCompleted) {
482 sessionTasksQueue.processingCompleted = false;
483 offerSession = true;
484 } else {
485 offerSession = false;
486 }
487
488 if (LOGGER.isDebugEnabled()) {
489 print(tasksQueue, event);
490 }
491 }
492 } else {
493 offerSession = false;
494 }
495
496 if (offerSession) {
497
498
499
500 waitingSessions.offer(session);
501 }
502
503 addWorkerIfNecessary();
504
505 if (offerEvent) {
506 eventQueueHandler.offered(this, event);
507 }
508 }
509
510 private void rejectTask(Runnable task) {
511 getRejectedExecutionHandler().rejectedExecution(task, this);
512 }
513
514 private void checkTaskType(Runnable task) {
515 if (!(task instanceof IoEvent)) {
516 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
517 }
518 }
519
520
521
522
523 @Override
524 public int getActiveCount() {
525 synchronized (workers) {
526 return workers.size() - idleWorkers.get();
527 }
528 }
529
530
531
532
533 @Override
534 public long getCompletedTaskCount() {
535 synchronized (workers) {
536 long answer = completedTaskCount;
537 for (Worker w: workers) {
538 answer += w.completedTaskCount;
539 }
540
541 return answer;
542 }
543 }
544
545
546
547
548 @Override
549 public int getLargestPoolSize() {
550 return largestPoolSize;
551 }
552
553
554
555
556 @Override
557 public int getPoolSize() {
558 synchronized (workers) {
559 return workers.size();
560 }
561 }
562
563
564
565
566 @Override
567 public long getTaskCount() {
568 return getCompletedTaskCount();
569 }
570
571
572
573
574 @Override
575 public boolean isTerminating() {
576 synchronized (workers) {
577 return isShutdown() && !isTerminated();
578 }
579 }
580
581
582
583
584 @Override
585 public int prestartAllCoreThreads() {
586 int answer = 0;
587 synchronized (workers) {
588 for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
589 addWorker();
590 answer ++;
591 }
592 }
593 return answer;
594 }
595
596
597
598
599 @Override
600 public boolean prestartCoreThread() {
601 synchronized (workers) {
602 if (workers.size() < super.getCorePoolSize()) {
603 addWorker();
604 return true;
605 } else {
606 return false;
607 }
608 }
609 }
610
611
612
613
614 @Override
615 public BlockingQueue<Runnable> getQueue() {
616 throw new UnsupportedOperationException();
617 }
618
619
620
621
622 @Override
623 public void purge() {
624
625 }
626
627
628
629
630 @Override
631 public boolean remove(Runnable task) {
632 checkTaskType(task);
633 IoEvent event = (IoEvent) task;
634 IoSession session = event.getSession();
635 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)session.getAttribute( TASKS_QUEUE );
636 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
637
638 if (sessionTasksQueue == null) {
639 return false;
640 }
641
642 boolean removed;
643
644 synchronized (tasksQueue) {
645 removed = tasksQueue.remove(task);
646 }
647
648 if (removed) {
649 getQueueHandler().polled(this, event);
650 }
651
652 return removed;
653 }
654
655
656
657
658 @Override
659 public int getCorePoolSize() {
660 return super.getCorePoolSize();
661 }
662
663
664
665
666 @Override
667 public void setCorePoolSize(int corePoolSize) {
668 if (corePoolSize < 0) {
669 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
670 }
671 if (corePoolSize > super.getMaximumPoolSize()) {
672 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
673 }
674
675 synchronized (workers) {
676 if (super.getCorePoolSize()> corePoolSize) {
677 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
678 removeWorker();
679 }
680 }
681 super.setCorePoolSize(corePoolSize);
682 }
683 }
684
685 private Queue<Runnable> getTasksQueue(IoSession session) {
686 Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
687
688 if (tasksQueue == null) {
689 tasksQueue = new ConcurrentLinkedQueue<Runnable>();
690 Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);
691
692 if (oldTasksQueue != null) {
693 tasksQueue = oldTasksQueue;
694 }
695 }
696
697 return tasksQueue;
698 }
699
700 private class Worker implements Runnable {
701
702 private volatile long completedTaskCount;
703 private Thread thread;
704 private int id;
705
706 public Worker( int id ) {
707 this.id = id;
708 }
709
710 public void run() {
711 thread = Thread.currentThread();
712
713 try {
714 for (;;) {
715 IoSession session = fetchSession();
716
717 idleWorkers.decrementAndGet();
718
719 if (session == null) {
720 synchronized (workers) {
721 if (workers.size() > getCorePoolSize()) {
722
723 workers.remove(this);
724 break;
725 }
726 }
727 }
728
729 if (session == EXIT_SIGNAL) {
730 break;
731 }
732
733 try {
734 if (session != null) {
735 runTasks(getSessionTasksQueue(session));
736 }
737 } finally {
738 idleWorkers.incrementAndGet();
739 }
740 }
741 } finally {
742 synchronized (workers) {
743 workers.remove(this);
744 OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
745 workers.notifyAll();
746 }
747 }
748 }
749
750 private IoSession fetchSession() {
751 IoSession session = null;
752 long currentTime = System.currentTimeMillis();
753 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
754 for (;;) {
755 try {
756 long waitTime = deadline - currentTime;
757 if (waitTime <= 0) {
758 break;
759 }
760
761 try {
762 session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
763 break;
764 } finally {
765 if (session == null) {
766 currentTime = System.currentTimeMillis();
767 }
768 }
769 } catch (InterruptedException e) {
770
771 continue;
772 }
773 }
774 return session;
775 }
776
777 private void runTasks(SessionTasksQueue sessionTasksQueue) {
778 for (;;) {
779 Runnable task;
780 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
781
782 synchronized (tasksQueue) {
783 task = tasksQueue.poll();
784
785 if (task == null) {
786 sessionTasksQueue.processingCompleted = true;
787 break;
788 }
789 }
790
791 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
792
793 runTask(task);
794 }
795 }
796
797 private void runTask(Runnable task) {
798 beforeExecute(thread, task);
799 boolean ran = false;
800 try {
801 task.run();
802 ran = true;
803 afterExecute(task, null);
804 completedTaskCount ++;
805 } catch (RuntimeException e) {
806 if (!ran) {
807 afterExecute(task, e);
808 }
809 throw e;
810 }
811 }
812 }
813
814
815
816
817
818
819 private class SessionTasksQueue {
820
821 private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
822
823
824 private boolean processingCompleted = true;
825 }
826 }