View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.session.IoEvent;
36  
37  /**
38   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
39   * This means more than one event handler methods can be invoked at the same
40   * time with mixed order.  For example, let's assume that messageReceived, messageSent,
41   * and sessionClosed events are fired.
42   * <ul>
43   * <li>All event handler methods can be called simultaneously.
44   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
45   * <li>The event order can be mixed up.
46   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
47   *           is invoked.)</li>
48   * </ul>
49   * If you need to maintain the order of events per session, please use
50   * {@link OrderedThreadPoolExecutor}.
51   *
52   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
53   * @org.apache.xbean.XBean
54   */
55  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
56  
57      private static final Runnable EXIT_SIGNAL = new Runnable() {
58          /**
59           * {@inheritDoc}
60           */
61          @Override
62          public void run() {
63              throw new Error("This method shouldn't be called. " + "Please file a bug report.");
64          }
65      };
66  
67      private final Set<Worker> workers = new HashSet<>();
68  
69      private volatile int corePoolSize;
70  
71      private volatile int maximumPoolSize;
72  
73      private volatile int largestPoolSize;
74  
75      private final AtomicInteger idleWorkers = new AtomicInteger();
76  
77      private long completedTaskCount;
78  
79      private volatile boolean shutdown;
80  
81      private final IoEventQueueHandler queueHandler;
82  
83      /**
84       * Creates a new UnorderedThreadPoolExecutor instance
85       */
86      public UnorderedThreadPoolExecutor() {
87          this(16);
88      }
89  
90      /**
91       * Creates a new UnorderedThreadPoolExecutor instance
92       * 
93       * @param maximumPoolSize The maximum number of threads in the pool
94       */
95      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
96          this(0, maximumPoolSize);
97      }
98  
99      /**
100      * Creates a new UnorderedThreadPoolExecutor instance
101      * 
102      * @param corePoolSize The initial threads pool size
103      * @param maximumPoolSize The maximum number of threads in the pool
104      */
105     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
106         this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
107     }
108 
109     /**
110      * Creates a new UnorderedThreadPoolExecutor instance
111      * 
112      * @param corePoolSize The initial threads pool size
113      * @param maximumPoolSize The maximum number of threads in the pool
114      * @param keepAliveTime The time to keep threads alive
115      * @param unit The time unit for the keepAliveTime
116      */
117     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
118         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
119     }
120 
121     /**
122      * Creates a new UnorderedThreadPoolExecutor instance
123      * 
124      * @param corePoolSize The initial threads pool size
125      * @param maximumPoolSize The maximum number of threads in the pool
126      * @param keepAliveTime The time to keep threads alive
127      * @param unit The time unit for the keepAliveTime
128      * @param queueHandler The Event queue handler to use
129      */
130     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
131             IoEventQueueHandler queueHandler) {
132         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
133     }
134 
135     /**
136      * Creates a new UnorderedThreadPoolExecutor instance
137      * 
138      * @param corePoolSize The initial threads pool size
139      * @param maximumPoolSize The maximum number of threads in the pool
140      * @param keepAliveTime The time to keep threads alive
141      * @param unit The time unit for the keepAliveTime
142      * @param threadFactory The Thread factory to use
143      */
144     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
145             ThreadFactory threadFactory) {
146         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
147     }
148 
149     /**
150      * Creates a new UnorderedThreadPoolExecutor instance
151      * 
152      * @param corePoolSize The initial threads pool size
153      * @param maximumPoolSize The maximum number of threads in the pool
154      * @param keepAliveTime The time to keep threads alive
155      * @param unit The time unit for the keepAliveTime
156      * @param threadFactory The Thread factory to use
157      * @param queueHandler The Event queue handler to use
158      */
159     public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
160             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
161         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
162         
163         if (corePoolSize < 0) {
164             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
165         }
166 
167         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
168             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
169         }
170 
171         if (queueHandler == null) {
172             this.queueHandler = IoEventQueueHandler.NOOP;
173         } else {
174             this.queueHandler = queueHandler;
175         }
176 
177         this.corePoolSize = corePoolSize;
178         this.maximumPoolSize = maximumPoolSize;
179     }
180 
181     /**
182      * @return The Queue handler in use
183      */
184     public IoEventQueueHandler getQueueHandler() {
185         return queueHandler;
186     }
187 
188     @Override
189     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
190         // Ignore the request.  It must always be AbortPolicy.
191     }
192 
193     private void addWorker() {
194         synchronized (workers) {
195             if (workers.size() >= maximumPoolSize) {
196                 return;
197             }
198 
199             Worker worker = new Worker();
200             Thread thread = getThreadFactory().newThread(worker);
201             idleWorkers.incrementAndGet();
202             thread.start();
203             workers.add(worker);
204 
205             if (workers.size() > largestPoolSize) {
206                 largestPoolSize = workers.size();
207             }
208         }
209     }
210 
211     private void addWorkerIfNecessary() {
212         if (idleWorkers.get() == 0) {
213             synchronized (workers) {
214                 if (workers.isEmpty() || idleWorkers.get() == 0) {
215                     addWorker();
216                 }
217             }
218         }
219     }
220 
221     private void removeWorker() {
222         synchronized (workers) {
223             if (workers.size() <= corePoolSize) {
224                 return;
225             }
226             getQueue().offer(EXIT_SIGNAL);
227         }
228     }
229 
230     @Override
231     public int getMaximumPoolSize() {
232         return maximumPoolSize;
233     }
234 
235     @Override
236     public void setMaximumPoolSize(int maximumPoolSize) {
237         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
238             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
239         }
240 
241         synchronized (workers) {
242             this.maximumPoolSize = maximumPoolSize;
243             int difference = workers.size() - maximumPoolSize;
244             while (difference > 0) {
245                 removeWorker();
246                 --difference;
247             }
248         }
249     }
250 
251     @Override
252     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
253 
254         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
255 
256         synchronized (workers) {
257             while (!isTerminated()) {
258                 long waitTime = deadline - System.currentTimeMillis();
259                 if (waitTime <= 0) {
260                     break;
261                 }
262 
263                 workers.wait(waitTime);
264             }
265         }
266         return isTerminated();
267     }
268 
269     @Override
270     public boolean isShutdown() {
271         return shutdown;
272     }
273 
274     @Override
275     public boolean isTerminated() {
276         if (!shutdown) {
277             return false;
278         }
279 
280         synchronized (workers) {
281             return workers.isEmpty();
282         }
283     }
284 
285     @Override
286     public void shutdown() {
287         if (shutdown) {
288             return;
289         }
290 
291         shutdown = true;
292 
293         synchronized (workers) {
294             for (int i = workers.size(); i > 0; i--) {
295                 getQueue().offer(EXIT_SIGNAL);
296             }
297         }
298     }
299 
300     @Override
301     public List<Runnable> shutdownNow() {
302         shutdown();
303 
304         List<Runnable> answer = new ArrayList<>();
305         Runnable task;
306         while ((task = getQueue().poll()) != null) {
307             if (task == EXIT_SIGNAL) {
308                 getQueue().offer(EXIT_SIGNAL);
309                 Thread.yield(); // Let others take the signal.
310                 continue;
311             }
312 
313             getQueueHandler().polled(this, (IoEvent) task);
314             answer.add(task);
315         }
316 
317         return answer;
318     }
319 
320     @Override
321     public void execute(Runnable task) {
322         if (shutdown) {
323             rejectTask(task);
324         }
325 
326         checkTaskType(task);
327 
328         IoEventef="../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent e = (IoEvent) task;
329         boolean offeredEvent = queueHandler.accept(this, e);
330         
331         if (offeredEvent) {
332             getQueue().offer(e);
333         }
334 
335         addWorkerIfNecessary();
336 
337         if (offeredEvent) {
338             queueHandler.offered(this, e);
339         }
340     }
341 
342     private void rejectTask(Runnable task) {
343         getRejectedExecutionHandler().rejectedExecution(task, this);
344     }
345 
346     private void checkTaskType(Runnable task) {
347         if (!(task instanceof IoEvent)) {
348             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
349         }
350     }
351 
352     @Override
353     public int getActiveCount() {
354         synchronized (workers) {
355             return workers.size() - idleWorkers.get();
356         }
357     }
358 
359     @Override
360     public long getCompletedTaskCount() {
361         synchronized (workers) {
362             long answer = completedTaskCount;
363             for (Worker w : workers) {
364                 answer += w.completedTaskCount.get();
365             }
366 
367             return answer;
368         }
369     }
370 
371     @Override
372     public int getLargestPoolSize() {
373         return largestPoolSize;
374     }
375 
376     @Override
377     public int getPoolSize() {
378         synchronized (workers) {
379             return workers.size();
380         }
381     }
382 
383     @Override
384     public long getTaskCount() {
385         return getCompletedTaskCount();
386     }
387 
388     @Override
389     public boolean isTerminating() {
390         synchronized (workers) {
391             return isShutdown() && !isTerminated();
392         }
393     }
394 
395     @Override
396     public int prestartAllCoreThreads() {
397         int answer = 0;
398         synchronized (workers) {
399             for (int i = corePoolSize - workers.size(); i > 0; i--) {
400                 addWorker();
401                 answer++;
402             }
403         }
404         return answer;
405     }
406 
407     @Override
408     public boolean prestartCoreThread() {
409         synchronized (workers) {
410             if (workers.size() < corePoolSize) {
411                 addWorker();
412                 return true;
413             }
414 
415             return false;
416         }
417     }
418 
419     @Override
420     public void purge() {
421         // Nothing to purge in this implementation.
422     }
423 
424     @Override
425     public boolean remove(Runnable task) {
426         boolean removed = super.remove(task);
427         if (removed) {
428             getQueueHandler().polled(this, (IoEvent) task);
429         }
430         return removed;
431     }
432 
433     @Override
434     public int getCorePoolSize() {
435         return corePoolSize;
436     }
437 
438     @Override
439     public void setCorePoolSize(int corePoolSize) {
440         if (corePoolSize < 0) {
441             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
442         }
443         if (corePoolSize > maximumPoolSize) {
444             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
445         }
446 
447         synchronized (workers) {
448             if (this.corePoolSize > corePoolSize) {
449                 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
450                     removeWorker();
451                 }
452             }
453             this.corePoolSize = corePoolSize;
454         }
455     }
456 
457     private class Worker implements Runnable {
458 
459         private AtomicLong completedTaskCount = new AtomicLong(0);
460 
461         private Thread thread;
462 
463         /**
464          * {@inheritDoc}
465          */
466         @Override
467         public void run() {
468             thread = Thread.currentThread();
469 
470             try {
471                 for (;;) {
472                     Runnable task = fetchTask();
473 
474                     idleWorkers.decrementAndGet();
475 
476                     if (task == null) {
477                         synchronized (workers) {
478                             if (workers.size() > corePoolSize) {
479                                 // Remove now to prevent duplicate exit.
480                                 workers.remove(this);
481                                 break;
482                             }
483                         }
484                     }
485 
486                     if (task == EXIT_SIGNAL) {
487                         break;
488                     }
489 
490                     try {
491                         if (task != null) {
492                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
493                             runTask(task);
494                         }
495                     } finally {
496                         idleWorkers.incrementAndGet();
497                     }
498                 }
499             } finally {
500                 synchronized (workers) {
501                     workers.remove(this);
502                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
503                     workers.notifyAll();
504                 }
505             }
506         }
507 
508         private Runnable fetchTask() {
509             Runnable task = null;
510             long currentTime = System.currentTimeMillis();
511             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
512             
513             for (;;) {
514                 try {
515                     long waitTime = deadline - currentTime;
516                     
517                     if (waitTime <= 0) {
518                         break;
519                     }
520 
521                     try {
522                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
523                         break;
524                     } finally {
525                         if (task == null) {
526                             currentTime = System.currentTimeMillis();
527                         }
528                     }
529                 } catch (InterruptedException e) {
530                     // Ignore.
531                     continue;
532                 }
533             }
534             return task;
535         }
536 
537         private void runTask(Runnable task) {
538             beforeExecute(thread, task);
539             boolean ran = false;
540             try {
541                 task.run();
542                 ran = true;
543                 afterExecute(task, null);
544                 completedTaskCount.incrementAndGet();
545             } catch (RuntimeException e) {
546                 if (!ran) {
547                     afterExecute(task, e);
548                 }
549                 throw e;
550             }
551         }
552     }
553 }