1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import org.apache.mina.core.session.*;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.util.*;
27 import java.util.concurrent.*;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.atomic.AtomicLong;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
50
51 private static final Logger LOGGER = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class);
52
53
54 private static final AtomicLong seq = new AtomicLong(0);
55
56
57 private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
58
59
60 private static final int DEFAULT_MAX_THREAD_POOL = 16;
61
62
63 private static final int DEFAULT_KEEP_ALIVE = 30;
64
65 private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new DummySession(), null);
66
67
68
69
70 private static final AttributeKeyteKey.html#AttributeKey">AttributeKey TASKS_QUEUE = new AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue");
71
72
73 private final BlockingQueue<SessionEntry> waitingSessions;
74
75 private final Set<Worker> workers = new HashSet<>();
76
77 private volatile int largestPoolSize;
78
79 private final AtomicInteger idleWorkers = new AtomicInteger();
80
81 private long completedTaskCount;
82
83 private volatile boolean shutdown;
84
85 private final IoEventQueueHandler eventQueueHandler;
86
87 private final Comparator<IoSession> comparator;
88
89
90
91
92
93
/**
 * Creates an executor with the default core size, default maximum size,
 * default keep-alive (in seconds), the default thread factory, no event
 * queue handler and no session comparator.
 */
public PriorityThreadPoolExecutor() {
    this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
            Executors.defaultThreadFactory(), null, null);
}
98
99
100
101
102
103
/**
 * Creates an executor with default sizing and keep-alive, the default
 * thread factory and no event queue handler.
 *
 * @param comparator ordering applied to waiting sessions; may be {@code null}
 */
public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) {
    this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
            Executors.defaultThreadFactory(), null, comparator);
}
108
109
110
111
112
113
114
115
116
/**
 * Creates an executor with the default core size and keep-alive, the
 * default thread factory, no event queue handler and no comparator.
 *
 * @param maximumPoolSize upper bound on the number of workers
 */
public PriorityThreadPoolExecutor(int maximumPoolSize) {
    this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
            Executors.defaultThreadFactory(), null, null);
}
121
122
123
124
125
126
127
128
129
/**
 * Creates an executor with the default core size and keep-alive, the
 * default thread factory and no event queue handler.
 *
 * @param maximumPoolSize upper bound on the number of workers
 * @param comparator ordering applied to waiting sessions; may be {@code null}
 */
public PriorityThreadPoolExecutor(int maximumPoolSize, Comparator<IoSession> comparator) {
    this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
            Executors.defaultThreadFactory(), null, comparator);
}
134
135
136
137
138
139
140
141
142
143
/**
 * Creates an executor with the default keep-alive, the default thread
 * factory, no event queue handler and no comparator.
 *
 * @param corePoolSize number of workers kept alive when idle
 * @param maximumPoolSize upper bound on the number of workers
 */
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
    this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
            null, null);
}
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * Creates an executor with the default thread factory, no event queue
 * handler and no comparator.
 *
 * @param corePoolSize number of workers kept alive when idle
 * @param maximumPoolSize upper bound on the number of workers
 * @param keepAliveTime how long a worker waits for a session before giving up
 * @param unit unit of {@code keepAliveTime}
 */
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, null);
}
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/**
 * Creates an executor with the default thread factory and no comparator.
 *
 * @param corePoolSize number of workers kept alive when idle
 * @param maximumPoolSize upper bound on the number of workers
 * @param keepAliveTime how long a worker waits for a session before giving up
 * @param unit unit of {@code keepAliveTime}
 * @param eventQueueHandler handler notified about queued events; may be {@code null}
 */
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        IoEventQueueHandler eventQueueHandler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler,
            null);
}
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
 * Creates an executor with no event queue handler and no comparator.
 *
 * @param corePoolSize number of workers kept alive when idle
 * @param maximumPoolSize upper bound on the number of workers
 * @param keepAliveTime how long a worker waits for a session before giving up
 * @param unit unit of {@code keepAliveTime}
 * @param threadFactory factory used to create worker threads
 */
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null);
}
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/**
 * Creates a fully configured executor.
 *
 * @param corePoolSize number of workers kept alive when idle; must not be negative
 * @param maximumPoolSize upper bound on the number of workers; must be positive
 *        and not smaller than {@code corePoolSize}
 * @param keepAliveTime how long a worker waits for a session before giving up
 * @param unit unit of {@code keepAliveTime}
 * @param threadFactory factory used to create worker threads
 * @param eventQueueHandler handler notified about queued events; {@code null}
 *        selects {@link IoEventQueueHandler#NOOP}
 * @param comparator ordering of waiting sessions; {@code null} selects FIFO order
 * @throws IllegalArgumentException if a pool size is out of range
 */
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) {
    // The parent is created with placeholder sizes; the real ones are applied
    // below, after they have been validated.
    super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory,
            new AbortPolicy());

    if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
        throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
    }

    boolean maximumOutOfRange = (maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize);
    if (maximumOutOfRange) {
        throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
    }

    // Maximum first, then core.
    super.setMaximumPoolSize(maximumPoolSize);
    super.setCorePoolSize(corePoolSize);

    this.eventQueueHandler = (eventQueueHandler != null) ? eventQueueHandler : IoEventQueueHandler.NOOP;

    this.comparator = comparator;

    // Without a comparator sessions are served first-in first-out; with one,
    // SessionEntry's natural ordering (which consults the comparator) is used.
    if (comparator != null) {
        this.waitingSessions = new PriorityBlockingQueue<SessionEntry>();
    } else {
        this.waitingSessions = new LinkedBlockingQueue<SessionEntry>();
    }
}
259
260
261
262
263 private SessionQueue getSessionTasksQueue(IoSession session) {
264 SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
265
266 if (queue == null) {
267 queue = new SessionQueue();
268 SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
269
270 if (oldQueue != null) {
271 queue = oldQueue;
272 }
273 }
274
275 return queue;
276 }
277
278
279
280
/**
 * @return the event queue handler chosen at construction time; never
 *         {@code null}, because the constructor substitutes
 *         {@link IoEventQueueHandler#NOOP} for a {@code null} argument
 */
