1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.concurrent.atomic.AtomicLong;
38
39 import org.apache.mina.core.session.AttributeKey;
40 import org.apache.mina.core.session.DummySession;
41 import org.apache.mina.core.session.IoEvent;
42 import org.apache.mina.core.session.IoSession;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
47
48
49
50
51
52
53
54
55 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
56
57 private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
58
59
60 private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
61
62
63 private static final int DEFAULT_MAX_THREAD_POOL = 16;
64
65
66 private static final int DEFAULT_KEEP_ALIVE = 30;
67
68 private static final IoSession EXIT_SIGNAL = new DummySession();
69
70
71 private static final AttributeKeyteKey.html#AttributeKey">AttributeKey TASKS_QUEUE = new AttributeKey(OrderedThreadPoolExecutor.class, "tasksQueue");
72
73
74 private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<>();
75
76 private final Set<Worker> workers = new HashSet<>();
77
78 private volatile int largestPoolSize;
79
80 private final AtomicInteger idleWorkers = new AtomicInteger();
81
82 private long completedTaskCount;
83
84 private volatile boolean shutdown;
85
86 private final IoEventQueueHandler eventQueueHandler;
87
88
89
90
91
92
93
94
95
96 public OrderedThreadPoolExecutor() {
97 this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
98 .defaultThreadFactory(), null);
99 }
100
101
102
103
104
105
106
107
108
109
110 public OrderedThreadPoolExecutor(int maximumPoolSize) {
111 this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
112 .defaultThreadFactory(), null);
113 }
114
115
116
117
118
119
120
121
122
123
124 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
125 this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
126 null);
127 }
128
129
130
131
132
133
134
135
136
137
138
139 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
140 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
141 }
142
143
144
145
146
147
148
149
150
151
152
153 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
154 IoEventQueueHandler eventQueueHandler) {
155 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
156 }
157
158
159
160
161
162
163
164
165
166
167
168 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
169 ThreadFactory threadFactory) {
170 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
171 }
172
173
174
175
176
177
178
179
180
181
182
183 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
184 ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
185
186
187
188 super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
189 threadFactory, new AbortPolicy());
190
191 if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
192 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
193 }
194
195 if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) {
196 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
197 }
198
199
200 super.setMaximumPoolSize(maximumPoolSize);
201 super.setCorePoolSize(corePoolSize);
202
203
204 if (eventQueueHandler == null) {
205 this.eventQueueHandler = IoEventQueueHandler.NOOP;
206 } else {
207 this.eventQueueHandler = eventQueueHandler;
208 }
209 }
210
211
212
213
214 private SessionTasksQueue getSessionTasksQueue(IoSession session) {
215 SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
216
217 if (queue == null) {
218 queue = new SessionTasksQueue();
219 SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
220
221 if (oldQueue != null) {
222 queue = oldQueue;
223 }
224 }
225
226 return queue;
227 }
228
229
230
231
232 public IoEventQueueHandler getQueueHandler() {
233 return eventQueueHandler;
234 }
235
236
237
238
239 @Override
240 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
241
242 }
243
244
245
246
247
248 private void addWorker() {
249 synchronized (workers) {
250 if (workers.size() >= super.getMaximumPoolSize()) {
251 return;
252 }
253
254
255 Worker worker = new Worker();
256 Thread thread = getThreadFactory().newThread(worker);
257
258
259 idleWorkers.incrementAndGet();
260
261
262 thread.start();
263 workers.add(worker);
264
265 if (workers.size() > largestPoolSize) {
266 largestPoolSize = workers.size();
267 }
268 }
269 }
270
271
272
273
274 private void addWorkerIfNecessary() {
275 if (idleWorkers.get() == 0) {
276 synchronized (workers) {
277 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
278 addWorker();
279 }
280 }
281 }
282 }
283
284 private void removeWorker() {
285 synchronized (workers) {
286 if (workers.size() <= super.getCorePoolSize()) {
287 return;
288 }
289 waitingSessions.offer(EXIT_SIGNAL);
290 }
291 }
292
293
294
295
296 @Override
297 public void setMaximumPoolSize(int maximumPoolSize) {
298 if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
299 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
300 }
301
302 synchronized (workers) {
303 super.setMaximumPoolSize(maximumPoolSize);
304 int difference = workers.size() - maximumPoolSize;
305 while (difference > 0) {
306 removeWorker();
307 --difference;
308 }
309 }
310 }
311
312
313
314
315 @Override
316 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
317
318 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
319
320 synchronized (workers) {
321 while (!isTerminated()) {
322 long waitTime = deadline - System.currentTimeMillis();
323 if (waitTime <= 0) {
324 break;
325 }
326
327 workers.wait(waitTime);
328 }
329 }
330 return isTerminated();
331 }
332
333
334
335
336 @Override
337 public boolean isShutdown() {
338 return shutdown;
339 }
340
341
342
343
344 @Override
345 public boolean isTerminated() {
346 if (!shutdown) {
347 return false;
348 }
349
350 synchronized (workers) {
351 return workers.isEmpty();
352 }
353 }
354
355
356
357
358 @Override
359 public void shutdown() {
360 if (shutdown) {
361 return;
362 }
363
364 shutdown = true;
365
366 synchronized (workers) {
367 for (int i = workers.size(); i > 0; i--) {
368 waitingSessions.offer(EXIT_SIGNAL);
369 }
370 }
371 }
372
373
374
375
376 @Override
377 public List<Runnable> shutdownNow() {
378 shutdown();
379
380 List<Runnable> answer = new ArrayList<>();
381 IoSession session;
382
383 while ((session = waitingSessions.poll()) != null) {
384 if (session == EXIT_SIGNAL) {
385 waitingSessions.offer(EXIT_SIGNAL);
386 Thread.yield();
387 continue;
388 }
389
390 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
391
392 synchronized (sessionTasksQueue.tasksQueue) {
393
394 for (Runnable task : sessionTasksQueue.tasksQueue) {
395 getQueueHandler().polled(this, (IoEvent) task);
396 answer.add(task);
397 }
398
399 sessionTasksQueue.tasksQueue.clear();
400 }
401 }
402
403 return answer;
404 }
405
406
407
408
409 private void print(Queue<Runnable> queue, IoEvent event) {
410 StringBuilder sb = new StringBuilder();
411 sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
412 boolean first = true;
413 sb.append("\nQueue : [");
414
415 for (Runnable elem : queue) {
416 if (first) {
417 first = false;
418 } else {
419 sb.append(", ");
420 }
421
422 sb.append(((IoEvent) elem).getType()).append(", ");
423 }
424
425 sb.append("]\n");
426
427 if (LOGGER.isDebugEnabled()) {
428 LOGGER.debug(sb.toString());
429 }
430 }
431
432
433
434
435 @Override
436 public void execute(Runnable task) {
437 if (shutdown) {
438 rejectTask(task);
439 }
440
441
442 checkTaskType(task);
443
444 IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
445
446
447 IoSession session = event.getSession();
448
449
450 SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
451 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
452
453 boolean offerSession;
454
455
456
457
458 boolean offerEvent = eventQueueHandler.accept(this, event);
459
460 if (offerEvent) {
461
462 synchronized (tasksQueue) {
463
464 tasksQueue.offer(event);
465
466 if (sessionTasksQueue.processingCompleted) {
467 sessionTasksQueue.processingCompleted = false;
468 offerSession = true;
469 } else {
470 offerSession = false;
471 }
472
473 if (LOGGER.isDebugEnabled()) {
474 print(tasksQueue, event);
475 }
476 }
477 } else {
478 offerSession = false;
479 }
480
481 if (offerSession) {
482
483
484
485 waitingSessions.offer(session);
486 }
487
488 addWorkerIfNecessary();
489
490 if (offerEvent) {
491 eventQueueHandler.offered(this, event);
492 }
493 }
494
495 private void rejectTask(Runnable task) {
496 getRejectedExecutionHandler().rejectedExecution(task, this);
497 }
498
499 private void checkTaskType(Runnable task) {
500 if (!(task instanceof IoEvent)) {
501 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
502 }
503 }
504
505
506
507
508 @Override
509 public int getActiveCount() {
510 synchronized (workers) {
511 return workers.size() - idleWorkers.get();
512 }
513 }
514
515
516
517
518 @Override
519 public long getCompletedTaskCount() {
520 synchronized (workers) {
521 long answer = completedTaskCount;
522 for (Worker w : workers) {
523 answer += w.completedTaskCount.get();
524 }
525
526 return answer;
527 }
528 }
529
530
531
532
533 @Override
534 public int getLargestPoolSize() {
535 return largestPoolSize;
536 }
537
538
539
540
541 @Override
542 public int getPoolSize() {
543 synchronized (workers) {
544 return workers.size();
545 }
546 }
547
548
549
550
551 @Override
552 public long getTaskCount() {
553 return getCompletedTaskCount();
554 }
555
556
557
558
559 @Override
560 public boolean isTerminating() {
561 synchronized (workers) {
562 return isShutdown() && !isTerminated();
563 }
564 }
565
566
567
568
569 @Override
570 public int prestartAllCoreThreads() {
571 int answer = 0;
572 synchronized (workers) {
573 for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
574 addWorker();
575 answer++;
576 }
577 }
578 return answer;
579 }
580
581
582
583
584 @Override
585 public boolean prestartCoreThread() {
586 synchronized (workers) {
587 if (workers.size() < super.getCorePoolSize()) {
588 addWorker();
589 return true;
590 } else {
591 return false;
592 }
593 }
594 }
595
596
597
598
599 @Override
600 public BlockingQueue<Runnable> getQueue() {
601 throw new UnsupportedOperationException();
602 }
603
604
605
606
607 @Override
608 public void purge() {
609
610 }
611
612
613
614
615 @Override
616 public boolean remove(Runnable task) {
617 checkTaskType(task);
618 IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
619 IoSession session = event.getSession();
620 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
621
622 if (sessionTasksQueue == null) {
623 return false;
624 }
625
626 boolean removed;
627 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
628
629 synchronized (tasksQueue) {
630 removed = tasksQueue.remove(task);
631 }
632
633 if (removed) {
634 getQueueHandler().polled(this, event);
635 }
636
637 return removed;
638 }
639
640
641
642
643 @Override
644 public void setCorePoolSize(int corePoolSize) {
645 if (corePoolSize < 0) {
646 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
647 }
648 if (corePoolSize > super.getMaximumPoolSize()) {
649 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
650 }
651
652 synchronized (workers) {
653 if (super.getCorePoolSize() > corePoolSize) {
654 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
655 removeWorker();
656 }
657 }
658 super.setCorePoolSize(corePoolSize);
659 }
660 }
661
662 private class Worker implements Runnable {
663
664 private AtomicLong completedTaskCount = new AtomicLong(0);
665
666 private Thread thread;
667
668
669
670
671 @Override
672 public void run() {
673 thread = Thread.currentThread();
674
675 try {
676 for (;;) {
677 IoSession session = fetchSession();
678
679 idleWorkers.decrementAndGet();
680
681 if (session == null) {
682 synchronized (workers) {
683 if (workers.size() > getCorePoolSize()) {
684
685 workers.remove(this);
686 break;
687 }
688 }
689 }
690
691 if (session == EXIT_SIGNAL) {
692 break;
693 }
694
695 try {
696 if (session != null) {
697 runTasks(getSessionTasksQueue(session));
698 }
699 } finally {
700 idleWorkers.incrementAndGet();
701 }
702 }
703 } finally {
704 synchronized (workers) {
705 workers.remove(this);
706 OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
707 workers.notifyAll();
708 }
709 }
710 }
711
712 private IoSession fetchSession() {
713 IoSession session = null;
714 long currentTime = System.currentTimeMillis();
715 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
716
717 for (;;) {
718 try {
719 long waitTime = deadline - currentTime;
720
721 if (waitTime <= 0) {
722 break;
723 }
724
725 try {
726 session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
727 break;
728 } finally {
729 if (session != null) {
730 currentTime = System.currentTimeMillis();
731 }
732 }
733 } catch (InterruptedException e) {
734
735 continue;
736 }
737 }
738
739 return session;
740 }
741
742 private void runTasks(SessionTasksQueue sessionTasksQueue) {
743 for (;;) {
744 Runnable task;
745 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
746
747 synchronized (tasksQueue) {
748 task = tasksQueue.poll();
749
750 if (task == null) {
751 sessionTasksQueue.processingCompleted = true;
752 break;
753 }
754 }
755
756 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
757
758 runTask(task);
759 }
760 }
761
762 private void runTask(Runnable task) {
763 beforeExecute(thread, task);
764 boolean ran = false;
765 try {
766 task.run();
767 ran = true;
768 afterExecute(task, null);
769 completedTaskCount.incrementAndGet();
770 } catch (RuntimeException e) {
771 if (!ran) {
772 afterExecute(task, e);
773 }
774 throw e;
775 }
776 }
777 }
778
779
780
781
782
783 private class SessionTasksQueue {
784
785 private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>();
786
787
788 private boolean processingCompleted = true;
789 }
790 }