1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionHandler;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.mina.core.session.IoEvent;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55
56 private static final Runnable EXIT_SIGNAL = new Runnable() {
57 public void run() {
58 throw new Error(
59 "This method shouldn't be called. " +
60 "Please file a bug report.");
61 }
62 };
63
64 private final Set<Worker> workers = new HashSet<Worker>();
65
66 private volatile int corePoolSize;
67 private volatile int maximumPoolSize;
68 private volatile int largestPoolSize;
69 private final AtomicInteger idleWorkers = new AtomicInteger();
70
71 private long completedTaskCount;
72 private volatile boolean shutdown;
73
74 private final IoEventQueueHandler queueHandler;
75
76 public UnorderedThreadPoolExecutor() {
77 this(16);
78 }
79
80 public UnorderedThreadPoolExecutor(int maximumPoolSize) {
81 this(0, maximumPoolSize);
82 }
83
84 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
85 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
86 }
87
88 public UnorderedThreadPoolExecutor(
89 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
90 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
91 }
92
93 public UnorderedThreadPoolExecutor(
94 int corePoolSize, int maximumPoolSize,
95 long keepAliveTime, TimeUnit unit,
96 IoEventQueueHandler queueHandler) {
97 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
98 }
99
100 public UnorderedThreadPoolExecutor(
101 int corePoolSize, int maximumPoolSize,
102 long keepAliveTime, TimeUnit unit,
103 ThreadFactory threadFactory) {
104 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
105 }
106
107 public UnorderedThreadPoolExecutor(
108 int corePoolSize, int maximumPoolSize,
109 long keepAliveTime, TimeUnit unit,
110 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
111 super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
112 if (corePoolSize < 0) {
113 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
114 }
115
116 if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
117 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
118 }
119
120 if (queueHandler == null) {
121 queueHandler = IoEventQueueHandler.NOOP;
122 }
123
124 this.corePoolSize = corePoolSize;
125 this.maximumPoolSize = maximumPoolSize;
126 this.queueHandler = queueHandler;
127 }
128
129 public IoEventQueueHandler getQueueHandler() {
130 return queueHandler;
131 }
132
133 @Override
134 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
135
136 }
137
138 private void addWorker() {
139 synchronized (workers) {
140 if (workers.size() >= maximumPoolSize) {
141 return;
142 }
143
144 Worker worker = new Worker();
145 Thread thread = getThreadFactory().newThread(worker);
146 idleWorkers.incrementAndGet();
147 thread.start();
148 workers.add(worker);
149
150 if (workers.size() > largestPoolSize) {
151 largestPoolSize = workers.size();
152 }
153 }
154 }
155
156 private void addWorkerIfNecessary() {
157 if (idleWorkers.get() == 0) {
158 synchronized (workers) {
159 if (workers.isEmpty() || idleWorkers.get() == 0) {
160 addWorker();
161 }
162 }
163 }
164 }
165
166 private void removeWorker() {
167 synchronized (workers) {
168 if (workers.size() <= corePoolSize) {
169 return;
170 }
171 getQueue().offer(EXIT_SIGNAL);
172 }
173 }
174
175 @Override
176 public int getMaximumPoolSize() {
177 return maximumPoolSize;
178 }
179
180 @Override
181 public void setMaximumPoolSize(int maximumPoolSize) {
182 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
183 throw new IllegalArgumentException("maximumPoolSize: "
184 + maximumPoolSize);
185 }
186
187 synchronized (workers) {
188 this.maximumPoolSize = maximumPoolSize;
189 int difference = workers.size() - maximumPoolSize;
190 while (difference > 0) {
191 removeWorker();
192 --difference;
193 }
194 }
195 }
196
197 @Override
198 public boolean awaitTermination(long timeout, TimeUnit unit)
199 throws InterruptedException {
200
201 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
202
203 synchronized (workers) {
204 while (!isTerminated()) {
205 long waitTime = deadline - System.currentTimeMillis();
206 if (waitTime <= 0) {
207 break;
208 }
209
210 workers.wait(waitTime);
211 }
212 }
213 return isTerminated();
214 }
215
216 @Override
217 public boolean isShutdown() {
218 return shutdown;
219 }
220
221 @Override
222 public boolean isTerminated() {
223 if (!shutdown) {
224 return false;
225 }
226
227 synchronized (workers) {
228 return workers.isEmpty();
229 }
230 }
231
232 @Override
233 public void shutdown() {
234 if (shutdown) {
235 return;
236 }
237
238 shutdown = true;
239
240 synchronized (workers) {
241 for (int i = workers.size(); i > 0; i --) {
242 getQueue().offer(EXIT_SIGNAL);
243 }
244 }
245 }
246
247 @Override
248 public List<Runnable> shutdownNow() {
249 shutdown();
250
251 List<Runnable> answer = new ArrayList<Runnable>();
252 Runnable task;
253 while ((task = getQueue().poll()) != null) {
254 if (task == EXIT_SIGNAL) {
255 getQueue().offer(EXIT_SIGNAL);
256 Thread.yield();
257 continue;
258 }
259
260 getQueueHandler().polled(this, (IoEvent) task);
261 answer.add(task);
262 }
263
264 return answer;
265 }
266
267 @Override
268 public void execute(Runnable task) {
269 if (shutdown) {
270 rejectTask(task);
271 }
272
273 checkTaskType(task);
274
275 IoEvent e = (IoEvent) task;
276 boolean offeredEvent = queueHandler.accept(this, e);
277 if (offeredEvent) {
278 getQueue().offer(e);
279 }
280
281 addWorkerIfNecessary();
282
283 if (offeredEvent) {
284 queueHandler.offered(this, e);
285 }
286 }
287
288 private void rejectTask(Runnable task) {
289 getRejectedExecutionHandler().rejectedExecution(task, this);
290 }
291
292 private void checkTaskType(Runnable task) {
293 if (!(task instanceof IoEvent)) {
294 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
295 }
296 }
297
298 @Override
299 public int getActiveCount() {
300 synchronized (workers) {
301 return workers.size() - idleWorkers.get();
302 }
303 }
304
305 @Override
306 public long getCompletedTaskCount() {
307 synchronized (workers) {
308 long answer = completedTaskCount;
309 for (Worker w: workers) {
310 answer += w.completedTaskCount;
311 }
312
313 return answer;
314 }
315 }
316
317 @Override
318 public int getLargestPoolSize() {
319 return largestPoolSize;
320 }
321
322 @Override
323 public int getPoolSize() {
324 synchronized (workers) {
325 return workers.size();
326 }
327 }
328
329 @Override
330 public long getTaskCount() {
331 return getCompletedTaskCount();
332 }
333
334 @Override
335 public boolean isTerminating() {
336 synchronized (workers) {
337 return isShutdown() && !isTerminated();
338 }
339 }
340
341 @Override
342 public int prestartAllCoreThreads() {
343 int answer = 0;
344 synchronized (workers) {
345 for (int i = corePoolSize - workers.size() ; i > 0; i --) {
346 addWorker();
347 answer ++;
348 }
349 }
350 return answer;
351 }
352
353 @Override
354 public boolean prestartCoreThread() {
355 synchronized (workers) {
356 if (workers.size() < corePoolSize) {
357 addWorker();
358 return true;
359 } else {
360 return false;
361 }
362 }
363 }
364
365 @Override
366 public void purge() {
367
368 }
369
370 @Override
371 public boolean remove(Runnable task) {
372 boolean removed = super.remove(task);
373 if (removed) {
374 getQueueHandler().polled(this, (IoEvent) task);
375 }
376 return removed;
377 }
378
379 @Override
380 public int getCorePoolSize() {
381 return corePoolSize;
382 }
383
384 @Override
385 public void setCorePoolSize(int corePoolSize) {
386 if (corePoolSize < 0) {
387 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
388 }
389 if (corePoolSize > maximumPoolSize) {
390 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
391 }
392
393 synchronized (workers) {
394 if (this.corePoolSize > corePoolSize) {
395 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
396 removeWorker();
397 }
398 }
399 this.corePoolSize = corePoolSize;
400 }
401 }
402
403 private class Worker implements Runnable {
404
405 private volatile long completedTaskCount;
406 private Thread thread;
407
408 public void run() {
409 thread = Thread.currentThread();
410
411 try {
412 for (;;) {
413 Runnable task = fetchTask();
414
415 idleWorkers.decrementAndGet();
416
417 if (task == null) {
418 synchronized (workers) {
419 if (workers.size() > corePoolSize) {
420
421 workers.remove(this);
422 break;
423 }
424 }
425 }
426
427 if (task == EXIT_SIGNAL) {
428 break;
429 }
430
431 try {
432 if (task != null) {
433 queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
434 runTask(task);
435 }
436 } finally {
437 idleWorkers.incrementAndGet();
438 }
439 }
440 } finally {
441 synchronized (workers) {
442 workers.remove(this);
443 UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
444 workers.notifyAll();
445 }
446 }
447 }
448
449 private Runnable fetchTask() {
450 Runnable task = null;
451 long currentTime = System.currentTimeMillis();
452 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
453 for (;;) {
454 try {
455 long waitTime = deadline - currentTime;
456 if (waitTime <= 0) {
457 break;
458 }
459
460 try {
461 task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
462 break;
463 } finally {
464 if (task == null) {
465 currentTime = System.currentTimeMillis();
466 }
467 }
468 } catch (InterruptedException e) {
469
470 continue;
471 }
472 }
473 return task;
474 }
475
476 private void runTask(Runnable task) {
477 beforeExecute(thread, task);
478 boolean ran = false;
479 try {
480 task.run();
481 ran = true;
482 afterExecute(task, null);
483 completedTaskCount ++;
484 } catch (RuntimeException e) {
485 if (!ran) {
486 afterExecute(task, e);
487 }
488 throw e;
489 }
490 }
491 }
492 }