View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.mina.core.session.AttributeKey;
40  import org.apache.mina.core.session.DummySession;
41  import org.apache.mina.core.session.IoEvent;
42  import org.apache.mina.core.session.IoSession;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  /**
47   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
48   * <p>
49   * If you don't need to maintain the order of events per session, please use
50   * {@link UnorderedThreadPoolExecutor}.
51  
52   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
53   * @org.apache.xbean.XBean
54   */
55  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
56      /** A logger for this class (commented as it breaks MDCFlter tests) */
57      private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);
58  
59      /** A default value for the initial pool size */
60      private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
61  
62      /** A default value for the maximum pool size */
63      private static final int DEFAULT_MAX_THREAD_POOL = 16;
64  
65      /** A default value for the KeepAlive delay */
66      private static final int DEFAULT_KEEP_ALIVE = 30;
67  
68      private static final IoSession EXIT_SIGNAL = new DummySession();
69  
70      /** A key stored into the session's attribute for the event tasks being queued */
71      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey TASKS_QUEUE = new AttributeKey(OrderedThreadPoolExecutor.class, "tasksQueue");
72  
73      /** A queue used to store the available sessions */
74      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<>();
75  
76      private final Set<Worker> workers = new HashSet<>();
77  
78      private volatile int largestPoolSize;
79  
80      private final AtomicInteger idleWorkers = new AtomicInteger();
81  
82      private long completedTaskCount;
83  
84      private volatile boolean shutdown;
85  
86      private final IoEventQueueHandler eventQueueHandler;
87  
88      /**
89       * Creates a default ThreadPool, with default values :
90       * - minimum pool size is 0
91       * - maximum pool size is 16
92       * - keepAlive set to 30 seconds
93       * - A default ThreadFactory
94       * - All events are accepted
95       */
96      public OrderedThreadPoolExecutor() {
97          this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
98                  .defaultThreadFactory(), null);
99      }
100 
101     /**
102      * Creates a default ThreadPool, with default values :
103      * - minimum pool size is 0
104      * - keepAlive set to 30 seconds
105      * - A default ThreadFactory
106      * - All events are accepted
107      * 
108      * @param maximumPoolSize The maximum pool size
109      */
110     public OrderedThreadPoolExecutor(int maximumPoolSize) {
111         this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
112                 .defaultThreadFactory(), null);
113     }
114 
115     /**
116      * Creates a default ThreadPool, with default values :
117      * - keepAlive set to 30 seconds
118      * - A default ThreadFactory
119      * - All events are accepted
120      *
121      * @param corePoolSize The initial pool sizePoolSize
122      * @param maximumPoolSize The maximum pool size
123      */
124     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
125         this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
126                 null);
127     }
128 
129     /**
130      * Creates a default ThreadPool, with default values :
131      * - A default ThreadFactory
132      * - All events are accepted
133      * 
134      * @param corePoolSize The initial pool sizePoolSize
135      * @param maximumPoolSize The maximum pool size
136      * @param keepAliveTime Default duration for a thread
137      * @param unit Time unit used for the keepAlive value
138      */
139     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
140         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
141     }
142 
143     /**
144      * Creates a default ThreadPool, with default values :
145      * - A default ThreadFactory
146      * 
147      * @param corePoolSize The initial pool sizePoolSize
148      * @param maximumPoolSize The maximum pool size
149      * @param keepAliveTime Default duration for a thread
150      * @param unit Time unit used for the keepAlive value
151      * @param eventQueueHandler The queue used to store events
152      */
153     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
154             IoEventQueueHandler eventQueueHandler) {
155         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
156     }
157 
158     /**
159      * Creates a default ThreadPool, with default values :
160      * - A default ThreadFactory
161      * 
162      * @param corePoolSize The initial pool sizePoolSize
163      * @param maximumPoolSize The maximum pool size
164      * @param keepAliveTime Default duration for a thread
165      * @param unit Time unit used for the keepAlive value
166      * @param threadFactory The factory used to create threads
167      */
168     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
169             ThreadFactory threadFactory) {
170         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
171     }
172 
173     /**
174      * Creates a new instance of a OrderedThreadPoolExecutor.
175      * 
176      * @param corePoolSize The initial pool sizePoolSize
177      * @param maximumPoolSize The maximum pool size
178      * @param keepAliveTime Default duration for a thread
179      * @param unit Time unit used for the keepAlive value
180      * @param threadFactory The factory used to create threads
181      * @param eventQueueHandler The queue used to store events
182      */
183     public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
184             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
185         // We have to initialize the pool with default values (0 and 1) in order to
186         // handle the exception in a better way. We can't add a try {} catch() {}
187         // around the super() call.
188         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
189                 threadFactory, new AbortPolicy());
190 
191         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
192             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
193         }
194 
195         if ((maximumPoolSize <= 0) || (maximumPoolSize < corePoolSize)) {
196             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
197         }
198 
199         // Now, we can setup the pool sizes
200         super.setMaximumPoolSize(maximumPoolSize);
201         super.setCorePoolSize(corePoolSize);
202 
203         // The queueHandler might be null.
204         if (eventQueueHandler == null) {
205             this.eventQueueHandler = IoEventQueueHandler.NOOP;
206         } else {
207             this.eventQueueHandler = eventQueueHandler;
208         }
209     }
210 
211     /**
212      * Get the session's tasks queue.
213      */
214     private SessionTasksQueue getSessionTasksQueue(IoSession session) {
215         SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
216 
217         if (queue == null) {
218             queue = new SessionTasksQueue();
219             SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
220 
221             if (oldQueue != null) {
222                 queue = oldQueue;
223             }
224         }
225 
226         return queue;
227     }
228 
229     /**
230      * @return The associated queue handler. 
231      */
232     public IoEventQueueHandler getQueueHandler() {
233         return eventQueueHandler;
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
241         // Ignore the request.  It must always be AbortPolicy.
242     }
243 
244     /**
245      * Add a new thread to execute a task, if needed and possible.
246      * It depends on the current pool size. If it's full, we do nothing.
247      */
248     private void addWorker() {
249         synchronized (workers) {
250             if (workers.size() >= super.getMaximumPoolSize()) {
251                 return;
252             }
253 
254             // Create a new worker, and add it to the thread pool
255             Worker worker = new Worker();
256             Thread thread = getThreadFactory().newThread(worker);
257 
258             // As we have added a new thread, it's considered as idle.
259             idleWorkers.incrementAndGet();
260 
261             // Now, we can start it.
262             thread.start();
263             workers.add(worker);
264 
265             if (workers.size() > largestPoolSize) {
266                 largestPoolSize = workers.size();
267             }
268         }
269     }
270 
271     /**
272      * Add a new Worker only if there are no idle worker.
273      */
274     private void addWorkerIfNecessary() {
275         if (idleWorkers.get() == 0) {
276             synchronized (workers) {
277                 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
278                     addWorker();
279                 }
280             }
281         }
282     }
283 
284     private void removeWorker() {
285         synchronized (workers) {
286             if (workers.size() <= super.getCorePoolSize()) {
287                 return;
288             }
289             waitingSessions.offer(EXIT_SIGNAL);
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     public void setMaximumPoolSize(int maximumPoolSize) {
298         if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
299             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
300         }
301 
302         synchronized (workers) {
303             super.setMaximumPoolSize(maximumPoolSize);
304             int difference = workers.size() - maximumPoolSize;
305             while (difference > 0) {
306                 removeWorker();
307                 --difference;
308             }
309         }
310     }
311 
312     /**
313      * {@inheritDoc}
314      */
315     @Override
316     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
317 
318         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
319 
320         synchronized (workers) {
321             while (!isTerminated()) {
322                 long waitTime = deadline - System.currentTimeMillis();
323                 if (waitTime <= 0) {
324                     break;
325                 }
326 
327                 workers.wait(waitTime);
328             }
329         }
330         return isTerminated();
331     }
332 
333     /**
334      * {@inheritDoc}
335      */
336     @Override
337     public boolean isShutdown() {
338         return shutdown;
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public boolean isTerminated() {
346         if (!shutdown) {
347             return false;
348         }
349 
350         synchronized (workers) {
351             return workers.isEmpty();
352         }
353     }
354 
355     /**
356      * {@inheritDoc}
357      */
358     @Override
359     public void shutdown() {
360         if (shutdown) {
361             return;
362         }
363 
364         shutdown = true;
365 
366         synchronized (workers) {
367             for (int i = workers.size(); i > 0; i--) {
368                 waitingSessions.offer(EXIT_SIGNAL);
369             }
370         }
371     }
372 
373     /**
374      * {@inheritDoc}
375      */
376     @Override
377     public List<Runnable> shutdownNow() {
378         shutdown();
379 
380         List<Runnable> answer = new ArrayList<>();
381         IoSession session;
382 
383         while ((session = waitingSessions.poll()) != null) {
384             if (session == EXIT_SIGNAL) {
385                 waitingSessions.offer(EXIT_SIGNAL);
386                 Thread.yield(); // Let others take the signal.
387                 continue;
388             }
389 
390             SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
391 
392             synchronized (sessionTasksQueue.tasksQueue) {
393 
394                 for (Runnable task : sessionTasksQueue.tasksQueue) {
395                     getQueueHandler().polled(this, (IoEvent) task);
396                     answer.add(task);
397                 }
398 
399                 sessionTasksQueue.tasksQueue.clear();
400             }
401         }
402 
403         return answer;
404     }
405 
406     /**
407      * A Helper class used to print the list of events being queued. 
408      */
409     private void print(Queue<Runnable> queue, IoEvent event) {
410         StringBuilder sb = new StringBuilder();
411         sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
412         boolean first = true;
413         sb.append("\nQueue : [");
414         
415         for (Runnable elem : queue) {
416             if (first) {
417                 first = false;
418             } else {
419                 sb.append(", ");
420             }
421 
422             sb.append(((IoEvent) elem).getType()).append(", ");
423         }
424         
425         sb.append("]\n");
426         
427         if (LOGGER.isDebugEnabled()) {
428             LOGGER.debug(sb.toString());
429         }
430     }
431 
432     /**
433      * {@inheritDoc}
434      */
435     @Override
436     public void execute(Runnable task) {
437         if (shutdown) {
438             rejectTask(task);
439         }
440 
441         // Check that it's a IoEvent task
442         checkTaskType(task);
443 
444         IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
445 
446         // Get the associated session
447         IoSession session = event.getSession();
448 
449         // Get the session's queue of events
450         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
451         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
452 
453         boolean offerSession;
454 
455         // propose the new event to the event queue handler. If we
456         // use a throttle queue handler, the message may be rejected
457         // if the maximum size has been reached.
458         boolean offerEvent = eventQueueHandler.accept(this, event);
459 
460         if (offerEvent) {
461             // Ok, the message has been accepted
462             synchronized (tasksQueue) {
463                 // Inject the event into the executor taskQueue
464                 tasksQueue.offer(event);
465 
466                 if (sessionTasksQueue.processingCompleted) {
467                     sessionTasksQueue.processingCompleted = false;
468                     offerSession = true;
469                 } else {
470                     offerSession = false;
471                 }
472 
473                 if (LOGGER.isDebugEnabled()) {
474                     print(tasksQueue, event);
475                 }
476             }
477         } else {
478             offerSession = false;
479         }
480 
481         if (offerSession) {
482             // As the tasksQueue was empty, the task has been executed
483             // immediately, so we can move the session to the queue
484             // of sessions waiting for completion.
485             waitingSessions.offer(session);
486         }
487 
488         addWorkerIfNecessary();
489 
490         if (offerEvent) {
491             eventQueueHandler.offered(this, event);
492         }
493     }
494 
495     private void rejectTask(Runnable task) {
496         getRejectedExecutionHandler().rejectedExecution(task, this);
497     }
498 
499     private void checkTaskType(Runnable task) {
500         if (!(task instanceof IoEvent)) {
501             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
502         }
503     }
504 
505     /**
506      * {@inheritDoc}
507      */
508     @Override
509     public int getActiveCount() {
510         synchronized (workers) {
511             return workers.size() - idleWorkers.get();
512         }
513     }
514 
515     /**
516      * {@inheritDoc}
517      */
518     @Override
519     public long getCompletedTaskCount() {
520         synchronized (workers) {
521             long answer = completedTaskCount;
522             for (Worker w : workers) {
523                 answer += w.completedTaskCount.get();
524             }
525 
526             return answer;
527         }
528     }
529 
530     /**
531      * {@inheritDoc}
532      */
533     @Override
534     public int getLargestPoolSize() {
535         return largestPoolSize;
536     }
537 
538     /**
539      * {@inheritDoc}
540      */
541     @Override
542     public int getPoolSize() {
543         synchronized (workers) {
544             return workers.size();
545         }
546     }
547 
548     /**
549      * {@inheritDoc}
550      */
551     @Override
552     public long getTaskCount() {
553         return getCompletedTaskCount();
554     }
555 
556     /**
557      * {@inheritDoc}
558      */
559     @Override
560     public boolean isTerminating() {
561         synchronized (workers) {
562             return isShutdown() && !isTerminated();
563         }
564     }
565 
566     /**
567      * {@inheritDoc}
568      */
569     @Override
570     public int prestartAllCoreThreads() {
571         int answer = 0;
572         synchronized (workers) {
573             for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
574                 addWorker();
575                 answer++;
576             }
577         }
578         return answer;
579     }
580 
581     /**
582      * {@inheritDoc}
583      */
584     @Override
585     public boolean prestartCoreThread() {
586         synchronized (workers) {
587             if (workers.size() < super.getCorePoolSize()) {
588                 addWorker();
589                 return true;
590             } else {
591                 return false;
592             }
593         }
594     }
595 
596     /**
597      * {@inheritDoc}
598      */
599     @Override
600     public BlockingQueue<Runnable> getQueue() {
601         throw new UnsupportedOperationException();
602     }
603 
604     /**
605      * {@inheritDoc}
606      */
607     @Override
608     public void purge() {
609         // Nothing to purge in this implementation.
610     }
611 
612     /**
613      * {@inheritDoc}
614      */
615     @Override
616     public boolean remove(Runnable task) {
617         checkTaskType(task);
618         IoEvent../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent event = (IoEvent) task;
619         IoSession session = event.getSession();
620         SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
621 
622         if (sessionTasksQueue == null) {
623             return false;
624         }
625 
626         boolean removed;
627         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
628 
629         synchronized (tasksQueue) {
630             removed = tasksQueue.remove(task);
631         }
632 
633         if (removed) {
634             getQueueHandler().polled(this, event);
635         }
636 
637         return removed;
638     }
639 
640     /**
641      * {@inheritDoc}
642      */
643     @Override
644     public void setCorePoolSize(int corePoolSize) {
645         if (corePoolSize < 0) {
646             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
647         }
648         if (corePoolSize > super.getMaximumPoolSize()) {
649             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
650         }
651 
652         synchronized (workers) {
653             if (super.getCorePoolSize() > corePoolSize) {
654                 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
655                     removeWorker();
656                 }
657             }
658             super.setCorePoolSize(corePoolSize);
659         }
660     }
661 
662     private class Worker implements Runnable {
663 
664         private AtomicLong completedTaskCount = new AtomicLong(0);
665 
666         private Thread thread;
667 
668         /**
669          * @inheritedDoc
670          */
671         @Override
672         public void run() {
673             thread = Thread.currentThread();
674 
675             try {
676                 for (;;) {
677                     IoSession session = fetchSession();
678 
679                     idleWorkers.decrementAndGet();
680 
681                     if (session == null) {
682                         synchronized (workers) {
683                             if (workers.size() > getCorePoolSize()) {
684                                 // Remove now to prevent duplicate exit.
685                                 workers.remove(this);
686                                 break;
687                             }
688                         }
689                     }
690 
691                     if (session == EXIT_SIGNAL) {
692                         break;
693                     }
694 
695                     try {
696                         if (session != null) {
697                             runTasks(getSessionTasksQueue(session));
698                         }
699                     } finally {
700                         idleWorkers.incrementAndGet();
701                     }
702                 }
703             } finally {
704                 synchronized (workers) {
705                     workers.remove(this);
706                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
707                     workers.notifyAll();
708                 }
709             }
710         }
711 
712         private IoSession fetchSession() {
713             IoSession session = null;
714             long currentTime = System.currentTimeMillis();
715             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
716             
717             for (;;) {
718                 try {
719                     long waitTime = deadline - currentTime;
720                     
721                     if (waitTime <= 0) {
722                         break;
723                     }
724 
725                     try {
726                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
727                         break;
728                     } finally {
729                         if (session != null) {
730                             currentTime = System.currentTimeMillis();
731                         }
732                     }
733                 } catch (InterruptedException e) {
734                     // Ignore.
735                     continue;
736                 }
737             }
738             
739             return session;
740         }
741 
742         private void runTasks(SessionTasksQueue sessionTasksQueue) {
743             for (;;) {
744                 Runnable task;
745                 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
746 
747                 synchronized (tasksQueue) {
748                     task = tasksQueue.poll();
749 
750                     if (task == null) {
751                         sessionTasksQueue.processingCompleted = true;
752                         break;
753                     }
754                 }
755 
756                 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
757 
758                 runTask(task);
759             }
760         }
761 
762         private void runTask(Runnable task) {
763             beforeExecute(thread, task);
764             boolean ran = false;
765             try {
766                 task.run();
767                 ran = true;
768                 afterExecute(task, null);
769                 completedTaskCount.incrementAndGet();
770             } catch (RuntimeException e) {
771                 if (!ran) {
772                     afterExecute(task, e);
773                 }
774                 throw e;
775             }
776         }
777     }
778 
779     /**
780      * A class used to store the ordered list of events to be processed by the
781      * session, and the current task state.
782      */
783     private class SessionTasksQueue {
784         /**  A queue of ordered event waiting to be processed */
785         private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>();
786 
787         /** The current task state */
788         private boolean processingCompleted = true;
789     }
790 }