View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.RejectedExecutionHandler;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.session.IoEvent;
35  
36  /**
37   * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s.
38   * This means more than one event handler methods can be invoked at the same
39   * time with mixed order.  For example, let's assume that messageReceived, messageSent,
40   * and sessionClosed events are fired.
41   * <ul>
42   * <li>All event handler methods can be called simultaneously.
43   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
44   * <li>The event order can be mixed up.
45   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
46   *           is invoked.)</li>
47   * </ul>
48   * If you need to maintain the order of events per session, please use
49   * {@link OrderedThreadPoolExecutor}.
50   *
51   * @author The Apache MINA Project (dev@mina.apache.org)
52   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
53   */
54  public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55  
56      private static final Runnable EXIT_SIGNAL = new Runnable() {
57          public void run() {
58              throw new Error(
59                      "This method shouldn't be called. " +
60                      "Please file a bug report.");
61          }
62      };
63  
64      private final Set<Worker> workers = new HashSet<Worker>();
65  
66      private volatile int corePoolSize;
67      private volatile int maximumPoolSize;
68      private volatile int largestPoolSize;
69      private final AtomicInteger idleWorkers = new AtomicInteger();
70  
71      private long completedTaskCount;
72      private volatile boolean shutdown;
73  
74      private final IoEventQueueHandler queueHandler;
75  
76      public UnorderedThreadPoolExecutor() {
77          this(16);
78      }
79  
80      public UnorderedThreadPoolExecutor(int maximumPoolSize) {
81          this(0, maximumPoolSize);
82      }
83  
84      public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
85          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
86      }
87  
88      public UnorderedThreadPoolExecutor(
89              int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
90          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
91      }
92  
93      public UnorderedThreadPoolExecutor(
94              int corePoolSize, int maximumPoolSize,
95              long keepAliveTime, TimeUnit unit,
96              IoEventQueueHandler queueHandler) {
97          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
98      }
99  
100     public UnorderedThreadPoolExecutor(
101             int corePoolSize, int maximumPoolSize,
102             long keepAliveTime, TimeUnit unit,
103             ThreadFactory threadFactory) {
104         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
105     }
106 
107     public UnorderedThreadPoolExecutor(
108             int corePoolSize, int maximumPoolSize,
109             long keepAliveTime, TimeUnit unit,
110             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
111         super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
112         if (corePoolSize < 0) {
113             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
114         }
115 
116         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
117             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
118         }
119 
120         if (queueHandler == null) {
121             queueHandler = IoEventQueueHandler.NOOP;
122         }
123 
124         this.corePoolSize = corePoolSize;
125         this.maximumPoolSize = maximumPoolSize;
126         this.queueHandler = queueHandler;
127     }
128 
129     public IoEventQueueHandler getQueueHandler() {
130         return queueHandler;
131     }
132 
133     @Override
134     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
135         // Ignore the request.  It must always be AbortPolicy.
136     }
137 
138     private void addWorker() {
139         synchronized (workers) {
140             if (workers.size() >= maximumPoolSize) {
141                 return;
142             }
143 
144             Worker worker = new Worker();
145             Thread thread = getThreadFactory().newThread(worker);
146             idleWorkers.incrementAndGet();
147             thread.start();
148             workers.add(worker);
149 
150             if (workers.size() > largestPoolSize) {
151                 largestPoolSize = workers.size();
152             }
153         }
154     }
155 
156     private void addWorkerIfNecessary() {
157         if (idleWorkers.get() == 0) {
158             synchronized (workers) {
159                 if (workers.isEmpty() || idleWorkers.get() == 0) {
160                     addWorker();
161                 }
162             }
163         }
164     }
165 
166     private void removeWorker() {
167         synchronized (workers) {
168             if (workers.size() <= corePoolSize) {
169                 return;
170             }
171             getQueue().offer(EXIT_SIGNAL);
172         }
173     }
174 
175     @Override
176     public int getMaximumPoolSize() {
177         return maximumPoolSize;
178     }
179 
180     @Override
181     public void setMaximumPoolSize(int maximumPoolSize) {
182         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
183             throw new IllegalArgumentException("maximumPoolSize: "
184                     + maximumPoolSize);
185         }
186 
187         synchronized (workers) {
188             this.maximumPoolSize = maximumPoolSize;
189             int difference = workers.size() - maximumPoolSize;
190             while (difference > 0) {
191                 removeWorker();
192                 --difference;
193             }
194         }
195     }
196 
197     @Override
198     public boolean awaitTermination(long timeout, TimeUnit unit)
199             throws InterruptedException {
200 
201         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
202 
203         synchronized (workers) {
204             while (!isTerminated()) {
205                 long waitTime = deadline - System.currentTimeMillis();
206                 if (waitTime <= 0) {
207                     break;
208                 }
209 
210                 workers.wait(waitTime);
211             }
212         }
213         return isTerminated();
214     }
215 
216     @Override
217     public boolean isShutdown() {
218         return shutdown;
219     }
220 
221     @Override
222     public boolean isTerminated() {
223         if (!shutdown) {
224             return false;
225         }
226 
227         synchronized (workers) {
228             return workers.isEmpty();
229         }
230     }
231 
232     @Override
233     public void shutdown() {
234         if (shutdown) {
235             return;
236         }
237 
238         shutdown = true;
239 
240         synchronized (workers) {
241             for (int i = workers.size(); i > 0; i --) {
242                 getQueue().offer(EXIT_SIGNAL);
243             }
244         }
245     }
246 
247     @Override
248     public List<Runnable> shutdownNow() {
249         shutdown();
250 
251         List<Runnable> answer = new ArrayList<Runnable>();
252         Runnable task;
253         while ((task = getQueue().poll()) != null) {
254             if (task == EXIT_SIGNAL) {
255                 getQueue().offer(EXIT_SIGNAL);
256                 Thread.yield(); // Let others take the signal.
257                 continue;
258             }
259 
260             getQueueHandler().polled(this, (IoEvent) task);
261             answer.add(task);
262         }
263 
264         return answer;
265     }
266 
267     @Override
268     public void execute(Runnable task) {
269         if (shutdown) {
270             rejectTask(task);
271         }
272 
273         checkTaskType(task);
274 
275         IoEvent e = (IoEvent) task;
276         boolean offeredEvent = queueHandler.accept(this, e);
277         if (offeredEvent) {
278             getQueue().offer(e);
279         }
280 
281         addWorkerIfNecessary();
282 
283         if (offeredEvent) {
284             queueHandler.offered(this, e);
285         }
286     }
287 
288     private void rejectTask(Runnable task) {
289         getRejectedExecutionHandler().rejectedExecution(task, this);
290     }
291 
292     private void checkTaskType(Runnable task) {
293         if (!(task instanceof IoEvent)) {
294             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
295         }
296     }
297 
298     @Override
299     public int getActiveCount() {
300         synchronized (workers) {
301             return workers.size() - idleWorkers.get();
302         }
303     }
304 
305     @Override
306     public long getCompletedTaskCount() {
307         synchronized (workers) {
308             long answer = completedTaskCount;
309             for (Worker w: workers) {
310                 answer += w.completedTaskCount;
311             }
312 
313             return answer;
314         }
315     }
316 
317     @Override
318     public int getLargestPoolSize() {
319         return largestPoolSize;
320     }
321 
322     @Override
323     public int getPoolSize() {
324         synchronized (workers) {
325             return workers.size();
326         }
327     }
328 
329     @Override
330     public long getTaskCount() {
331         return getCompletedTaskCount();
332     }
333 
334     @Override
335     public boolean isTerminating() {
336         synchronized (workers) {
337             return isShutdown() && !isTerminated();
338         }
339     }
340 
341     @Override
342     public int prestartAllCoreThreads() {
343         int answer = 0;
344         synchronized (workers) {
345             for (int i = corePoolSize - workers.size() ; i > 0; i --) {
346                 addWorker();
347                 answer ++;
348             }
349         }
350         return answer;
351     }
352 
353     @Override
354     public boolean prestartCoreThread() {
355         synchronized (workers) {
356             if (workers.size() < corePoolSize) {
357                 addWorker();
358                 return true;
359             } else {
360                 return false;
361             }
362         }
363     }
364 
365     @Override
366     public void purge() {
367         // Nothing to purge in this implementation.
368     }
369 
370     @Override
371     public boolean remove(Runnable task) {
372         boolean removed = super.remove(task);
373         if (removed) {
374             getQueueHandler().polled(this, (IoEvent) task);
375         }
376         return removed;
377     }
378 
379     @Override
380     public int getCorePoolSize() {
381         return corePoolSize;
382     }
383 
384     @Override
385     public void setCorePoolSize(int corePoolSize) {
386         if (corePoolSize < 0) {
387             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
388         }
389         if (corePoolSize > maximumPoolSize) {
390             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
391         }
392 
393         synchronized (workers) {
394             if (this.corePoolSize > corePoolSize) {
395                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
396                     removeWorker();
397                 }
398             }
399             this.corePoolSize = corePoolSize;
400         }
401     }
402 
403     private class Worker implements Runnable {
404 
405         private volatile long completedTaskCount;
406         private Thread thread;
407 
408         public void run() {
409             thread = Thread.currentThread();
410 
411             try {
412                 for (;;) {
413                     Runnable task = fetchTask();
414 
415                     idleWorkers.decrementAndGet();
416 
417                     if (task == null) {
418                         synchronized (workers) {
419                             if (workers.size() > corePoolSize) {
420                                 // Remove now to prevent duplicate exit.
421                                 workers.remove(this);
422                                 break;
423                             }
424                         }
425                     }
426 
427                     if (task == EXIT_SIGNAL) {
428                         break;
429                     }
430 
431                     try {
432                         if (task != null) {
433                             queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
434                             runTask(task);
435                         }
436                     } finally {
437                         idleWorkers.incrementAndGet();
438                     }
439                 }
440             } finally {
441                 synchronized (workers) {
442                     workers.remove(this);
443                     UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
444                     workers.notifyAll();
445                 }
446             }
447         }
448 
449         private Runnable fetchTask() {
450             Runnable task = null;
451             long currentTime = System.currentTimeMillis();
452             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
453             for (;;) {
454                 try {
455                     long waitTime = deadline - currentTime;
456                     if (waitTime <= 0) {
457                         break;
458                     }
459 
460                     try {
461                         task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
462                         break;
463                     } finally {
464                         if (task == null) {
465                             currentTime = System.currentTimeMillis();
466                         }
467                     }
468                 } catch (InterruptedException e) {
469                     // Ignore.
470                     continue;
471                 }
472             }
473             return task;
474         }
475 
476         private void runTask(Runnable task) {
477             beforeExecute(thread, task);
478             boolean ran = false;
479             try {
480                 task.run();
481                 ran = true;
482                 afterExecute(task, null);
483                 completedTaskCount ++;
484             } catch (RuntimeException e) {
485                 if (!ran) {
486                     afterExecute(task, e);
487                 }
488                 throw e;
489             }
490         }
491     }
492 }