1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionHandler;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.mina.common.IoEvent;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
55
56 private static final Runnable EXIT_SIGNAL = new Runnable() {
57 public void run() {}
58 };
59 private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
60 public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
61 return true;
62 }
63 public void offered(ThreadPoolExecutor executor, IoEvent event) {}
64 public void polled(ThreadPoolExecutor executor, IoEvent event) {}
65 };
66
67 private final Set<Worker> workers = new HashSet<Worker>();
68
69 private volatile int corePoolSize;
70 private volatile int maximumPoolSize;
71 private volatile int largestPoolSize;
72 private final AtomicInteger idleWorkers = new AtomicInteger();
73
74 private long completedTaskCount;
75 private volatile boolean shutdown;
76
77 private final IoEventQueueHandler queueHandler;
78
79 public UnorderedThreadPoolExecutor() {
80 this(16);
81 }
82
83 public UnorderedThreadPoolExecutor(int maximumPoolSize) {
84 this(0, maximumPoolSize);
85 }
86
87 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
88 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
89 }
90
91 public UnorderedThreadPoolExecutor(
92 int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
93 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
94 }
95
96 public UnorderedThreadPoolExecutor(
97 int corePoolSize, int maximumPoolSize,
98 long keepAliveTime, TimeUnit unit,
99 IoEventQueueHandler queueHandler) {
100 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
101 }
102
103 public UnorderedThreadPoolExecutor(
104 int corePoolSize, int maximumPoolSize,
105 long keepAliveTime, TimeUnit unit,
106 ThreadFactory threadFactory) {
107 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
108 }
109
110 public UnorderedThreadPoolExecutor(
111 int corePoolSize, int maximumPoolSize,
112 long keepAliveTime, TimeUnit unit,
113 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
114 super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
115 if (corePoolSize < 0) {
116 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
117 }
118
119 if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
120 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
121 }
122
123 if (queueHandler == null) {
124 queueHandler = NOOP_QUEUE_HANDLER;
125 }
126
127 this.corePoolSize = corePoolSize;
128 this.maximumPoolSize = maximumPoolSize;
129 this.queueHandler = queueHandler;
130 }
131
132 public IoEventQueueHandler getQueueHandler() {
133 return queueHandler;
134 }
135
136 @Override
137 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
138
139 }
140
141 private void addWorker() {
142 synchronized (workers) {
143 if (workers.size() >= maximumPoolSize) {
144 return;
145 }
146
147 Worker worker = new Worker();
148 Thread thread = getThreadFactory().newThread(worker);
149 idleWorkers.incrementAndGet();
150 thread.start();
151 workers.add(worker);
152
153 if (workers.size() > largestPoolSize) {
154 largestPoolSize = workers.size();
155 }
156 }
157 }
158
159 private void addWorkerIfNecessary() {
160 if (idleWorkers.get() == 0) {
161 synchronized (workers) {
162 if (workers.isEmpty() || idleWorkers.get() == 0) {
163 addWorker();
164 }
165 }
166 }
167 }
168
169 private void removeWorker() {
170 synchronized (workers) {
171 if (workers.size() <= corePoolSize) {
172 return;
173 }
174 getQueue().offer(EXIT_SIGNAL);
175 }
176 }
177
178 @Override
179 public int getMaximumPoolSize() {
180 return maximumPoolSize;
181 }
182
183 @Override
184 public void setMaximumPoolSize(int maximumPoolSize) {
185 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
186 throw new IllegalArgumentException("maximumPoolSize: "
187 + maximumPoolSize);
188 }
189
190 synchronized (workers) {
191 this.maximumPoolSize = maximumPoolSize;
192 int difference = workers.size() - maximumPoolSize;
193 while (difference > 0) {
194 removeWorker();
195 --difference;
196 }
197 }
198 }
199
200 @Override
201 public boolean awaitTermination(long timeout, TimeUnit unit)
202 throws InterruptedException {
203
204 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
205
206 synchronized (workers) {
207 while (!isTerminated()) {
208 long waitTime = deadline - System.currentTimeMillis();
209 if (waitTime <= 0) {
210 break;
211 }
212
213 workers.wait(waitTime);
214 }
215 }
216 return isTerminated();
217 }
218
219 @Override
220 public boolean isShutdown() {
221 return shutdown;
222 }
223
224 @Override
225 public boolean isTerminated() {
226 if (!shutdown) {
227 return false;
228 }
229
230 synchronized (workers) {
231 return workers.isEmpty();
232 }
233 }
234
235 @Override
236 public void shutdown() {
237 if (shutdown) {
238 return;
239 }
240
241 shutdown = true;
242
243 synchronized (workers) {
244 for (int i = workers.size(); i > 0; i --) {
245 getQueue().offer(EXIT_SIGNAL);
246 }
247 }
248 }
249
250 @Override
251 public List<Runnable> shutdownNow() {
252 shutdown();
253
254 List<Runnable> answer = new ArrayList<Runnable>();
255 Runnable task;
256 while ((task = getQueue().poll()) != null) {
257 if (task == EXIT_SIGNAL) {
258 getQueue().offer(EXIT_SIGNAL);
259 Thread.yield();
260 continue;
261 }
262
263 getQueueHandler().polled(this, (IoEvent) task);
264 answer.add(task);
265 }
266
267 return answer;
268 }
269
270 @Override
271 public void execute(Runnable task) {
272 if (shutdown) {
273 rejectTask(task);
274 }
275
276 checkTaskType(task);
277
278 IoEvent e = (IoEvent) task;
279 boolean offeredEvent = queueHandler.accept(this, e);
280 if (offeredEvent) {
281 getQueue().offer(e);
282 }
283
284 addWorkerIfNecessary();
285
286 if (offeredEvent) {
287 queueHandler.offered(this, e);
288 }
289 }
290
291 private void rejectTask(Runnable task) {
292 getRejectedExecutionHandler().rejectedExecution(task, this);
293 }
294
295 private void checkTaskType(Runnable task) {
296 if (!(task instanceof IoEvent)) {
297 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
298 }
299 }
300
301 @Override
302 public int getActiveCount() {
303 synchronized (workers) {
304 return workers.size() - idleWorkers.get();
305 }
306 }
307
308 @Override
309 public long getCompletedTaskCount() {
310 synchronized (workers) {
311 long answer = completedTaskCount;
312 for (Worker w: workers) {
313 answer += w.completedTaskCount;
314 }
315
316 return answer;
317 }
318 }
319
320 @Override
321 public int getLargestPoolSize() {
322 return largestPoolSize;
323 }
324
325 @Override
326 public int getPoolSize() {
327 synchronized (workers) {
328 return workers.size();
329 }
330 }
331
332 @Override
333 public long getTaskCount() {
334 return getCompletedTaskCount();
335 }
336
337 @Override
338 public boolean isTerminating() {
339 synchronized (workers) {
340 return isShutdown() && !isTerminated();
341 }
342 }
343
344 @Override
345 public int prestartAllCoreThreads() {
346 int answer = 0;
347 synchronized (workers) {
348 for (int i = corePoolSize - workers.size() ; i > 0; i --) {
349 addWorker();
350 answer ++;
351 }
352 }
353 return answer;
354 }
355
356 @Override
357 public boolean prestartCoreThread() {
358 synchronized (workers) {
359 if (workers.size() < corePoolSize) {
360 addWorker();
361 return true;
362 } else {
363 return false;
364 }
365 }
366 }
367
368 @Override
369 public void purge() {
370 }
371
372 @Override
373 public boolean remove(Runnable task) {
374 boolean removed = super.remove(task);
375 if (removed) {
376 getQueueHandler().polled(this, (IoEvent) task);
377 }
378 return removed;
379 }
380
381 @Override
382 public int getCorePoolSize() {
383 return corePoolSize;
384 }
385
386 @Override
387 public void setCorePoolSize(int corePoolSize) {
388 if (corePoolSize < 0) {
389 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
390 }
391 if (corePoolSize > maximumPoolSize) {
392 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
393 }
394
395 synchronized (workers) {
396 if (this.corePoolSize > corePoolSize) {
397 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
398 removeWorker();
399 }
400 }
401 this.corePoolSize = corePoolSize;
402 }
403 }
404
405 private class Worker implements Runnable {
406
407 private volatile long completedTaskCount;
408 private Thread thread;
409
410 public void run() {
411 thread = Thread.currentThread();
412
413 try {
414 for (;;) {
415 Runnable task = fetchTask();
416
417 idleWorkers.decrementAndGet();
418
419 if (task == null) {
420 synchronized (workers) {
421 if (workers.size() > corePoolSize) {
422
423 workers.remove(this);
424 break;
425 }
426 }
427 }
428
429 if (task == EXIT_SIGNAL) {
430 break;
431 }
432
433 queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
434 try {
435 runTask(task);
436 } finally {
437 idleWorkers.incrementAndGet();
438 }
439 }
440 } finally {
441 synchronized (workers) {
442 workers.remove(this);
443 UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
444 workers.notifyAll();
445 }
446 }
447 }
448
449 private Runnable fetchTask() {
450 Runnable task = null;
451 long currentTime = System.currentTimeMillis();
452 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
453 for (;;) {
454 try {
455 long waitTime = deadline - currentTime;
456 if (waitTime <= 0) {
457 break;
458 }
459
460 try {
461 task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
462 break;
463 } finally {
464 if (task == null) {
465 currentTime = System.currentTimeMillis();
466 }
467 }
468 } catch (InterruptedException e) {
469
470 continue;
471 }
472 }
473 return task;
474 }
475
476 private void runTask(Runnable task) {
477 beforeExecute(thread, task);
478 boolean ran = false;
479 try {
480 task.run();
481 ran = true;
482 afterExecute(task, null);
483 completedTaskCount ++;
484 } catch (RuntimeException e) {
485 if (!ran)
486 afterExecute(task, e);
487 throw e;
488 }
489 }
490 }
491 }