001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.executor;
021
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Queue;
026import java.util.Set;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.Executors;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.RejectedExecutionHandler;
032import java.util.concurrent.SynchronousQueue;
033import java.util.concurrent.ThreadFactory;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038
039import org.apache.mina.core.session.AttributeKey;
040import org.apache.mina.core.session.DummySession;
041import org.apache.mina.core.session.IoEvent;
042import org.apache.mina.core.session.IoSession;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
048 * <p>
049 * If you don't need to maintain the order of events per session, please use
050 * {@link UnorderedThreadPoolExecutor}.
051
052 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
053 * @org.apache.xbean.XBean
054 */
055public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    /** A logger for this class */
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class);

    /** A default value for the initial pool size */
    private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;

    /** A default value for the maximum pool size */
    private static final int DEFAULT_MAX_THREAD_POOL = 16;

    /** A default value for the KeepAlive delay */
    private static final int DEFAULT_KEEP_ALIVE = 30;

    /**
     * Sentinel placed in {@link #waitingSessions}: a worker that fetches it
     * leaves its processing loop instead of running tasks.
     */
    private static final IoSession EXIT_SIGNAL = new DummySession();

    /**
     * A key stored into the session's attribute for the event tasks being queued.
     * It is an instance field (built from {@code getClass()}), not a static constant.
     */
    private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");

    /** A queue used to store the available sessions */
    private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();

    /**
     * The workers currently registered in the pool. This set is also the
     * monitor used by the pool-management methods ({@code synchronized (workers)})
     * and the object {@code awaitTermination} waits on.
     */
    private final Set<Worker> workers = new HashSet<Worker>();

    /** The largest size {@link #workers} has reached; updated when a worker is added */
    private volatile int largestPoolSize;

    /**
     * Number of workers currently counted as idle: incremented when a worker
     * is created or finishes a session, decremented when it fetches one.
     */
    private final AtomicInteger idleWorkers = new AtomicInteger();

    /**
     * Tasks completed by workers that have already exited; each worker adds
     * its own counter here, under the {@link #workers} monitor, when it terminates.
     */
    private long completedTaskCount;

    /** Set to {@code true} by {@code shutdown()}; never reset */
    private volatile boolean shutdown;

    /** The event queue handler; never {@code null} once the constructor has completed */
    private final IoEventQueueHandler eventQueueHandler;