public IoEventQueueHandler getQueueHandler() {
    return eventQueueHandler;
}
284
285
286
287
/**
 * Ignores the request: the handler installed by the constructor
 * (an {@code AbortPolicy}) stays in place.
 *
 * @param handler ignored
 */
@Override
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
    // Intentionally empty.
}
292
293
294
295
296
297 private void addWorker() {
298 synchronized (workers) {
299 if (workers.size() >= super.getMaximumPoolSize()) {
300 return;
301 }
302
303
304 Worker worker = new Worker();
305 Thread thread = getThreadFactory().newThread(worker);
306
307
308 idleWorkers.incrementAndGet();
309
310
311 thread.start();
312 workers.add(worker);
313
314 if (workers.size() > largestPoolSize) {
315 largestPoolSize = workers.size();
316 }
317 }
318 }
319
320
321
322
323 private void addWorkerIfNecessary() {
324 if (idleWorkers.get() == 0) {
325 synchronized (workers) {
326 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
327 addWorker();
328 }
329 }
330 }
331 }
332
333 private void removeWorker() {
334 synchronized (workers) {
335 if (workers.size() <= super.getCorePoolSize()) {
336 return;
337 }
338 waitingSessions.offer(EXIT_SIGNAL);
339 }
340 }
341
342
343
344
345 @Override
346 public void setMaximumPoolSize(int maximumPoolSize) {
347 if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
348 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
349 }
350
351 synchronized (workers) {
352 super.setMaximumPoolSize(maximumPoolSize);
353 int difference = workers.size() - maximumPoolSize;
354 while (difference > 0) {
355 removeWorker();
356 --difference;
357 }
358 }
359 }
360
361
362
363
364 @Override
365 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
366
367 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
368
369 synchronized (workers) {
370 while (!isTerminated()) {
371 long waitTime = deadline - System.currentTimeMillis();
372 if (waitTime <= 0) {
373 break;
374 }
375
376 workers.wait(waitTime);
377 }
378 }
379 return isTerminated();
380 }
381
382
383
384
/**
 * @return {@code true} once {@link #shutdown()} has set the shutdown flag
 */
