View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import org.apache.mina.core.session.*;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.util.*;
27  import java.util.concurrent.*;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  /**
32   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s
33   * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows
34   * some sessions to be prioritized over other sessions.
35   * <p>
36   * If you don't need to maintain the order of events per session, please use
37   * {@link UnorderedThreadPoolExecutor}.
38   * <p>
39   * If you don't need to prioritize sessions, please use
40   * {@link OrderedThreadPoolExecutor}.
41   *
42   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
43   * @author Guus der Kinderen, guus.der.kinderen@gmail.com
44   * @org.apache.xbean.XBean
45   */
46  // TODO this class currently copies OrderedThreadPoolExecutor, and changes the
47  // BlockingQueue used for the waitingSessions field. This code duplication
48  // should be avoided.
49  public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
50      /** A logger for this class (commented as it breaks MDCFlter tests) */
51      private static final Logger LOGGER = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class);
52  
53      /** Generates sequential identifiers that ensure FIFO behavior. */
54      private static final AtomicLong seq = new AtomicLong(0);
55  
56      /** A default value for the initial pool size */
57      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
58  
59      /** A default value for the maximum pool size */
60      private static final int DEFAULT_MAX_THREAD_POOL = 16;
61  
62      /** A default value for the KeepAlive delay */
63      private static final int DEFAULT_KEEP_ALIVE = 30;
64  
65      private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new DummySession(), null);
66  
67      /**
68       * A key stored into the session's attribute for the event tasks being queued
69       */
70      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey TASKS_QUEUE = new AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue");
71  
72      /** A queue used to store the available sessions */
73      private final BlockingQueue<SessionEntry> waitingSessions;
74  
75      private final Set<Worker> workers = new HashSet<>();
76  
77      private volatile int largestPoolSize;
78  
79      private final AtomicInteger idleWorkers = new AtomicInteger();
80  
81      private long completedTaskCount;
82  
83      private volatile boolean shutdown;
84  
85      private final IoEventQueueHandler eventQueueHandler;
86  
87      private final Comparator<IoSession> comparator;
88  
89      /**
90       * Creates a default ThreadPool, with default values : - minimum pool size is 0
91       * - maximum pool size is 16 - keepAlive set to 30 seconds - A default
92       * ThreadFactory - All events are accepted
93       */
94      public PriorityThreadPoolExecutor() {
95          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
96                  Executors.defaultThreadFactory(), null, null);
97      }
98  
99      /**
100      * Creates a default ThreadPool, with default values : - minimum pool size is 0
101      * - maximum pool size is 16 - keepAlive set to 30 seconds - A default
102      * ThreadFactory - All events are accepted
103      */
104     public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) {
105         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
106                 Executors.defaultThreadFactory(), null, comparator);
107     }
108 
109     /**
110      * Creates a default ThreadPool, with default values : - minimum pool size is 0
111      * - keepAlive set to 30 seconds - A default ThreadFactory - All events are
112      * accepted
113      *
114      * @param maximumPoolSize
115      *            The maximum pool size
116      */
117     public PriorityThreadPoolExecutor(int maximumPoolSize) {
118         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
119                 Executors.defaultThreadFactory(), null, null);
120     }
121 
122     /**
123      * Creates a default ThreadPool, with default values : - minimum pool size is 0
124      * - keepAlive set to 30 seconds - A default ThreadFactory - All events are
125      * accepted
126      *
127      * @param maximumPoolSize
128      *            The maximum pool size
129      */
130     public PriorityThreadPoolExecutor(int maximumPoolSize, Comparator<IoSession> comparator) {
131         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS,
132                 Executors.defaultThreadFactory(), null, comparator);
133     }
134 
135     /**
136      * Creates a default ThreadPool, with default values : - keepAlive set to 30
137      * seconds - A default ThreadFactory - All events are accepted
138      *
139      * @param corePoolSize
140      *            The initial pool sizePoolSize
141      * @param maximumPoolSize
142      *            The maximum pool size
143      */
144     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
145         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
146                 null, null);
147     }
148 
149     /**
150      * Creates a default ThreadPool, with default values : - A default ThreadFactory
151      * - All events are accepted
152      *
153      * @param corePoolSize
154      *            The initial pool sizePoolSize
155      * @param maximumPoolSize
156      *            The maximum pool size
157      * @param keepAliveTime
158      *            Default duration for a thread
159      * @param unit
160      *            Time unit used for the keepAlive value
161      */
162     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
163         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, null);
164     }
165 
166     /**
167      * Creates a default ThreadPool, with default values : - A default ThreadFactory
168      *
169      * @param corePoolSize
170      *            The initial pool sizePoolSize
171      * @param maximumPoolSize
172      *            The maximum pool size
173      * @param keepAliveTime
174      *            Default duration for a thread
175      * @param unit
176      *            Time unit used for the keepAlive value
177      * @param eventQueueHandler
178      *            The queue used to store events
179      */
180     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
181             IoEventQueueHandler eventQueueHandler) {
182         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler,
183                 null);
184     }
185 
186     /**
187      * Creates a default ThreadPool, with default values : - A default ThreadFactory
188      *
189      * @param corePoolSize
190      *            The initial pool sizePoolSize
191      * @param maximumPoolSize
192      *            The maximum pool size
193      * @param keepAliveTime
194      *            Default duration for a thread
195      * @param unit
196      *            Time unit used for the keepAlive value
197      * @param threadFactory
198      *            The factory used to create threads
199      */
200     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
201             ThreadFactory threadFactory) {
202         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null);
203     }
204 
205     /**
206      * Creates a new instance of a PrioritisedOrderedThreadPoolExecutor.
207      *
208      * @param corePoolSize
209      *            The initial pool sizePoolSize
210      * @param maximumPoolSize
211      *            The maximum pool size
212      * @param keepAliveTime
213      *            Default duration for a thread
214      * @param unit
215      *            Time unit used for the keepAlive value
216      * @param threadFactory
217      *            The factory used to create threads
218      * @param eventQueueHandler
219      *            The queue used to store events
220      */
221     public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
222             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) {
223         // We have to initialize the pool with default values (0 and 1) in order
224         // to
225         // handle the exception in a better way. We can't add a try {} catch()
226         // {}
227         // around the super() call.
228         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory,
229                 new AbortPolicy());
230 
231         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
232             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
233         }
234 
235         if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) {
236             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
237         }
238 
239         // Now, we can setup the pool sizes
240         super.setMaximumPoolSize(maximumPoolSize);
241         super.setCorePoolSize(corePoolSize);
242 
243         // The queueHandler might be null.
244         if (eventQueueHandler == null) {
245             this.eventQueueHandler = IoEventQueueHandler.NOOP;
246         } else {
247             this.eventQueueHandler = eventQueueHandler;
248         }
249 
250         // The comparator can be null.
251         this.comparator = comparator;
252 
253         if (this.comparator == null) {
254             this.waitingSessions = new LinkedBlockingQueue<>();
255         } else {
256             this.waitingSessions = new PriorityBlockingQueue<>();
257         }
258     }
259 
260     /**
261      * Get the session's tasks queue.
262      */
263     private SessionQueue getSessionTasksQueue(IoSession session) {
264         SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
265 
266         if (queue == null) {
267             queue = new SessionQueue();
268             SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
269 
270             if (oldQueue != null) {
271                 queue = oldQueue;
272             }
273         }
274 
275         return queue;
276     }
277 
278     /**
279      * @return The associated queue handler.
280      */
281     public IoEventQueueHandler getQueueHandler() {
282         return eventQueueHandler;
283     }
284 
285     /**
286      * {@inheritDoc}
287      */
288     @Override
289     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
290         // Ignore the request. It must always be AbortPolicy.
291     }
292 
293     /**
294      * Add a new thread to execute a task, if needed and possible. It depends on the
295      * current pool size. If it's full, we do nothing.
296      */
297     private void addWorker() {
298         synchronized (workers) {
299             if (workers.size() >= super.getMaximumPoolSize()) {
300                 return;
301             }
302 
303             // Create a new worker, and add it to the thread pool
304             Worker worker = new Worker();
305             Thread thread = getThreadFactory().newThread(worker);
306 
307             // As we have added a new thread, it's considered as idle.
308             idleWorkers.incrementAndGet();
309 
310             // Now, we can start it.
311             thread.start();
312             workers.add(worker);
313 
314             if (workers.size() > largestPoolSize) {
315                 largestPoolSize = workers.size();
316             }
317         }
318     }
319 
320     /**
321      * Add a new Worker only if there are no idle worker.
322      */
323     private void addWorkerIfNecessary() {
324         if (idleWorkers.get() == 0) {
325             synchronized (workers) {
326                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
327                     addWorker();
328                 }
329             }
330         }
331     }
332 
333     private void removeWorker() {
334         synchronized (workers) {
335             if (workers.size() <= super.getCorePoolSize()) {
336                 return;
337             }
338             waitingSessions.offer(EXIT_SIGNAL);
339         }
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     @Override
346     public void setMaximumPoolSize(int maximumPoolSize) {
347         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
348             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
349         }
350 
351         synchronized (workers) {
352             super.setMaximumPoolSize(maximumPoolSize);
353             int difference = workers.size() - maximumPoolSize;
354             while (difference > 0) {
355                 removeWorker();
356                 --difference;
357             }
358         }
359     }
360 
361     /**
362      * {@inheritDoc}
363      */
364     @Override
365     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
366 
367         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
368 
369         synchronized (workers) {
370             while (!isTerminated()) {
371                 long waitTime = deadline - System.currentTimeMillis();
372                 if (waitTime <= 0) {
373                     break;
374                 }
375 
376                 workers.wait(waitTime);
377             }
378         }
379         return isTerminated();
380     }
381 
382     /**
383      * {@inheritDoc}
384      */
385     @Override
386     public boolean isShutdown() {
387         return shutdown;
388     }
389 
390     /**
391      * {@inheritDoc}
392      */
393     @Override
394     public boolean isTerminated() {
395         if (!shutdown) {
396             return false;
397         }
398 
399         synchronized (workers) {
400             return workers.isEmpty();
401         }
402     }
403 
404     /**
405      * {@inheritDoc}
406      */
407     @Override
408     public void shutdown() {
409         if (shutdown) {
410             return;
411         }
412 
413         shutdown = true;
414 
415         synchronized (workers) {
416             for (int i = workers.size(); i > 0; i--) {
417                 waitingSessions.offer(EXIT_SIGNAL);
418             }
419         }
420     }
421 
422     /**
423      * {@inheritDoc}
424      */
425     @Override
426     public List<Runnable> shutdownNow() {
427         shutdown();
428 
429         List<Runnable> answer = new ArrayList<>();
430         SessionEntry entry;
431 
432         while ((entry = waitingSessions.poll()) != null) {
433             if (entry == EXIT_SIGNAL) {
434                 waitingSessions.offer(EXIT_SIGNAL);
435                 Thread.yield(); // Let others take the signal.
436                 continue;
437             }
438 
439             SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE);
440 
441             synchronized (sessionTasksQueue.tasksQueue) {
442 
443                 for (Runnable task : sessionTasksQueue.tasksQueue) {
444                     getQueueHandler().polled(this, (IoEvent) task);
445                     answer.add(task);
446                 }
447 
448                 sessionTasksQueue.tasksQueue.clear();
449             }
450         }
451 
452         return answer;
453     }
454 
455     /**
456      * A Helper class used to print the list of events being queued.
457      */
458     private void print(Queue<Runnable> queue, IoEvent event) {
459         StringBuilder sb = new StringBuilder();
460         sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
461         boolean first = true;
462         sb.append("\nQueue : [");
463         
464         for (Runnable elem : queue) {
465             if (first) {
466                 first = false;
467             } else {
468                 sb.append(", ");
469             }
470 
471             sb.append(((IoEvent) elem).getType()).append(", ");
472         }
473         
474         sb.append("]\n");
475         
476         if (LOGGER.isDebugEnabled()) {
477             LOGGER.debug(sb.toString());
478         }
479     }
480 
481     /**
482      * {@inheritDoc}
483      */
484     @Override
485     public void execute(Runnable task) {
486         if (shutdown) {
487             rejectTask(task);
488         }
489 
490         // Check that it's a IoEvent task
491         checkTaskType(task);
492 
493         IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
494 
495         // Get the associated session
496         IoSession session = event.getSession();
497 
498         // Get the session's queue of events
499         SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
500         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
501 
502         boolean offerSession;
503 
504         // propose the new event to the event queue handler. If we
505         // use a throttle queue handler, the message may be rejected
506         // if the maximum size has been reached.
507         boolean offerEvent = eventQueueHandler.accept(this, event);
508 
509         if (offerEvent) {
510             // Ok, the message has been accepted
511             synchronized (tasksQueue) {
512                 // Inject the event into the executor taskQueue
513                 tasksQueue.offer(event);
514 
515                 if (sessionTasksQueue.processingCompleted) {
516                     sessionTasksQueue.processingCompleted = false;
517                     offerSession = true;
518                 } else {
519                     offerSession = false;
520                 }
521 
522                 if (LOGGER.isDebugEnabled()) {
523                     print(tasksQueue, event);
524                 }
525             }
526         } else {
527             offerSession = false;
528         }
529 
530         if (offerSession) {
531             // As the tasksQueue was empty, the task has been executed
532             // immediately, so we can move the session to the queue
533             // of sessions waiting for completion.
534             waitingSessions.offer(new SessionEntry(session, comparator));
535         }
536 
537         addWorkerIfNecessary();
538 
539         if (offerEvent) {
540             eventQueueHandler.offered(this, event);
541         }
542     }
543 
544     private void rejectTask(Runnable task) {
545         getRejectedExecutionHandler().rejectedExecution(task, this);
546     }
547 
548     private void checkTaskType(Runnable task) {
549         if (!(task instanceof IoEvent)) {
550             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
551         }
552     }
553 
554     /**
555      * {@inheritDoc}
556      */
557     @Override
558     public int getActiveCount() {
559         synchronized (workers) {
560             return workers.size() - idleWorkers.get();
561         }
562     }
563 
564     /**
565      * {@inheritDoc}
566      */
567     @Override
568     public long getCompletedTaskCount() {
569         synchronized (workers) {
570             long answer = completedTaskCount;
571             for (Worker w : workers) {
572                 answer += w.completedTaskCount.get();
573             }
574 
575             return answer;
576         }
577     }
578 
579     /**
580      * {@inheritDoc}
581      */
582     @Override
583     public int getLargestPoolSize() {
584         return largestPoolSize;
585     }
586 
587     /**
588      * {@inheritDoc}
589      */
590     @Override
591     public int getPoolSize() {
592         synchronized (workers) {
593             return workers.size();
594         }
595     }
596 
597     /**
598      * {@inheritDoc}
599      */
600     @Override
601     public long getTaskCount() {
602         return getCompletedTaskCount();
603     }
604 
605     /**
606      * {@inheritDoc}
607      */
608     @Override
609     public boolean isTerminating() {
610         synchronized (workers) {
611             return isShutdown() && !isTerminated();
612         }
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     @Override
619     public int prestartAllCoreThreads() {
620         int answer = 0;
621         synchronized (workers) {
622             for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
623                 addWorker();
624                 answer++;
625             }
626         }
627         return answer;
628     }
629 
630     /**
631      * {@inheritDoc}
632      */
633     @Override
634     public boolean prestartCoreThread() {
635         synchronized (workers) {
636             if (workers.size() < super.getCorePoolSize()) {
637                 addWorker();
638                 return true;
639             } else {
640                 return false;
641             }
642         }
643     }
644 
645     /**
646      * {@inheritDoc}
647      */
648     @Override
649     public BlockingQueue<Runnable> getQueue() {
650         throw new UnsupportedOperationException();
651     }
652 
653     /**
654      * {@inheritDoc}
655      */
656     @Override
657     public void purge() {
658         // Nothing to purge in this implementation.
659     }
660 
661     /**
662      * {@inheritDoc}
663      */
664     @Override
665     public boolean remove(Runnable task) {
666         checkTaskType(task);
667         IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
668         IoSession session = event.getSession();
669         SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
670 
671         if (sessionTasksQueue == null) {
672             return false;
673         }
674 
675         boolean removed;
676         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
677 
678         synchronized (tasksQueue) {
679             removed = tasksQueue.remove(task);
680         }
681 
682         if (removed) {
683             getQueueHandler().polled(this, event);
684         }
685 
686         return removed;
687     }
688 
689     /**
690      * {@inheritDoc}
691      */
692     @Override
693     public void setCorePoolSize(int corePoolSize) {
694         if (corePoolSize < 0) {
695             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
696         }
697         if (corePoolSize > super.getMaximumPoolSize()) {
698             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
699         }
700 
701         synchronized (workers) {
702             if (super.getCorePoolSize() > corePoolSize) {
703                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
704                     removeWorker();
705                 }
706             }
707             super.setCorePoolSize(corePoolSize);
708         }
709     }
710 
711     private class Worker implements Runnable {
712 
713         private AtomicLong completedTaskCount = new AtomicLong(0);
714 
715         private Thread thread;
716 
717         /**
718          * @inheritedDoc
719          */
720         @Override
721         public void run() {
722             thread = Thread.currentThread();
723 
724             try {
725                 for (;;) {
726                     IoSession session = fetchSession();
727 
728                     idleWorkers.decrementAndGet();
729 
730                     if (session == null) {
731                         synchronized (workers) {
732                             if (workers.size() > getCorePoolSize()) {
733                                 // Remove now to prevent duplicate exit.
734                                 workers.remove(this);
735                                 break;
736                             }
737                         }
738                     }
739 
740                     if (session == EXIT_SIGNAL) {
741                         break;
742                     }
743 
744                     try {
745                         if (session != null) {
746                             runTasks(getSessionTasksQueue(session));
747                         }
748                     } finally {
749                         idleWorkers.incrementAndGet();
750                     }
751                 }
752             } finally {
753                 synchronized (workers) {
754                     workers.remove(this);
755                     PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
756                     workers.notifyAll();
757                 }
758             }
759         }
760 
761         private IoSession fetchSession() {
762             SessionEntry entry = null;
763             long currentTime = System.currentTimeMillis();
764             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
765 
766             for (;;) {
767                 try {
768                     long waitTime = deadline - currentTime;
769 
770                     if (waitTime <= 0) {
771                         break;
772                     }
773 
774                     try {
775                         entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
776                         break;
777                     } finally {
778                         if (entry != null) {
779                             currentTime = System.currentTimeMillis();
780                         }
781                     }
782                 } catch (InterruptedException e) {
783                     // Ignore.
784                     continue;
785                 }
786             }
787 
788             if (entry != null) {
789                 return entry.getSession();
790             }
791             return null;
792         }
793 
794         private void runTasks(SessionQueue sessionTasksQueue) {
795             for (;;) {
796                 Runnable task;
797                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
798 
799                 synchronized (tasksQueue) {
800                     task = tasksQueue.poll();
801 
802                     if (task == null) {
803                         sessionTasksQueue.processingCompleted = true;
804                         break;
805                     }
806                 }
807 
808                 eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task);
809 
810                 runTask(task);
811             }
812         }
813 
814         private void runTask(Runnable task) {
815             beforeExecute(thread, task);
816             boolean ran = false;
817             try {
818                 task.run();
819                 ran = true;
820                 afterExecute(task, null);
821                 completedTaskCount.incrementAndGet();
822             } catch (RuntimeException e) {
823                 if (!ran) {
824                     afterExecute(task, e);
825                 }
826                 throw e;
827             }
828         }
829     }
830 
831     /**
832      * A class used to store the ordered list of events to be processed by the
833      * session, and the current task state.
834      */
835     private class SessionQueue {
836         /** A queue of ordered event waiting to be processed */
837         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>();
838 
839         /** The current task state */
840         private boolean processingCompleted = true;
841     }
842 
843     /**
844      * A class used to preserve first-in-first-out order of sessions that have equal
845      * priority.
846      */
847     static class SessionEntry implements Comparable<SessionEntry> {
848         private final long seqNum;
849         private final IoSession session;
850         private final Comparator<IoSession> comparator;
851 
852         public SessionEntry(IoSession session, Comparator<IoSession> comparator) {
853             if (session == null) {
854                 throw new IllegalArgumentException("session");
855             }
856             seqNum = seq.getAndIncrement();
857             this.session = session;
858             this.comparator = comparator;
859         }
860 
861         public IoSession getSession() {
862             return session;
863         }
864 
865         public int compareTo(SessionEntry other) {
866             if (other == this) {
867                 return 0;
868             }
869 
870             if (other.session == this.session) {
871                 return 0;
872             }
873 
874             // An exit signal should always be preferred.
875             if (this == EXIT_SIGNAL) {
876                 return -1;
877             }
878             if (other == EXIT_SIGNAL) {
879                 return 1;
880             }
881 
882             int res = 0;
883 
884             // If there's a comparator, use it to prioritise events.
885             if (comparator != null) {
886                 res = comparator.compare(session, other.session);
887             }
888 
889             // FIFO tiebreaker.
890             if (res == 0) {
891                 res = (seqNum < other.seqNum ? -1 : 1);
892             }
893 
894             return res;
895         }
896     }
897 }