View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.SynchronousQueue;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import org.apache.mina.core.session.AttributeKey;
38  import org.apache.mina.core.session.DummySession;
39  import org.apache.mina.core.session.IoEvent;
40  import org.apache.mina.core.session.IoSession;
41  import org.apache.mina.util.CircularQueue;
42  
43  /**
44   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
45   * <p>
46   * If you don't need to maintain the order of events per session, please use
47   * {@link UnorderedThreadPoolExecutor}.
48  
49   * @author The Apache MINA Project (dev@mina.apache.org)
50   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
51   */
52  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
53  
54      private static final IoSession EXIT_SIGNAL = new DummySession();
55  
56      private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
57      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
58  
59      private final Set<Worker> workers = new HashSet<Worker>();
60  
61      private volatile int corePoolSize;
62      private volatile int maximumPoolSize;
63      private volatile int largestPoolSize;
64      private final AtomicInteger idleWorkers = new AtomicInteger();
65  
66      private long completedTaskCount;
67      private volatile boolean shutdown;
68  
69      private final IoEventQueueHandler queueHandler;
70  
71      public OrderedThreadPoolExecutor() {
72          this(16);
73      }
74  
75      public OrderedThreadPoolExecutor(int maximumPoolSize) {
76          this(0, maximumPoolSize);
77      }
78  
79      public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
80          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
81      }
82  
83      public OrderedThreadPoolExecutor(
84              int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
85          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
86      }
87  
88      public OrderedThreadPoolExecutor(
89              int corePoolSize, int maximumPoolSize,
90              long keepAliveTime, TimeUnit unit,
91              IoEventQueueHandler queueHandler) {
92          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
93      }
94  
95      public OrderedThreadPoolExecutor(
96              int corePoolSize, int maximumPoolSize,
97              long keepAliveTime, TimeUnit unit,
98              ThreadFactory threadFactory) {
99          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
100     }
101 
102     public OrderedThreadPoolExecutor(
103             int corePoolSize, int maximumPoolSize,
104             long keepAliveTime, TimeUnit unit,
105             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
106         super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
107         if (corePoolSize < 0) {
108             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
109         }
110 
111         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
112             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
113         }
114 
115         if (queueHandler == null) {
116             queueHandler = IoEventQueueHandler.NOOP;
117         }
118 
119         this.corePoolSize = corePoolSize;
120         this.maximumPoolSize = maximumPoolSize;
121         this.queueHandler = queueHandler;
122     }
123 
124     public IoEventQueueHandler getQueueHandler() {
125         return queueHandler;
126     }
127 
128     @Override
129     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
130         // Ignore the request.  It must always be AbortPolicy.
131     }
132 
133     private void addWorker() {
134         synchronized (workers) {
135             if (workers.size() >= maximumPoolSize) {
136                 return;
137             }
138 
139             Worker worker = new Worker();
140             Thread thread = getThreadFactory().newThread(worker);
141             idleWorkers.incrementAndGet();
142             thread.start();
143             workers.add(worker);
144 
145             if (workers.size() > largestPoolSize) {
146                 largestPoolSize = workers.size();
147             }
148         }
149     }
150 
151     private void addWorkerIfNecessary() {
152         if (idleWorkers.get() == 0) {
153             synchronized (workers) {
154                 if (workers.isEmpty() || idleWorkers.get() == 0) {
155                     addWorker();
156                 }
157             }
158         }
159     }
160 
161     private void removeWorker() {
162         synchronized (workers) {
163             if (workers.size() <= corePoolSize) {
164                 return;
165             }
166             waitingSessions.offer(EXIT_SIGNAL);
167         }
168     }
169 
170     @Override
171     public int getMaximumPoolSize() {
172         return maximumPoolSize;
173     }
174 
175     @Override
176     public void setMaximumPoolSize(int maximumPoolSize) {
177         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
178             throw new IllegalArgumentException("maximumPoolSize: "
179                     + maximumPoolSize);
180         }
181 
182         synchronized (workers) {
183             this.maximumPoolSize = maximumPoolSize;
184             int difference = workers.size() - maximumPoolSize;
185             while (difference > 0) {
186                 removeWorker();
187                 --difference;
188             }
189         }
190     }
191 
192     @Override
193     public boolean awaitTermination(long timeout, TimeUnit unit)
194             throws InterruptedException {
195 
196         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
197 
198         synchronized (workers) {
199             while (!isTerminated()) {
200                 long waitTime = deadline - System.currentTimeMillis();
201                 if (waitTime <= 0) {
202                     break;
203                 }
204 
205                 workers.wait(waitTime);
206             }
207         }
208         return isTerminated();
209     }
210 
211     @Override
212     public boolean isShutdown() {
213         return shutdown;
214     }
215 
216     @Override
217     public boolean isTerminated() {
218         if (!shutdown) {
219             return false;
220         }
221 
222         synchronized (workers) {
223             return workers.isEmpty();
224         }
225     }
226 
227     @Override
228     public void shutdown() {
229         if (shutdown) {
230             return;
231         }
232 
233         shutdown = true;
234 
235         synchronized (workers) {
236             for (int i = workers.size(); i > 0; i --) {
237                 waitingSessions.offer(EXIT_SIGNAL);
238             }
239         }
240     }
241 
242     @Override
243     public List<Runnable> shutdownNow() {
244         shutdown();
245 
246         List<Runnable> answer = new ArrayList<Runnable>();
247         IoSession session;
248         while ((session = waitingSessions.poll()) != null) {
249             if (session == EXIT_SIGNAL) {
250                 waitingSessions.offer(EXIT_SIGNAL);
251                 Thread.yield(); // Let others take the signal.
252                 continue;
253             }
254 
255             SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
256             synchronized (buf.queue) {
257                 for (Runnable task: buf.queue) {
258                     getQueueHandler().polled(this, (IoEvent) task);
259                     answer.add(task);
260                 }
261                 buf.queue.clear();
262             }
263         }
264 
265         return answer;
266     }
267 
268     @Override
269     public void execute(Runnable task) {
270         if (shutdown) {
271             rejectTask(task);
272         }
273 
274         checkTaskType(task);
275 
276         IoEvent e = (IoEvent) task;
277         IoSession s = e.getSession();
278         SessionBuffer buf = getSessionBuffer(s);
279         Queue<Runnable> queue = buf.queue;
280         boolean offerSession;
281         boolean offerEvent = queueHandler.accept(this, e);
282         if (offerEvent) {
283             synchronized (queue) {
284                 queue.offer(e);
285                 if (buf.processingCompleted) {
286                     buf.processingCompleted = false;
287                     offerSession = true;
288                 } else {
289                     offerSession = false;
290                 }
291             }
292         } else {
293             offerSession = false;
294         }
295 
296         if (offerSession) {
297             waitingSessions.offer(s);
298         }
299 
300         addWorkerIfNecessary();
301 
302         if (offerEvent) {
303             queueHandler.offered(this, e);
304         }
305     }
306 
307     private void rejectTask(Runnable task) {
308         getRejectedExecutionHandler().rejectedExecution(task, this);
309     }
310 
311     private void checkTaskType(Runnable task) {
312         if (!(task instanceof IoEvent)) {
313             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
314         }
315     }
316 
317     @Override
318     public int getActiveCount() {
319         synchronized (workers) {
320             return workers.size() - idleWorkers.get();
321         }
322     }
323 
324     @Override
325     public long getCompletedTaskCount() {
326         synchronized (workers) {
327             long answer = completedTaskCount;
328             for (Worker w: workers) {
329                 answer += w.completedTaskCount;
330             }
331 
332             return answer;
333         }
334     }
335 
336     @Override
337     public int getLargestPoolSize() {
338         return largestPoolSize;
339     }
340 
341     @Override
342     public int getPoolSize() {
343         synchronized (workers) {
344             return workers.size();
345         }
346     }
347 
348     @Override
349     public long getTaskCount() {
350         return getCompletedTaskCount();
351     }
352 
353     @Override
354     public boolean isTerminating() {
355         synchronized (workers) {
356             return isShutdown() && !isTerminated();
357         }
358     }
359 
360     @Override
361     public int prestartAllCoreThreads() {
362         int answer = 0;
363         synchronized (workers) {
364             for (int i = corePoolSize - workers.size() ; i > 0; i --) {
365                 addWorker();
366                 answer ++;
367             }
368         }
369         return answer;
370     }
371 
372     @Override
373     public boolean prestartCoreThread() {
374         synchronized (workers) {
375             if (workers.size() < corePoolSize) {
376                 addWorker();
377                 return true;
378             } else {
379                 return false;
380             }
381         }
382     }
383 
384     @Override
385     public BlockingQueue<Runnable> getQueue() {
386         throw new UnsupportedOperationException();
387     }
388 
389     @Override
390     public void purge() {
391         // Nothing to purge in this implementation.
392     }
393 
394     @Override
395     public boolean remove(Runnable task) {
396         checkTaskType(task);
397         IoEvent e = (IoEvent) task;
398         IoSession s = e.getSession();
399         SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
400         if (buffer == null) {
401             return false;
402         }
403 
404         boolean removed;
405         synchronized (buffer.queue) {
406             removed = buffer.queue.remove(task);
407         }
408 
409         if (removed) {
410             getQueueHandler().polled(this, e);
411         }
412 
413         return removed;
414     }
415 
416     @Override
417     public int getCorePoolSize() {
418         return corePoolSize;
419     }
420 
421     @Override
422     public void setCorePoolSize(int corePoolSize) {
423         if (corePoolSize < 0) {
424             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
425         }
426         if (corePoolSize > maximumPoolSize) {
427             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
428         }
429 
430         synchronized (workers) {
431             if (this.corePoolSize > corePoolSize) {
432                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
433                     removeWorker();
434                 }
435             }
436             this.corePoolSize = corePoolSize;
437         }
438     }
439 
440     private SessionBuffer getSessionBuffer(IoSession session) {
441         SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER);
442         if (buffer == null) {
443             buffer = new SessionBuffer();
444             SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer);
445             if (oldBuffer != null) {
446                 buffer = oldBuffer;
447             }
448         }
449         return buffer;
450     }
451 
452     private static class SessionBuffer {
453         private final Queue<Runnable> queue = new CircularQueue<Runnable>();
454         private boolean processingCompleted = true;
455     }
456 
457     private class Worker implements Runnable {
458 
459         private volatile long completedTaskCount;
460         private Thread thread;
461 
462         public void run() {
463             thread = Thread.currentThread();
464 
465             try {
466                 for (;;) {
467                     IoSession session = fetchSession();
468 
469                     idleWorkers.decrementAndGet();
470 
471                     if (session == null) {
472                         synchronized (workers) {
473                             if (workers.size() > corePoolSize) {
474                                 // Remove now to prevent duplicate exit.
475                                 workers.remove(this);
476                                 break;
477                             }
478                         }
479                     }
480 
481                     if (session == EXIT_SIGNAL) {
482                         break;
483                     }
484 
485                     try {
486                         if (session != null) {
487                             runTasks(getSessionBuffer(session));
488                         }
489                     } finally {
490                         idleWorkers.incrementAndGet();
491                     }
492                 }
493             } finally {
494                 synchronized (workers) {
495                     workers.remove(this);
496                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
497                     workers.notifyAll();
498                 }
499             }
500         }
501 
502         private IoSession fetchSession() {
503             IoSession session = null;
504             long currentTime = System.currentTimeMillis();
505             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
506             for (;;) {
507                 try {
508                     long waitTime = deadline - currentTime;
509                     if (waitTime <= 0) {
510                         break;
511                     }
512 
513                     try {
514                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
515                         break;
516                     } finally {
517                         if (session == null) {
518                             currentTime = System.currentTimeMillis();
519                         }
520                     }
521                 } catch (InterruptedException e) {
522                     // Ignore.
523                     continue;
524                 }
525             }
526             return session;
527         }
528 
529         private void runTasks(SessionBuffer buf) {
530             for (;;) {
531                 Runnable task;
532                 synchronized (buf.queue) {
533                     task = buf.queue.poll();
534 
535                     if (task == null) {
536                         buf.processingCompleted = true;
537                         break;
538                     }
539                 }
540 
541                 queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
542 
543                 runTask(task);
544             }
545         }
546 
547         private void runTask(Runnable task) {
548             beforeExecute(thread, task);
549             boolean ran = false;
550             try {
551                 task.run();
552                 ran = true;
553                 afterExecute(task, null);
554                 completedTaskCount ++;
555             } catch (RuntimeException e) {
556                 if (!ran) {
557                     afterExecute(task, e);
558                 }
559                 throw e;
560             }
561         }
562     }
563 }