087
088    /**
089     * Creates a default ThreadPool, with default values :
090     * - minimum pool size is 0
091     * - maximum pool size is 16
092     * - keepAlive set to 30 seconds
093     * - A default ThreadFactory
094     * - All events are accepted
095     */
096    public OrderedThreadPoolExecutor() {
097        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
098                .defaultThreadFactory(), null);
099    }
100
101    /**
102     * Creates a default ThreadPool, with default values :
103     * - minimum pool size is 0
104     * - keepAlive set to 30 seconds
105     * - A default ThreadFactory
106     * - All events are accepted
107     * 
108     * @param maximumPoolSize The maximum pool size
109     */
110    public OrderedThreadPoolExecutor(int maximumPoolSize) {
111        this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors
112                .defaultThreadFactory(), null);
113    }
114
115    /**
116     * Creates a default ThreadPool, with default values :
117     * - keepAlive set to 30 seconds
118     * - A default ThreadFactory
119     * - All events are accepted
120     *
121     * @param corePoolSize The initial pool sizePoolSize
122     * @param maximumPoolSize The maximum pool size
123     */
124    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
125        this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
126                null);
127    }
128
129    /**
130     * Creates a default ThreadPool, with default values :
131     * - A default ThreadFactory
132     * - All events are accepted
133     * 
134     * @param corePoolSize The initial pool sizePoolSize
135     * @param maximumPoolSize The maximum pool size
136     * @param keepAliveTime Default duration for a thread
137     * @param unit Time unit used for the keepAlive value
138     */
139    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
140        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null);
141    }
142
143    /**
144     * Creates a default ThreadPool, with default values :
145     * - A default ThreadFactory
146     * 
147     * @param corePoolSize The initial pool sizePoolSize
148     * @param maximumPoolSize The maximum pool size
149     * @param keepAliveTime Default duration for a thread
150     * @param unit Time unit used for the keepAlive value
151     * @param eventQueueHandler The queue used to store events
152     */
153    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
154            IoEventQueueHandler eventQueueHandler) {
155        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler);
156    }
157
158    /**
159     * Creates a default ThreadPool, with default values :
160     * - A default ThreadFactory
161     * 
162     * @param corePoolSize The initial pool sizePoolSize
163     * @param maximumPoolSize The maximum pool size
164     * @param keepAliveTime Default duration for a thread
165     * @param unit Time unit used for the keepAlive value
166     * @param threadFactory The factory used to create threads
167     */
168    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
169            ThreadFactory threadFactory) {
170        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
171    }
172
173    /**
174     * Creates a new instance of a OrderedThreadPoolExecutor.
175     * 
176     * @param corePoolSize The initial pool sizePoolSize
177     * @param maximumPoolSize The maximum pool size
178     * @param keepAliveTime Default duration for a thread
179     * @param unit Time unit used for the keepAlive value
180     * @param threadFactory The factory used to create threads
181     * @param eventQueueHandler The queue used to store events
182     */
183    public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
184            ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
185        // We have to initialize the pool with default values (0 and 1) in order to
186        // handle the exception in a better way. We can't add a try {} catch() {}
187        // around the super() call.
188        super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(),
189                threadFactory, new AbortPolicy());
190
191        if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
192            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
193        }
194
195        if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
196            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
197        }
198
199        // Now, we can setup the pool sizes
200        super.setCorePoolSize(corePoolSize);
201        super.setMaximumPoolSize(maximumPoolSize);
202
203        // The queueHandler might be null.
204        if (eventQueueHandler == null) {
205            this.eventQueueHandler = IoEventQueueHandler.NOOP;
206        } else {
207            this.eventQueueHandler = eventQueueHandler;
208        }
209    }
210
211    /**
212     * Get the session's tasks queue.
213     */
214    private SessionTasksQueue getSessionTasksQueue(IoSession session) {
215        SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
216
217        if (queue == null) {
218            queue = new SessionTasksQueue();
219            SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
220
221            if (oldQueue != null) {
222                queue = oldQueue;
223            }
224        }
225
226        return queue;
227    }
228
229    /**
230     * @return The associated queue handler. 
231     */
232    public IoEventQueueHandler getQueueHandler() {
233        return eventQueueHandler;
234    }
235
236    /**
237     * {@inheritDoc}
238     */
239    @Override
240    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
241        // Ignore the request.  It must always be AbortPolicy.
242    }
243
244    /**
245     * Add a new thread to execute a task, if needed and possible.
246     * It depends on the current pool size. If it's full, we do nothing.
247     */
248    private void addWorker() {
249        synchronized (workers) {
250            if (workers.size() >= super.getMaximumPoolSize()) {
251                return;
252            }
253
254            // Create a new worker, and add it to the thread pool
255            Worker worker = new Worker();
256            Thread thread = getThreadFactory().newThread(worker);
257
258            // As we have added a new thread, it's considered as idle.
259            idleWorkers.incrementAndGet();
260
261            // Now, we can start it.
262            thread.start();
263            workers.add(worker);
264
265            if (workers.size() > largestPoolSize) {
266                largestPoolSize = workers.size();
267            }
268        }
269    }
270
271    /**
272     * Add a new Worker only if there are no idle worker.
273     */
274    private void addWorkerIfNecessary() {
275        if (idleWorkers.get() == 0) {
276            synchronized (workers) {
277                if (workers.isEmpty() || (idleWorkers.get() == 0)) {
278                    addWorker();
279                }
280            }
281        }
282    }
283
284    private void removeWorker() {
285        synchronized (workers) {
286            if (workers.size() <= super.getCorePoolSize()) {
287                return;
288            }
289            waitingSessions.offer(EXIT_SIGNAL);
290        }
291    }
292
293    /**
294     * {@inheritDoc}
295     */
296    @Override
297    public int getMaximumPoolSize() {
298        return super.getMaximumPoolSize();
299    }
300
301    /**
302     * {@inheritDoc}
303     */
304    @Override
305    public void setMaximumPoolSize(int maximumPoolSize) {
306        if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
307            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
308        }
309
310        synchronized (workers) {
311            super.setMaximumPoolSize(maximumPoolSize);
312            int difference = workers.size() - maximumPoolSize;
313            while (difference > 0) {
314                removeWorker();
315                --difference;
316            }
317        }
318    }
319
320    /**
321     * {@inheritDoc}
322     */
323    @Override
324    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
325
326        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
327
328        synchronized (workers) {
329            while (!isTerminated()) {
330                long waitTime = deadline - System.currentTimeMillis();
331                if (waitTime <= 0) {
332                    break;
333                }
334
335                workers.wait(waitTime);
336            }
337        }
338        return isTerminated();
339    }
340
341    /**
342     * {@inheritDoc}
343     */
344    @Override
345    public boolean isShutdown() {
346        return shutdown;
347    }
348
349    /**
350     * {@inheritDoc}
351     */
352    @Override
353    public boolean isTerminated() {
354        if (!shutdown) {
355            return false;
356        }
357
358        synchronized (workers) {
359            return workers.isEmpty();
360        }
361    }
362
363    /**
364     * {@inheritDoc}
365     */
366    @Override
367    public void shutdown() {
368        if (shutdown) {
369            return;
370        }
371
372        shutdown = true;
373
374        synchronized (workers) {
375            for (int i = workers.size(); i > 0; i--) {
376                waitingSessions.offer(EXIT_SIGNAL);
377            }
378        }
379    }
380
381    /**
382     * {@inheritDoc}
383     */
384    @Override
385    public List<Runnable> shutdownNow() {
386        shutdown();
387
388        List<Runnable> answer = new ArrayList<Runnable>();
389        IoSession session;
390
391        while ((session = waitingSessions.poll()) != null) {
392            if (session == EXIT_SIGNAL) {
393                waitingSessions.offer(EXIT_SIGNAL);
394                Thread.yield(); // Let others take the signal.
395                continue;
396            }
397
398            SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
399
400            synchronized (sessionTasksQueue.tasksQueue) {
401
402                for (Runnable task : sessionTasksQueue.tasksQueue) {
403                    getQueueHandler().polled(this, (IoEvent) task);
404                    answer.add(task);
405                }
406
407                sessionTasksQueue.tasksQueue.clear();
408            }
409        }
410
411        return answer;
412    }
413
414    /**
415     * A Helper class used to print the list of events being queued. 
416     */
417    private void print(Queue<Runnable> queue, IoEvent event) {
418        StringBuilder sb = new StringBuilder();
419        sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
420        boolean first = true;
421        sb.append("\nQueue : [");
422        for (Runnable elem : queue) {
423            if (first) {
424                first = false;
425            } else {
426                sb.append(", ");
427            }
428
429            sb.append(((IoEvent) elem).getType()).append(", ");
430        }
431        sb.append("]\n");
432        LOGGER.debug(sb.toString());
433    }
434
435    /**
436     * {@inheritDoc}
437     */
438    @Override
439    public void execute(Runnable task) {
440        if (shutdown) {
441            rejectTask(task);
442        }
443
444        // Check that it's a IoEvent task
445        checkTaskType(task);
446
447        IoEvent event = (IoEvent) task;
448
449        // Get the associated session
450        IoSession session = event.getSession();
451
452        // Get the session's queue of events
453        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
454        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
455
456        boolean offerSession;
457
458        // propose the new event to the event queue handler. If we
459        // use a throttle queue handler, the message may be rejected
460        // if the maximum size has been reached.
461        boolean offerEvent = eventQueueHandler.accept(this, event);
462
463        if (offerEvent) {
464            // Ok, the message has been accepted
465            synchronized (tasksQueue) {
466                // Inject the event into the executor taskQueue
467                tasksQueue.offer(event);
468
469                if (sessionTasksQueue.processingCompleted) {
470                    sessionTasksQueue.processingCompleted = false;
471                    offerSession = true;
472                } else {
473                    offerSession = false;
474                }
475
476                if (LOGGER.isDebugEnabled()) {
477                    print(tasksQueue, event);
478                }
479            }
480        } else {
481            offerSession = false;
482        }
483
484        if (offerSession) {
485            // As the tasksQueue was empty, the task has been executed
486            // immediately, so we can move the session to the queue
487            // of sessions waiting for completion.
488            waitingSessions.offer(session);
489        }
490
491        addWorkerIfNecessary();
492
493        if (offerEvent) {
494            eventQueueHandler.offered(this, event);
495        }
496    }
497
498    private void rejectTask(Runnable task) {
499        getRejectedExecutionHandler().rejectedExecution(task, this);
500    }
501
502    private void checkTaskType(Runnable task) {
503        if (!(task instanceof IoEvent)) {
504            throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
505        }
506    }
507
508    /**
509     * {@inheritDoc}
510     */
511    @Override
512    public int getActiveCount() {
513        synchronized (workers) {
514            return workers.size() - idleWorkers.get();
515        }
516    }
517
518    /**
519     * {@inheritDoc}
520     */
521    @Override
522    public long getCompletedTaskCount() {
523        synchronized (workers) {
524            long answer = completedTaskCount;
525            for (Worker w : workers) {
526                answer += w.completedTaskCount.get();
527            }
528
529            return answer;
530        }
531    }
532
533    /**
534     * {@inheritDoc}
535     */
536    @Override
537    public int getLargestPoolSize() {
538        return largestPoolSize;
539    }
540
541    /**
542     * {@inheritDoc}
543     */
544    @Override
545    public int getPoolSize() {
546        synchronized (workers) {
547            return workers.size();
548        }
549    }
550
551    /**
552     * {@inheritDoc}
553     */
554    @Override
555    public long getTaskCount() {
556        return getCompletedTaskCount();
557    }
558
559    /**
560     * {@inheritDoc}
561     */
562    @Override
563    public boolean isTerminating() {
564        synchronized (workers) {
565            return isShutdown() && !isTerminated();
566        }
567    }
568
569    /**
570     * {@inheritDoc}
571     */
572    @Override
573    public int prestartAllCoreThreads() {
574        int answer = 0;
575        synchronized (workers) {
576            for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
577                addWorker();
578                answer++;
579            }
580        }
581        return answer;
582    }
583
584    /**
585     * {@inheritDoc}
586     */
587    @Override
588    public boolean prestartCoreThread() {
589        synchronized (workers) {
590            if (workers.size() < super.getCorePoolSize()) {
591                addWorker();
592                return true;
593            } else {
594                return false;
595            }
596        }
597    }
598
599    /**
600     * {@inheritDoc}
601     */
602    @Override
603    public BlockingQueue<Runnable> getQueue() {
604        throw new UnsupportedOperationException();
605    }
606
607    /**
608     * {@inheritDoc}
609     */
610    @Override
611    public void purge() {
612        // Nothing to purge in this implementation.
613    }
614
615    /**
616     * {@inheritDoc}
617     */
618    @Override
619    public boolean remove(Runnable task) {
620        checkTaskType(task);
621        IoEvent event = (IoEvent) task;
622        IoSession session = event.getSession();
623        SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
624
625        if (sessionTasksQueue == null) {
626            return false;
627        }
628
629        boolean removed;
630        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
631
632        synchronized (tasksQueue) {
633            removed = tasksQueue.remove(task);
634        }
635
636        if (removed) {
637            getQueueHandler().polled(this, event);
638        }
639
640        return removed;
641    }
642
643    /**
644     * {@inheritDoc}
645     */
646    @Override
647    public int getCorePoolSize() {
648        return super.getCorePoolSize();
649    }
650
651    /**
652     * {@inheritDoc}
653     */
654    @Override
655    public void setCorePoolSize(int corePoolSize) {
656        if (corePoolSize < 0) {
657            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
658        }
659        if (corePoolSize > super.getMaximumPoolSize()) {
660            throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
661        }
662
663        synchronized (workers) {
664            if (super.getCorePoolSize() > corePoolSize) {
665                for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) {
666                    removeWorker();
667                }
668            }
669            super.setCorePoolSize(corePoolSize);
670        }
671    }
672
673    private class Worker implements Runnable {
674
675        private AtomicLong completedTaskCount = new AtomicLong(0);
676
677        private Thread thread;
678
679        public void run() {
680            thread = Thread.currentThread();
681
682            try {
683                for (;;) {
684                    IoSession session = fetchSession();
685
686                    idleWorkers.decrementAndGet();
687
688                    if (session == null) {
689                        synchronized (workers) {
690                            if (workers.size() > getCorePoolSize()) {
691                                // Remove now to prevent duplicate exit.
692                                workers.remove(this);
693                                break;
694                            }
695                        }
696                    }
697
698                    if (session == EXIT_SIGNAL) {
699                        break;
700                    }
701
702                    try {
703                        if (session != null) {
704                            runTasks(getSessionTasksQueue(session));
705                        }
706                    } finally {
707                        idleWorkers.incrementAndGet();
708                    }
709                }
710            } finally {
711                synchronized (workers) {
712                    workers.remove(this);
713                    OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
714                    workers.notifyAll();
715                }
716            }
717        }
718
719        private IoSession fetchSession() {
720            IoSession session = null;
721            long currentTime = System.currentTimeMillis();
722            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
723            for (;;) {
724                try {
725                    long waitTime = deadline - currentTime;
726                    if (waitTime <= 0) {
727                        break;
728                    }
729
730                    try {
731                        session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
732                        break;
733                    } finally {
734                        if (session == null) {
735                            currentTime = System.currentTimeMillis();
736                        }
737                    }
738                } catch (InterruptedException e) {
739                    // Ignore.
740                    continue;
741                }
742            }
743            return session;
744        }
745
746        private void runTasks(SessionTasksQueue sessionTasksQueue) {
747            for (;;) {
748                Runnable task;
749                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
750
751                synchronized (tasksQueue) {
752                    task = tasksQueue.poll();
753
754                    if (task == null) {
755                        sessionTasksQueue.processingCompleted = true;
756                        break;
757                    }
758                }
759
760                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
761
762                runTask(task);
763            }
764        }
765
766        private void runTask(Runnable task) {
767            beforeExecute(thread, task);
768            boolean ran = false;
769            try {
770                task.run();
771                ran = true;
772                afterExecute(task, null);
773                completedTaskCount.incrementAndGet();
774            } catch (RuntimeException e) {
775                if (!ran) {
776                    afterExecute(task, e);
777                }
778                throw e;
779            }
780        }
781    }
782
783    /**
784     * A class used to store the ordered list of events to be processed by the
785     * session, and the current task state.
786     */
787    private class SessionTasksQueue {
788        /**  A queue of ordered event waiting to be processed */
789        private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
790
791        /** The current task state */
792        private boolean processingCompleted = true;
793    }
794}