001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.executor;
021
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.Executors;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.RejectedExecutionHandler;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.mina.core.session.IoEvent;
036
037/**
038 * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
039 * This means more than one event handler methods can be invoked at the same
040 * time with mixed order.  For example, let's assume that messageReceived, messageSent,
041 * and sessionClosed events are fired.
042 * <ul>
043 * <li>All event handler methods can be called simultaneously.
044 *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
045 * <li>The event order can be mixed up.
046 *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
047 *           is invoked.)</li>
048 * </ul>
049 * If you need to maintain the order of events per session, please use
050 * {@link OrderedThreadPoolExecutor}.
051 *
052 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
053 * @org.apache.xbean.XBean
054 */
055public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
056
057    private static final Runnable EXIT_SIGNAL = new Runnable() {
058        public void run() {
059            throw new Error("This method shouldn't be called. " + "Please file a bug report.");
060        }
061    };
062
063    private final Set<Worker> workers = new HashSet<Worker>();
064
065    private volatile int corePoolSize;
066
067    private volatile int maximumPoolSize;
068
069    private volatile int largestPoolSize;
070
071    private final AtomicInteger idleWorkers = new AtomicInteger();
072
073    private long completedTaskCount;
074
075    private volatile boolean shutdown;
076
077    private final IoEventQueueHandler queueHandler;
078
079    public UnorderedThreadPoolExecutor() {
080        this(16);
081    }
082
083    public UnorderedThreadPoolExecutor(int maximumPoolSize) {
084        this(0, maximumPoolSize);
085    }
086
087    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
088        this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
089    }
090
091    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
092        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
093    }
094
095    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
096            IoEventQueueHandler queueHandler) {
097        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
098    }
099
100    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
101            ThreadFactory threadFactory) {
102        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
103    }
104
105    public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
106            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
107        super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
108        if (corePoolSize < 0) {
109            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
110        }
111
112        if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
113            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
114        }
115
116        if (queueHandler == null) {
117            queueHandler = IoEventQueueHandler.NOOP;
118        }
119
120        this.corePoolSize = corePoolSize;
121        this.maximumPoolSize = maximumPoolSize;
122        this.queueHandler = queueHandler;
123    }
124
125    public IoEventQueueHandler getQueueHandler() {
126        return queueHandler;
127    }
128
129    @Override
130    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
131        // Ignore the request.  It must always be AbortPolicy.
132    }
133
134    private void addWorker() {
135        synchronized (workers) {
136            if (workers.size() >= maximumPoolSize) {
137                return;
138            }
139
140            Worker worker = new Worker();
141            Thread thread = getThreadFactory().newThread(worker);
142            idleWorkers.incrementAndGet();
143            thread.start();
144            workers.add(worker);
145
146            if (workers.size() > largestPoolSize) {
147                largestPoolSize = workers.size();
148            }
149        }
150    }
151
152    private void addWorkerIfNecessary() {
153        if (idleWorkers.get() == 0) {
154            synchronized (workers) {
155                if (workers.isEmpty() || idleWorkers.get() == 0) {
156                    addWorker();
157                }
158            }
159        }
160    }
161
162    private void removeWorker() {
163        synchronized (workers) {
164            if (workers.size() <= corePoolSize) {
165                return;
166            }
167            getQueue().offer(EXIT_SIGNAL);
168        }
169    }
170
171    @Override
172    public int getMaximumPoolSize() {
173        return maximumPoolSize;
174    }
175
176    @Override
177    public void setMaximumPoolSize(int maximumPoolSize) {
178        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
179            throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
180        }
181
182        synchronized (workers) {
183            this.maximumPoolSize = maximumPoolSize;
184            int difference = workers.size() - maximumPoolSize;
185            while (difference > 0) {
186                removeWorker();
187                --difference;
188            }
189        }
190    }
191
192    @Override
193    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
194
195        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
196
197        synchronized (workers) {
198            while (!isTerminated()) {
199                long waitTime = deadline - System.currentTimeMillis();
200                if (waitTime <= 0) {
201                    break;
202                }
203
204                workers.wait(waitTime);
205            }
206        }
207        return isTerminated();
208    }
209
210    @Override
211    public boolean isShutdown() {
212        return shutdown;
213    }
214
215    @Override
216    public boolean isTerminated() {
217        if (!shutdown) {
218            return false;
219        }
220
221        synchronized (workers) {
222            return workers.isEmpty();
223        }
224    }
225
226    @Override
227    public void shutdown() {
228        if (shutdown) {
229            return;
230        }
231
232        shutdown = true;
233
234        synchronized (workers) {
235            for (int i = workers.size(); i > 0; i--) {
236                getQueue().offer(EXIT_SIGNAL);
237            }
238        }
239    }
240
241    @Override
242    public List<Runnable> shutdownNow() {
243        shutdown();
244
245        List<Runnable> answer = new ArrayList<Runnable>();
246        Runnable task;
247        while ((task = getQueue().poll()) != null) {
248            if (task == EXIT_SIGNAL) {
249                getQueue().offer(EXIT_SIGNAL);
250                Thread.yield(); // Let others take the signal.
251                continue;
252            }
253
254            getQueueHandler().polled(this, (IoEvent) task);
255            answer.add(task);
256        }
257
258        return answer;
259    }
260
261    @Override
262    public void execute(Runnable task) {
263        if (shutdown) {
264            rejectTask(task);
265        }
266
267        checkTaskType(task);
268
269        IoEvent e = (IoEvent) task;
270        boolean offeredEvent = queueHandler.accept(this, e);
271        
272        if (offeredEvent) {
273            getQueue().offer(e);
274        }
275
276        addWorkerIfNecessary();
277
278        if (offeredEvent) {
279            queueHandler.offered(this, e);
280        }
281    }
282
283    private void rejectTask(Runnable task) {
284        getRejectedExecutionHandler().rejectedExecution(task, this);
285    }
286
287    private void checkTaskType(Runnable task) {
288        if (!(task instanceof IoEvent)) {
289            throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
290        }
291    }
292
293    @Override
294    public int getActiveCount() {
295        synchronized (workers) {
296            return workers.size() - idleWorkers.get();
297        }
298    }
299
300    @Override
301    public long getCompletedTaskCount() {
302        synchronized (workers) {
303            long answer = completedTaskCount;
304            for (Worker w : workers) {
305                answer += w.completedTaskCount.get();
306            }
307
308            return answer;
309        }
310    }
311
312    @Override
313    public int getLargestPoolSize() {
314        return largestPoolSize;
315    }
316
317    @Override
318    public int getPoolSize() {
319        synchronized (workers) {
320            return workers.size();
321        }
322    }
323
324    @Override
325    public long getTaskCount() {
326        return getCompletedTaskCount();
327    }
328
329    @Override
330    public boolean isTerminating() {
331        synchronized (workers) {
332            return isShutdown() && !isTerminated();
333        }
334    }
335
336    @Override
337    public int prestartAllCoreThreads() {
338        int answer = 0;
339        synchronized (workers) {
340            for (int i = corePoolSize - workers.size(); i > 0; i--) {
341                addWorker();
342                answer++;
343            }
344        }
345        return answer;
346    }
347
348    @Override
349    public boolean prestartCoreThread() {
350        synchronized (workers) {
351            if (workers.size() < corePoolSize) {
352                addWorker();
353                return true;
354            }
355
356            return false;
357        }
358    }
359
360    @Override
361    public void purge() {
362        // Nothing to purge in this implementation.
363    }
364
365    @Override
366    public boolean remove(Runnable task) {
367        boolean removed = super.remove(task);
368        if (removed) {
369            getQueueHandler().polled(this, (IoEvent) task);
370        }
371        return removed;
372    }
373
374    @Override
375    public int getCorePoolSize() {
376        return corePoolSize;
377    }
378
379    @Override
380    public void setCorePoolSize(int corePoolSize) {
381        if (corePoolSize < 0) {
382            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
383        }
384        if (corePoolSize > maximumPoolSize) {
385            throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
386        }
387
388        synchronized (workers) {
389            if (this.corePoolSize > corePoolSize) {
390                for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
391                    removeWorker();
392                }
393            }
394            this.corePoolSize = corePoolSize;
395        }
396    }
397
398    private class Worker implements Runnable {
399
400        private AtomicLong completedTaskCount = new AtomicLong(0);
401
402        private Thread thread;
403
404        public void run() {
405            thread = Thread.currentThread();
406
407            try {
408                for (;;) {
409                    Runnable task = fetchTask();
410
411                    idleWorkers.decrementAndGet();
412
413                    if (task == null) {
414                        synchronized (workers) {
415                            if (workers.size() > corePoolSize) {
416                                // Remove now to prevent duplicate exit.
417                                workers.remove(this);
418                                break;
419                            }
420                        }
421                    }
422
423                    if (task == EXIT_SIGNAL) {
424                        break;
425                    }
426
427                    try {
428                        if (task != null) {
429                            queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
430                            runTask(task);
431                        }
432                    } finally {
433                        idleWorkers.incrementAndGet();
434                    }
435                }
436            } finally {
437                synchronized (workers) {
438                    workers.remove(this);
439                    UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
440                    workers.notifyAll();
441                }
442            }
443        }
444
445        private Runnable fetchTask() {
446            Runnable task = null;
447            long currentTime = System.currentTimeMillis();
448            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
449            for (;;) {
450                try {
451                    long waitTime = deadline - currentTime;
452                    if (waitTime <= 0) {
453                        break;
454                    }
455
456                    try {
457                        task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
458                        break;
459                    } finally {
460                        if (task == null) {
461                            currentTime = System.currentTimeMillis();
462                        }
463                    }
464                } catch (InterruptedException e) {
465                    // Ignore.
466                    continue;
467                }
468            }
469            return task;
470        }
471
472        private void runTask(Runnable task) {
473            beforeExecute(thread, task);
474            boolean ran = false;
475            try {
476                task.run();
477                ran = true;
478                afterExecute(task, null);
479                completedTaskCount.incrementAndGet();
480            } catch (RuntimeException e) {
481                if (!ran) {
482                    afterExecute(task, e);
483                }
484                throw e;
485            }
486        }
487    }
488}