@Override
public boolean isShutdown() {
    return shutdown;
}
389
390
391
392
393 @Override
394 public boolean isTerminated() {
395 if (!shutdown) {
396 return false;
397 }
398
399 synchronized (workers) {
400 return workers.isEmpty();
401 }
402 }
403
404
405
406
407 @Override
408 public void shutdown() {
409 if (shutdown) {
410 return;
411 }
412
413 shutdown = true;
414
415 synchronized (workers) {
416 for (int i = workers.size(); i > 0; i--) {
417 waitingSessions.offer(EXIT_SIGNAL);
418 }
419 }
420 }
421
422
423
424
425 @Override
426 public List<Runnable> shutdownNow() {
427 shutdown();
428
429 List<Runnable> answer = new ArrayList<>();
430 SessionEntry entry;
431
432 while ((entry = waitingSessions.poll()) != null) {
433 if (entry == EXIT_SIGNAL) {
434 waitingSessions.offer(EXIT_SIGNAL);
435 Thread.yield();
436 continue;
437 }
438
439 SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE);
440
441 synchronized (sessionTasksQueue.tasksQueue) {
442
443 for (Runnable task : sessionTasksQueue.tasksQueue) {
444 getQueueHandler().polled(this, (IoEvent) task);
445 answer.add(task);
446 }
447
448 sessionTasksQueue.tasksQueue.clear();
449 }
450 }
451
452 return answer;
453 }
454
455
456
457
458 private void print(Queue<Runnable> queue, IoEvent event) {
459 StringBuilder sb = new StringBuilder();
460 sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
461 boolean first = true;
462 sb.append("\nQueue : [");
463
464 for (Runnable elem : queue) {
465 if (first) {
466 first = false;
467 } else {
468 sb.append(", ");
469 }
470
471 sb.append(((IoEvent) elem).getType()).append(", ");
472 }
473
474 sb.append("]\n");
475
476 if (LOGGER.isDebugEnabled()) {
477 LOGGER.debug(sb.toString());
478 }
479 }
480
481
482
483
484 @Override
485 public void execute(Runnable task) {
486 if (shutdown) {
487 rejectTask(task);
488 }
489
490
491 checkTaskType(task);
492
493 IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
494
495
496 IoSession session = event.getSession();
497
498
499 SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
500 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
501
502 boolean offerSession;
503
504
505
506
507 boolean offerEvent = eventQueueHandler.accept(this, event);
508
509 if (offerEvent) {
510
511 synchronized (tasksQueue) {
512
513 tasksQueue.offer(event);
514
515 if (sessionTasksQueue.processingCompleted) {
516 sessionTasksQueue.processingCompleted = false;
517 offerSession = true;
518 } else {
519 offerSession = false;
520 }
521
522 if (LOGGER.isDebugEnabled()) {
523 print(tasksQueue, event);
524 }
525 }
526 } else {
527 offerSession = false;
528 }
529
530 if (offerSession) {
531
532
533
534 waitingSessions.offer(new SessionEntry(session, comparator));
535 }
536
537 addWorkerIfNecessary();
538
539 if (offerEvent) {
540 eventQueueHandler.offered(this, event);
541 }
542 }
543
/**
 * Hands the task to the executor's rejected-execution handler.
 *
 * @param task the task that cannot be accepted
 */
private void rejectTask(Runnable task) {
    getRejectedExecutionHandler().rejectedExecution(task, this);
}
547
548 private void checkTaskType(Runnable task) {
549 if (!(task instanceof IoEvent)) {
550 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
551 }
552 }
553
554
555
556
557 @Override
558 public int getActiveCount() {
559 synchronized (workers) {
560 return workers.size() - idleWorkers.get();
561 }
562 }
563
564
565
566
567 @Override
568 public long getCompletedTaskCount() {
569 synchronized (workers) {
570 long answer = completedTaskCount;
571 for (Worker w : workers) {
572 answer += w.completedTaskCount.get();
573 }
574
575 return answer;
576 }
577 }
578
579
580
581
/**
 * @return the largest number of workers recorded by {@code addWorker()}
 */
@Override
public int getLargestPoolSize() {
    return largestPoolSize;
}
586
587
588
589
/**
 * @return the current number of workers
 */
@Override
public int getPoolSize() {
    synchronized (workers) {
        return workers.size();
    }
}
596
597
598
599
/**
 * @return the same value as {@link #getCompletedTaskCount()}; tasks still
 *         queued are not included
 */
@Override
public long getTaskCount() {
    return getCompletedTaskCount();
}
604
605
606
607
/**
 * @return {@code true} when shutdown has been requested but the executor
 *         is not yet terminated
 */
@Override
public boolean isTerminating() {
    synchronized (workers) {
        return isShutdown() && !isTerminated();
    }
}
614
615
616
617
618 @Override
619 public int prestartAllCoreThreads() {
620 int answer = 0;
621 synchronized (workers) {
622 for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
623 addWorker();
624 answer++;
625 }
626 }
627 return answer;
628 }
629
630
631
632
633 @Override
634 public boolean prestartCoreThread() {
635 synchronized (workers) {
636 if (workers.size() < super.getCorePoolSize()) {
637 addWorker();
638 return true;
639 } else {
640 return false;
641 }
642 }
643 }
644
645
646
647
/**
 * Not supported by this executor.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public BlockingQueue<Runnable> getQueue() {
    throw new UnsupportedOperationException();
}
652
653
654
655
/**
 * Does nothing in this executor.
 */
