View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.mina.core.session.AttributeKey;
39  import org.apache.mina.core.session.DummySession;
40  import org.apache.mina.core.session.IoEvent;
41  import org.apache.mina.core.session.IoSession;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
/**
 * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
 * <p>
 * If you don't need to maintain the order of events per session, please use
 * {@link UnorderedThreadPoolExecutor}.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @org.apache.xbean.XBean
 */
54  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
55      /** A logger for this class (commented as it breaks MDCFlter tests) */
56      static Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
57  
58      /** A default value for the initial pool size */
59      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
60      
61      /** A default value for the maximum pool size */
62      private static final int DEFAULT_MAX_THREAD_POOL = 16;
63      
64      /** A default value for the KeepAlive delay */
65      private static final int DEFAULT_KEEP_ALIVE = 30;
66      
67      private static final IoSession EXIT_SIGNAL = new DummySession();
68  
69      /** A key stored into the session's attribute for the event tasks being queued */ 
70      private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
71      
72      /** A queue used to store the available sessions */
73      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
74  
75      private final Set<Worker> workers = new HashSet<Worker>();
76  
77      private volatile int largestPoolSize;
78      private final AtomicInteger idleWorkers = new AtomicInteger();
79  
80      private long completedTaskCount;
81      private volatile boolean shutdown;
82  
83      private final IoEventQueueHandler eventQueueHandler;
84  
85      /**
86       * Creates a default ThreadPool, with default values :
87       * - minimum pool size is 0
88       * - maximum pool size is 16
89       * - keepAlive set to 30 seconds
90       * - A default ThreadFactory
91       * - All events are accepted
92       */
93      public OrderedThreadPoolExecutor() {
94          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
95              DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);
96      }
97  
98      /**
99       * Creates a default ThreadPool, with default values :
100      * - minimum pool size is 0
101      * - keepAlive set to 30 seconds
102      * - A default ThreadFactory
103      * - All events are accepted
104      * 
105      * @param maximumPoolSize The maximum pool size
106      */
107     public OrderedThreadPoolExecutor(int maximumPoolSize) {
108         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
109             Executors.defaultThreadFactory(), null);
110     }
111 
112     /**
113      * Creates a default ThreadPool, with default values :
114      * - keepAlive set to 30 seconds
115      * - A default ThreadFactory
116      * - All events are accepted
117      *
118      * @param corePoolSize The initial pool sizePoolSize
119      * @param maximumPoolSize The maximum pool size
120      */
121     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
122         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, 
123             Executors.defaultThreadFactory(), null);
124     }
125 
126     /**
127      * Creates a default ThreadPool, with default values :
128      * - A default ThreadFactory
129      * - All events are accepted
130      * 
131      * @param corePoolSize The initial pool sizePoolSize
132      * @param maximumPoolSize The maximum pool size
133      * @param keepAliveTime Default duration for a thread
134      * @param unit Time unit used for the keepAlive value
135      */
136     public OrderedThreadPoolExecutor(
137             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
138         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
139             Executors.defaultThreadFactory(), null);
140     }
141 
142     /**
143      * Creates a default ThreadPool, with default values :
144      * - A default ThreadFactory
145      * 
146      * @param corePoolSize The initial pool sizePoolSize
147      * @param maximumPoolSize The maximum pool size
148      * @param keepAliveTime Default duration for a thread
149      * @param unit Time unit used for the keepAlive value
150      * @param eventQueueHandler The queue used to store events
151      */
152     public OrderedThreadPoolExecutor(
153             int corePoolSize, int maximumPoolSize,
154             long keepAliveTime, TimeUnit unit,
155             IoEventQueueHandler eventQueueHandler) {
156         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
157             Executors.defaultThreadFactory(), eventQueueHandler);
158     }
159 
160     /**
161      * Creates a default ThreadPool, with default values :
162      * - A default ThreadFactory
163      * 
164      * @param corePoolSize The initial pool sizePoolSize
165      * @param maximumPoolSize The maximum pool size
166      * @param keepAliveTime Default duration for a thread
167      * @param unit Time unit used for the keepAlive value
168      * @param threadFactory The factory used to create threads
169      */
170     public OrderedThreadPoolExecutor(
171             int corePoolSize, int maximumPoolSize,
172             long keepAliveTime, TimeUnit unit,
173             ThreadFactory threadFactory) {
174         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
175     }
176 
177     /**
178      * Creates a new instance of a OrderedThreadPoolExecutor.
179      * 
180      * @param corePoolSize The initial pool sizePoolSize
181      * @param maximumPoolSize The maximum pool size
182      * @param keepAliveTime Default duration for a thread
183      * @param unit Time unit used for the keepAlive value
184      * @param threadFactory The factory used to create threads
185      * @param eventQueueHandler The queue used to store events
186      */
187     public OrderedThreadPoolExecutor(
188             int corePoolSize, int maximumPoolSize,
189             long keepAliveTime, TimeUnit unit,
190             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
191         // We have to initialize the pool with default values (0 and 1) in order to
192         // handle the exception in a better way. We can't add a try {} catch() {}
193         // around the super() call.
194         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, 
195             new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
196 
197         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
198             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
199         }
200 
201         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
202             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
203         }
204 
205         // Now, we can setup the pool sizes
206         super.setCorePoolSize( corePoolSize );
207         super.setMaximumPoolSize( maximumPoolSize );
208         
209         // The queueHandler might be null.
210         if (eventQueueHandler == null) {
211             this.eventQueueHandler = IoEventQueueHandler.NOOP;
212         } else {
213             this.eventQueueHandler = eventQueueHandler;
214         }
215     }
216     
217 
218     /**
219      * Get the session's tasks queue.
220      */
221     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
222         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
223 
224         if (queue == null) {
225             queue = new SessionTasksQueue();
226             SessionTasksQueue oldQueue = 
227                 (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
228             
229             if (oldQueue != null) {
230                 queue = oldQueue;
231             }
232         }
233         
234         return queue;
235     }
236 
237     
238     /**
239      * @return The associated queue handler. 
240      */
241     public IoEventQueueHandler getQueueHandler() {
242         return eventQueueHandler;
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     @Override
249     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
250         // Ignore the request.  It must always be AbortPolicy.
251     }
252 
253     /**
254      * Add a new thread to execute a task, if needed and possible.
255      * It depends on the current pool size. If it's full, we do nothing.
256      */
257     private void addWorker() {
258         synchronized (workers) {
259             if (workers.size() >= super.getMaximumPoolSize()) {
260                 return;
261             }
262 
263             // Create a new worker, and add it to the thread pool
264             Worker worker = new Worker( workers.size());
265             Thread thread = getThreadFactory().newThread(worker);
266             
267             // As we have added a new thread, it's considered as idle.
268             idleWorkers.incrementAndGet();
269             
270             // Now, we can start it.
271             thread.start();
272             workers.add(worker);
273 
274             if (workers.size() > largestPoolSize) {
275                 largestPoolSize = workers.size();
276             }
277         }
278     }
279 
280     /**
281      * Add a new Worker only if there are no idle worker.
282      */
283     private void addWorkerIfNecessary() {
284         if (idleWorkers.get() == 0) {
285             synchronized (workers) {
286                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
287                     addWorker();
288                 }
289             }
290         }
291     }
292 
293     private void removeWorker() {
294         synchronized (workers) {
295             if (workers.size() <= super.getCorePoolSize()) {
296                 return;
297             }
298             waitingSessions.offer(EXIT_SIGNAL);
299         }
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     public int getMaximumPoolSize() {
307         return super.getMaximumPoolSize();
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     public void setMaximumPoolSize(int maximumPoolSize) {
315         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
316             throw new IllegalArgumentException("maximumPoolSize: "
317                     + maximumPoolSize);
318         }
319 
320         synchronized (workers) {
321             super.setMaximumPoolSize( maximumPoolSize );
322             int difference = workers.size() - maximumPoolSize;
323             while (difference > 0) {
324                 removeWorker();
325                 --difference;
326             }
327         }
328     }
329 
330     /**
331      * {@inheritDoc}
332      */
333     @Override
334     public boolean awaitTermination(long timeout, TimeUnit unit)
335             throws InterruptedException {
336 
337         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
338 
339         synchronized (workers) {
340             while (!isTerminated()) {
341                 long waitTime = deadline - System.currentTimeMillis();
342                 if (waitTime <= 0) {
343                     break;
344                 }
345 
346                 workers.wait(waitTime);
347             }
348         }
349         return isTerminated();
350     }
351 
352     /**
353      * {@inheritDoc}
354      */
355     @Override
356     public boolean isShutdown() {
357         return shutdown;
358     }
359 
360     /**
361      * {@inheritDoc}
362      */
363     @Override
364     public boolean isTerminated() {
365         if (!shutdown) {
366             return false;
367         }
368 
369         synchronized (workers) {
370             return workers.isEmpty();
371         }
372     }
373 
374     /**
375      * {@inheritDoc}
376      */
377     @Override
378     public void shutdown() {
379         if (shutdown) {
380             return;
381         }
382 
383         shutdown = true;
384 
385         synchronized (workers) {
386             for (int i = workers.size(); i > 0; i --) {
387                 waitingSessions.offer(EXIT_SIGNAL);
388             }
389         }
390     }
391 
392     /**
393      * {@inheritDoc}
394      */
395     @Override
396     public List<Runnable> shutdownNow() {
397         shutdown();
398 
399         List<Runnable> answer = new ArrayList<Runnable>();
400         IoSession session;
401         
402         while ((session = waitingSessions.poll()) != null) {
403             if (session == EXIT_SIGNAL) {
404                 waitingSessions.offer(EXIT_SIGNAL);
405                 Thread.yield(); // Let others take the signal.
406                 continue;
407             }
408 
409             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
410             
411             synchronized (sessionTasksQueue.tasksQueue) {
412                 
413                 for (Runnable task: sessionTasksQueue.tasksQueue) {
414                     getQueueHandler().polled(this, (IoEvent) task);
415                     answer.add(task);
416                 }
417                 
418                 sessionTasksQueue.tasksQueue.clear();
419             }
420         }
421 
422         return answer;
423     }
424     
425     
426     /**
427      * A Helper class used to print the list of events being queued. 
428      */
429     private void print( Queue<Runnable> queue, IoEvent event) {
430         StringBuilder sb = new StringBuilder();
431         sb.append( "Adding event " ).append( event.getType() ).append( " to session " ).append(event.getSession().getId() );
432         boolean first = true;
433         sb.append( "\nQueue : [" );
434         for (Runnable elem:queue) {
435             if ( first ) {
436                 first = false;
437             } else {
438                 sb.append( ", " );
439             }
440                 
441             sb.append(((IoEvent)elem).getType()).append(", ");
442         }
443         sb.append( "]\n" );
444         LOGGER.debug( sb.toString() );
445     }
446 
447     /**
448      * {@inheritDoc}
449      */
450     @Override
451     public void execute(Runnable task) {
452         if (shutdown) {
453             rejectTask(task);
454         }
455 
456         // Check that it's a IoEvent task
457         checkTaskType(task);
458 
459         IoEvent event = (IoEvent) task;
460         
461         // Get the associated session
462         IoSession session = event.getSession();
463         
464         // Get the session's queue of events
465         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
466         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
467         
468         boolean offerSession;
469 
470         // propose the new event to the event queue handler. If we
471         // use a throttle queue handler, the message may be rejected
472         // if the maximum size has been reached.
473         boolean offerEvent = eventQueueHandler.accept(this, event);
474         
475         if (offerEvent) {
476             // Ok, the message has been accepted
477             synchronized (tasksQueue) {
478                 // Inject the event into the executor taskQueue
479                 tasksQueue.offer(event);
480                 
481                 if (sessionTasksQueue.processingCompleted) {
482                     sessionTasksQueue.processingCompleted = false;
483                     offerSession = true;
484                 } else {
485                     offerSession = false;
486                 }
487 
488                 if (LOGGER.isDebugEnabled()) {
489                     print(tasksQueue, event);
490                 }
491             }
492         } else {
493             offerSession = false;
494         }
495 
496         if (offerSession) {
497             // As the tasksQueue was empty, the task has been executed
498             // immediately, so we can move the session to the queue
499             // of sessions waiting for completion.
500             waitingSessions.offer(session);
501         }
502 
503         addWorkerIfNecessary();
504 
505         if (offerEvent) {
506             eventQueueHandler.offered(this, event);
507         }
508     }
509 
510     private void rejectTask(Runnable task) {
511         getRejectedExecutionHandler().rejectedExecution(task, this);
512     }
513 
514     private void checkTaskType(Runnable task) {
515         if (!(task instanceof IoEvent)) {
516             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
517         }
518     }
519 
520     /**
521      * {@inheritDoc}
522      */
523     @Override
524     public int getActiveCount() {
525         synchronized (workers) {
526             return workers.size() - idleWorkers.get();
527         }
528     }
529 
530     /**
531      * {@inheritDoc}
532      */
533     @Override
534     public long getCompletedTaskCount() {
535         synchronized (workers) {
536             long answer = completedTaskCount;
537             for (Worker w: workers) {
538                 answer += w.completedTaskCount;
539             }
540 
541             return answer;
542         }
543     }
544 
545     /**
546      * {@inheritDoc}
547      */
548     @Override
549     public int getLargestPoolSize() {
550         return largestPoolSize;
551     }
552 
553     /**
554      * {@inheritDoc}
555      */
556     @Override
557     public int getPoolSize() {
558         synchronized (workers) {
559             return workers.size();
560         }
561     }
562 
563     /**
564      * {@inheritDoc}
565      */
566     @Override
567     public long getTaskCount() {
568         return getCompletedTaskCount();
569     }
570 
571     /**
572      * {@inheritDoc}
573      */
574     @Override
575     public boolean isTerminating() {
576         synchronized (workers) {
577             return isShutdown() && !isTerminated();
578         }
579     }
580 
581     /**
582      * {@inheritDoc}
583      */
584     @Override
585     public int prestartAllCoreThreads() {
586         int answer = 0;
587         synchronized (workers) {
588             for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
589                 addWorker();
590                 answer ++;
591             }
592         }
593         return answer;
594     }
595 
596     /**
597      * {@inheritDoc}
598      */
599     @Override
600     public boolean prestartCoreThread() {
601         synchronized (workers) {
602             if (workers.size() < super.getCorePoolSize()) {
603                 addWorker();
604                 return true;
605             } else {
606                 return false;
607             }
608         }
609     }
610 
611     /**
612      * {@inheritDoc}
613      */
614     @Override
615     public BlockingQueue<Runnable> getQueue() {
616         throw new UnsupportedOperationException();
617     }
618 
619     /**
620      * {@inheritDoc}
621      */
622     @Override
623     public void purge() {
624         // Nothing to purge in this implementation.
625     }
626 
627     /**
628      * {@inheritDoc}
629      */
630     @Override
631     public boolean remove(Runnable task) {
632         checkTaskType(task);
633         IoEvent event = (IoEvent) task;
634         IoSession session = event.getSession();
635         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)session.getAttribute( TASKS_QUEUE );
636         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
637         
638         if (sessionTasksQueue == null) {
639             return false;
640         }
641 
642         boolean removed;
643         
644         synchronized (tasksQueue) {
645             removed = tasksQueue.remove(task);
646         }
647 
648         if (removed) {
649             getQueueHandler().polled(this, event);
650         }
651 
652         return removed;
653     }
654 
655     /**
656      * {@inheritDoc}
657      */
658     @Override
659     public int getCorePoolSize() {
660         return super.getCorePoolSize();
661     }
662 
663     /**
664      * {@inheritDoc}
665      */
666     @Override
667     public void setCorePoolSize(int corePoolSize) {
668         if (corePoolSize < 0) {
669             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
670         }
671         if (corePoolSize > super.getMaximumPoolSize()) {
672             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
673         }
674 
675         synchronized (workers) {
676             if (super.getCorePoolSize()> corePoolSize) {
677                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
678                     removeWorker();
679                 }
680             }
681             super.setCorePoolSize(corePoolSize);
682         }
683     }
684 
685     private Queue<Runnable> getTasksQueue(IoSession session) {
686         Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
687         
688         if (tasksQueue == null) {
689             tasksQueue = new ConcurrentLinkedQueue<Runnable>();
690             Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);
691         
692             if (oldTasksQueue != null) {
693                 tasksQueue = oldTasksQueue;
694             }
695         }
696         
697         return tasksQueue;
698     }
699 
700     private class Worker implements Runnable {
701 
702         private volatile long completedTaskCount;
703         private Thread thread;
704         private int id;
705         
706         public Worker( int id ) {
707             this.id = id;
708         }
709 
710         public void run() {
711             thread = Thread.currentThread();
712 
713             try {
714                 for (;;) {
715                     IoSession session = fetchSession();
716 
717                     idleWorkers.decrementAndGet();
718 
719                     if (session == null) {
720                         synchronized (workers) {
721                             if (workers.size() > getCorePoolSize()) {
722                                 // Remove now to prevent duplicate exit.
723                                 workers.remove(this);
724                                 break;
725                             }
726                         }
727                     }
728 
729                     if (session == EXIT_SIGNAL) {
730                         break;
731                     }
732 
733                     try {
734                         if (session != null) {
735                             runTasks(getSessionTasksQueue(session));
736                         }
737                     } finally {
738                         idleWorkers.incrementAndGet();
739                     }
740                 }
741             } finally {
742                 synchronized (workers) {
743                     workers.remove(this);
744                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
745                     workers.notifyAll();
746                 }
747             }
748         }
749 
750         private IoSession fetchSession() {
751             IoSession session = null;
752             long currentTime = System.currentTimeMillis();
753             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
754             for (;;) {
755                 try {
756                     long waitTime = deadline - currentTime;
757                     if (waitTime <= 0) {
758                         break;
759                     }
760 
761                     try {
762                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
763                         break;
764                     } finally {
765                         if (session == null) {
766                             currentTime = System.currentTimeMillis();
767                         }
768                     }
769                 } catch (InterruptedException e) {
770                     // Ignore.
771                     continue;
772                 }
773             }
774             return session;
775         }
776 
777         private void runTasks(SessionTasksQueue sessionTasksQueue) {
778             for (;;) {
779                 Runnable task;
780                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
781                 
782                 synchronized (tasksQueue) {
783                     task = tasksQueue.poll();
784                     
785                     if (task == null) {
786                         sessionTasksQueue.processingCompleted = true;
787                         break;
788                     }
789                 }
790 
791                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
792 
793                 runTask(task);
794             }
795         }
796 
797         private void runTask(Runnable task) {
798             beforeExecute(thread, task);
799             boolean ran = false;
800             try {
801                 task.run();
802                 ran = true;
803                 afterExecute(task, null);
804                 completedTaskCount ++;
805             } catch (RuntimeException e) {
806                 if (!ran) {
807                     afterExecute(task, e);
808                 }
809                 throw e;
810             }
811         }
812     }
813     
814     
815     /**
816      * A class used to store the ordered list of events to be processed by the
817      * session, and the current task state.
818      */
819     private class SessionTasksQueue {
820         /**  A queue of ordered event waiting to be processed */ 
821         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
822         
823         /** The current task state */
824         private boolean processingCompleted = true;
825     }
826 }