View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.common.IoEvent;
35  
36  /**
37   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
38   * This means more than one event handler methods can be invoked at the same
39   * time with mixed order.  For example, let's assume that messageReceived, messageSent,
40   * and sessionClosed events are fired.
41   * <ul>
42   * <li>All event handler methods can be called simultaneously.
43   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
44   * <li>The event order can be mixed up.
45   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
46   *           is invoked.)</li>
47   * </ul>
48   * If you need to maintain the order of events per session, please use
49   * {@link OrderedThreadPoolExecutor}.
50   * 
51   * @author The Apache MINA Project (dev@mina.apache.org)
52   * @version $Rev: 595549 $, $Date: 2007-11-15 21:45:36 -0700 (Thu, 15 Nov 2007) $
53   */
54  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55  
56      private static final Runnable EXIT_SIGNAL = new Runnable() {
57          public void run() {}
58      };
59      private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
60          public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
61              return true;
62          }
63          public void offered(ThreadPoolExecutor executor, IoEvent event) {}
64          public void polled(ThreadPoolExecutor executor, IoEvent event) {}
65      };
66  
67      private final Set<Worker> workers = new HashSet<Worker>();
68      
69      private volatile int corePoolSize;
70      private volatile int maximumPoolSize;
71      private volatile int largestPoolSize;
72      private final AtomicInteger idleWorkers = new AtomicInteger();
73      
74      private long completedTaskCount;
75      private volatile boolean shutdown;
76      
77      private final IoEventQueueHandler queueHandler;
78      
79      public UnorderedThreadPoolExecutor() {
80          this(16);
81      }
82      
83      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
84          this(0, maximumPoolSize);
85      }
86      
87      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
88          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
89      }
90      
91      public UnorderedThreadPoolExecutor(
92              int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
93          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
94      }
95      
96      public UnorderedThreadPoolExecutor(
97              int corePoolSize, int maximumPoolSize, 
98              long keepAliveTime, TimeUnit unit,
99              IoEventQueueHandler queueHandler) {
100         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
101     }
102 
103     public UnorderedThreadPoolExecutor(
104             int corePoolSize, int maximumPoolSize, 
105             long keepAliveTime, TimeUnit unit,
106             ThreadFactory threadFactory) {
107         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
108     }
109 
110     public UnorderedThreadPoolExecutor(
111             int corePoolSize, int maximumPoolSize, 
112             long keepAliveTime, TimeUnit unit,
113             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
114         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
115         if (corePoolSize < 0) {
116             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
117         }
118         
119         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
120             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
121         }
122         
123         if (queueHandler == null) {
124             queueHandler = NOOP_QUEUE_HANDLER;
125         }
126 
127         this.corePoolSize = corePoolSize;
128         this.maximumPoolSize = maximumPoolSize;
129         this.queueHandler = queueHandler;
130     }
131     
132     public IoEventQueueHandler getQueueHandler() {
133         return queueHandler;
134     }
135 
136     @Override
137     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
138         // Ignore the request.  It must always be AbortPolicy.
139     }
140 
141     private void addWorker() {
142         synchronized (workers) {
143             if (workers.size() >= maximumPoolSize) {
144                 return;
145             }
146 
147             Worker worker = new Worker();
148             Thread thread = getThreadFactory().newThread(worker);
149             idleWorkers.incrementAndGet();
150             thread.start();
151             workers.add(worker);
152             
153             if (workers.size() > largestPoolSize) {
154                 largestPoolSize = workers.size();
155             }
156         }
157     }
158     
159     private void addWorkerIfNecessary() {
160         if (idleWorkers.get() == 0) {
161             synchronized (workers) {
162                 if (workers.isEmpty() || idleWorkers.get() == 0) {
163                     addWorker();
164                 }
165             }
166         }
167     }
168     
169     private void removeWorker() {
170         synchronized (workers) {
171             if (workers.size() <= corePoolSize) {
172                 return;
173             }
174             getQueue().offer(EXIT_SIGNAL);
175         }
176     }
177     
178     @Override
179     public int getMaximumPoolSize() {
180         return maximumPoolSize;
181     }
182     
183     @Override
184     public void setMaximumPoolSize(int maximumPoolSize) {
185         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
186             throw new IllegalArgumentException("maximumPoolSize: "
187                     + maximumPoolSize);
188         }
189 
190         synchronized (workers) {
191             this.maximumPoolSize = maximumPoolSize;
192             int difference = workers.size() - maximumPoolSize;
193             while (difference > 0) {
194                 removeWorker();
195                 --difference;
196             }
197         }
198     }
199     
200     @Override
201     public boolean awaitTermination(long timeout, TimeUnit unit)
202             throws InterruptedException {
203         
204         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
205         
206         synchronized (workers) {
207             while (!isTerminated()) {
208                 long waitTime = deadline - System.currentTimeMillis();
209                 if (waitTime <= 0) {
210                     break;
211                 }
212                 
213                 workers.wait(waitTime);
214             }
215         }
216         return isTerminated();
217     }
218 
219     @Override
220     public boolean isShutdown() {
221         return shutdown;
222     }
223 
224     @Override
225     public boolean isTerminated() {
226         if (!shutdown) {
227             return false;
228         }
229         
230         synchronized (workers) {
231             return workers.isEmpty();
232         }
233     }
234 
235     @Override
236     public void shutdown() {
237         if (shutdown) {
238             return;
239         }
240         
241         shutdown = true;
242 
243         synchronized (workers) {
244             for (int i = workers.size(); i > 0; i --) {
245                 getQueue().offer(EXIT_SIGNAL);
246             }
247         }
248     }
249 
250     @Override
251     public List<Runnable> shutdownNow() {
252         shutdown();
253         
254         List<Runnable> answer = new ArrayList<Runnable>();
255         Runnable task;
256         while ((task = getQueue().poll()) != null) {
257             if (task == EXIT_SIGNAL) {
258                 getQueue().offer(EXIT_SIGNAL);
259                 Thread.yield(); // Let others take the signal.
260                 continue;
261             }
262             
263             getQueueHandler().polled(this, (IoEvent) task);
264             answer.add(task);
265         }
266         
267         return answer;
268     }
269 
270     @Override
271     public void execute(Runnable task) {
272         if (shutdown) {
273             rejectTask(task);
274         }
275 
276         checkTaskType(task);
277         
278         IoEvent e = (IoEvent) task;
279         boolean offeredEvent = queueHandler.accept(this, e);
280         if (offeredEvent) {
281             getQueue().offer(e);
282         }
283         
284         addWorkerIfNecessary();
285         
286         if (offeredEvent) {
287             queueHandler.offered(this, e);
288         }
289     }
290     
291     private void rejectTask(Runnable task) {
292         getRejectedExecutionHandler().rejectedExecution(task, this);
293     }
294     
295     private void checkTaskType(Runnable task) {
296         if (!(task instanceof IoEvent)) {
297             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
298         }
299     }
300 
301     @Override
302     public int getActiveCount() {
303         synchronized (workers) {
304             return workers.size() - idleWorkers.get();
305         }
306     }
307 
308     @Override
309     public long getCompletedTaskCount() {
310         synchronized (workers) {
311             long answer = completedTaskCount;
312             for (Worker w: workers) {
313                 answer += w.completedTaskCount;
314             }
315             
316             return answer;
317         }
318     }
319 
320     @Override
321     public int getLargestPoolSize() {
322         return largestPoolSize;
323     }
324 
325     @Override
326     public int getPoolSize() {
327         synchronized (workers) {
328             return workers.size();
329         }
330     }
331 
332     @Override
333     public long getTaskCount() {
334         return getCompletedTaskCount();
335     }
336 
337     @Override
338     public boolean isTerminating() {
339         synchronized (workers) {
340             return isShutdown() && !isTerminated();
341         }
342     }
343 
344     @Override
345     public int prestartAllCoreThreads() {
346         int answer = 0;
347         synchronized (workers) {
348             for (int i = corePoolSize - workers.size() ; i > 0; i --) {
349                 addWorker();
350                 answer ++;
351             }
352         }
353         return answer;
354     }
355 
356     @Override
357     public boolean prestartCoreThread() {
358         synchronized (workers) {
359             if (workers.size() < corePoolSize) {
360                 addWorker();
361                 return true;
362             } else {
363                 return false;
364             }
365         }
366     }
367     
368     @Override
369     public void purge() {
370     }
371 
372     @Override
373     public boolean remove(Runnable task) {
374         boolean removed = super.remove(task);
375         if (removed) {
376             getQueueHandler().polled(this, (IoEvent) task);
377         }
378         return removed;
379     }
380 
381     @Override
382     public int getCorePoolSize() {
383         return corePoolSize;
384     }
385 
386     @Override
387     public void setCorePoolSize(int corePoolSize) {
388         if (corePoolSize < 0) {
389             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
390         }
391         if (corePoolSize > maximumPoolSize) {
392             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
393         }
394                 
395         synchronized (workers) {
396             if (this.corePoolSize > corePoolSize) {
397                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
398                     removeWorker();
399                 }
400             }
401             this.corePoolSize = corePoolSize;
402         }
403     }
404     
405     private class Worker implements Runnable {
406         
407         private volatile long completedTaskCount;
408         private Thread thread;
409         
410         public void run() {
411             thread = Thread.currentThread();
412             
413             try {
414                 for (;;) {
415                     Runnable task = fetchTask();
416                     
417                     idleWorkers.decrementAndGet();
418                     
419                     if (task == null) {
420                         synchronized (workers) {
421                             if (workers.size() > corePoolSize) {
422                                 // Remove now to prevent duplicate exit.
423                                 workers.remove(this);
424                                 break;
425                             }
426                         }
427                     }
428                     
429                     if (task == EXIT_SIGNAL) {
430                         break;
431                     }
432                     
433                     queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
434                     try {
435                         runTask(task);
436                     } finally {
437                         idleWorkers.incrementAndGet();
438                     }
439                 }
440             } finally {
441                 synchronized (workers) {
442                     workers.remove(this);
443                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
444                     workers.notifyAll();
445                 }
446             }
447         }
448 
449         private Runnable fetchTask() {
450             Runnable task = null;
451             long currentTime = System.currentTimeMillis();
452             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
453             for (;;) {
454                 try {
455                     long waitTime = deadline - currentTime;
456                     if (waitTime <= 0) {
457                         break;
458                     }
459 
460                     try {
461                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
462                         break;
463                     } finally {
464                         if (task == null) {
465                             currentTime = System.currentTimeMillis();
466                         }
467                     }
468                 } catch (InterruptedException e) {
469                     // Ignore.
470                     continue;
471                 }
472             }
473             return task;
474         }
475 
476         private void runTask(Runnable task) {
477             beforeExecute(thread, task);
478             boolean ran = false;
479             try {
480                 task.run();
481                 ran = true;
482                 afterExecute(task, null);
483                 completedTaskCount ++;
484             } catch (RuntimeException e) {
485                 if (!ran)
486                     afterExecute(task, e);
487                 throw e;
488             }
489         }
490     }
491 }