@Override
public void purge() {
    // Intentionally empty.
}
660
661
662
663
664 @Override
665 public boolean remove(Runnable task) {
666 checkTaskType(task);
667 IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
668 IoSession session = event.getSession();
669 SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
670
671 if (sessionTasksQueue == null) {
672 return false;
673 }
674
675 boolean removed;
676 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
677
678 synchronized (tasksQueue) {
679 removed = tasksQueue.remove(task);
680 }
681
682 if (removed) {
683 getQueueHandler().polled(this, event);
684 }
685
686 return removed;
687 }
688
689
690
691
692 @Override
693 public void setCorePoolSize(int corePoolSize) {
694 if (corePoolSize < 0) {
695 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
696 }
697 if (corePoolSize > super.getMaximumPoolSize()) {
698 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
699 }
700
701 synchronized (workers) {
702 if (super.getCorePoolSize() > corePoolSize) {
703 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
704 removeWorker();
705 }
706 }
707 super.setCorePoolSize(corePoolSize);
708 }
709 }
710
711 private class Worker implements Runnable {
712
713 private AtomicLong completedTaskCount = new AtomicLong(0);
714
715 private Thread thread;
716
717
718
719
720 @Override
721 public void run() {
722 thread = Thread.currentThread();
723
724 try {
725 for (;;) {
726 IoSession session = fetchSession();
727
728 idleWorkers.decrementAndGet();
729
730 if (session == null) {
731 synchronized (workers) {
732 if (workers.size() > getCorePoolSize()) {
733
734 workers.remove(this);
735 break;
736 }
737 }
738 }
739
740 if (session == EXIT_SIGNAL) {
741 break;
742 }
743
744 try {
745 if (session != null) {
746 runTasks(getSessionTasksQueue(session));
747 }
748 } finally {
749 idleWorkers.incrementAndGet();
750 }
751 }
752 } finally {
753 synchronized (workers) {
754 workers.remove(this);
755 PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
756 workers.notifyAll();
757 }
758 }
759 }
760
761 private IoSession fetchSession() {
762 SessionEntry entry = null;
763 long currentTime = System.currentTimeMillis();
764 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
765
766 for (;;) {
767 try {
768 long waitTime = deadline - currentTime;
769
770 if (waitTime <= 0) {
771 break;
772 }
773
774 try {
775 entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
776 break;
777 } finally {
778 if (entry != null) {
779 currentTime = System.currentTimeMillis();
780 }
781 }
782 } catch (InterruptedException e) {
783
784 continue;
785 }
786 }
787
788 if (entry != null) {
789 return entry.getSession();
790 }
791 return null;
792 }
793
794 private void runTasks(SessionQueue sessionTasksQueue) {
795 for (;;) {
796 Runnable task;
797 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
798
799 synchronized (tasksQueue) {
800 task = tasksQueue.poll();
801
802 if (task == null) {
803 sessionTasksQueue.processingCompleted = true;
804 break;
805 }
806 }
807
808 eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task);
809
810 runTask(task);
811 }
812 }
813
814 private void runTask(Runnable task) {
815 beforeExecute(thread, task);
816 boolean ran = false;
817 try {
818 task.run();
819 ran = true;
820 afterExecute(task, null);
821 completedTaskCount.incrementAndGet();
822 } catch (RuntimeException e) {
823 if (!ran) {
824 afterExecute(task, e);
825 }
826 throw e;
827 }
828 }
829 }
830
831
832
833
834
835 private class SessionQueue {
836
837 private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>();
838
839
840 private boolean processingCompleted = true;
841 }
842
843
844
845
846
847 static class SessionEntry implements Comparable<SessionEntry> {
848 private final long seqNum;
849 private final IoSession session;
850 private final Comparator<IoSession> comparator;
851
852 public SessionEntry(IoSession session, Comparator<IoSession> comparator) {
853 if (session == null) {
854 throw new IllegalArgumentException("session");
855 }
856 seqNum = seq.getAndIncrement();
857 this.session = session;
858 this.comparator = comparator;
859 }
860
861 public IoSession getSession() {
862 return session;
863 }
864
865 public int compareTo(SessionEntry other) {
866 if (other == this) {
867 return 0;
868 }
869
870 if (other.session == this.session) {
871 return 0;
872 }
873
874
875 if (this == EXIT_SIGNAL) {
876 return -1;
877 }
878 if (other == EXIT_SIGNAL) {
879 return 1;
880 }
881
882 int res = 0;
883
884
885 if (comparator != null) {
886 res = comparator.compare(session, other.session);
887 }
888
889
890 if (res == 0) {
891 res = (seqNum < other.seqNum ? -1 : 1);
892 }
893
894 return res;
895 }
896 }
897 }