1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionHandler;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.mina.core.session.IoEvent;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor {
56
57 private static final Runnable EXIT_SIGNAL = new Runnable() {
58
59
60
61 @Override
62 public void run() {
63 throw new Error("This method shouldn't be called. " + "Please file a bug report.");
64 }
65 };
66
67 private final Set<Worker> workers = new HashSet<>();
68
69 private volatile int corePoolSize;
70
71 private volatile int maximumPoolSize;
72
73 private volatile int largestPoolSize;
74
75 private final AtomicInteger idleWorkers = new AtomicInteger();
76
77 private long completedTaskCount;
78
79 private volatile boolean shutdown;
80
81 private final IoEventQueueHandler queueHandler;
82
83
84
85
86 public UnorderedThreadPoolExecutor() {
87 this(16);
88 }
89
90
91
92
93
94
95 public UnorderedThreadPoolExecutor(int maximumPoolSize) {
96 this(0, maximumPoolSize);
97 }
98
99
100
101
102
103
104
105 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
106 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
107 }
108
109
110
111
112
113
114
115
116
117 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
118 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
119 }
120
121
122
123
124
125
126
127
128
129
130 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
131 IoEventQueueHandler queueHandler) {
132 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
133 }
134
135
136
137
138
139
140
141
142
143
144 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
145 ThreadFactory threadFactory) {
146 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
147 }
148
149
150
151
152
153
154
155
156
157
158
159 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
160 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
161 super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy());
162
163 if (corePoolSize < 0) {
164 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
165 }
166
167 if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
168 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
169 }
170
171 if (queueHandler == null) {
172 this.queueHandler = IoEventQueueHandler.NOOP;
173 } else {
174 this.queueHandler = queueHandler;
175 }
176
177 this.corePoolSize = corePoolSize;
178 this.maximumPoolSize = maximumPoolSize;
179 }
180
181
182
183
184 public IoEventQueueHandler getQueueHandler() {
185 return queueHandler;
186 }
187
188 @Override
189 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
190
191 }
192
193 private void addWorker() {
194 synchronized (workers) {
195 if (workers.size() >= maximumPoolSize) {
196 return;
197 }
198
199 Worker worker = new Worker();
200 Thread thread = getThreadFactory().newThread(worker);
201 idleWorkers.incrementAndGet();
202 thread.start();
203 workers.add(worker);
204
205 if (workers.size() > largestPoolSize) {
206 largestPoolSize = workers.size();
207 }
208 }
209 }
210
211 private void addWorkerIfNecessary() {
212 if (idleWorkers.get() == 0) {
213 synchronized (workers) {
214 if (workers.isEmpty() || idleWorkers.get() == 0) {
215 addWorker();
216 }
217 }
218 }
219 }
220
221 private void removeWorker() {
222 synchronized (workers) {
223 if (workers.size() <= corePoolSize) {
224 return;
225 }
226 getQueue().offer(EXIT_SIGNAL);
227 }
228 }
229
230 @Override
231 public int getMaximumPoolSize() {
232 return maximumPoolSize;
233 }
234
235 @Override
236 public void setMaximumPoolSize(int maximumPoolSize) {
237 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
238 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
239 }
240
241 synchronized (workers) {
242 this.maximumPoolSize = maximumPoolSize;
243 int difference = workers.size() - maximumPoolSize;
244 while (difference > 0) {
245 removeWorker();
246 --difference;
247 }
248 }
249 }
250
251 @Override
252 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
253
254 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
255
256 synchronized (workers) {
257 while (!isTerminated()) {
258 long waitTime = deadline - System.currentTimeMillis();
259 if (waitTime <= 0) {
260 break;
261 }
262
263 workers.wait(waitTime);
264 }
265 }
266 return isTerminated();
267 }
268
269 @Override
270 public boolean isShutdown() {
271 return shutdown;
272 }
273
274 @Override
275 public boolean isTerminated() {
276 if (!shutdown) {
277 return false;
278 }
279
280 synchronized (workers) {
281 return workers.isEmpty();
282 }
283 }
284
285 @Override
286 public void shutdown() {
287 if (shutdown) {
288 return;
289 }
290
291 shutdown = true;
292
293 synchronized (workers) {
294 for (int i = workers.size(); i > 0; i--) {
295 getQueue().offer(EXIT_SIGNAL);
296 }
297 }
298 }
299
300 @Override
301 public List<Runnable> shutdownNow() {
302 shutdown();
303
304 List<Runnable> answer = new ArrayList<>();
305 Runnable task;
306 while ((task = getQueue().poll()) != null) {
307 if (task == EXIT_SIGNAL) {
308 getQueue().offer(EXIT_SIGNAL);
309 Thread.yield();
310 continue;
311 }
312
313 getQueueHandler().polled(this, (IoEvent) task);
314 answer.add(task);
315 }
316
317 return answer;
318 }
319
320 @Override
321 public void execute(Runnable task) {
322 if (shutdown) {
323 rejectTask(task);
324 }
325
326 checkTaskType(task);
327
328 IoEventef="../../../../../org/apache/mina/core/session/IoEvent.html#IoEvent">IoEvent e = (IoEvent) task;
329 boolean offeredEvent = queueHandler.accept(this, e);
330
331 if (offeredEvent) {
332 getQueue().offer(e);
333 }
334
335 addWorkerIfNecessary();
336
337 if (offeredEvent) {
338 queueHandler.offered(this, e);
339 }
340 }
341
342 private void rejectTask(Runnable task) {
343 getRejectedExecutionHandler().rejectedExecution(task, this);
344 }
345
346 private void checkTaskType(Runnable task) {
347 if (!(task instanceof IoEvent)) {
348 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
349 }
350 }
351
352 @Override
353 public int getActiveCount() {
354 synchronized (workers) {
355 return workers.size() - idleWorkers.get();
356 }
357 }
358
359 @Override
360 public long getCompletedTaskCount() {
361 synchronized (workers) {
362 long answer = completedTaskCount;
363 for (Worker w : workers) {
364 answer += w.completedTaskCount.get();
365 }
366
367 return answer;
368 }
369 }
370
371 @Override
372 public int getLargestPoolSize() {
373 return largestPoolSize;
374 }
375
376 @Override
377 public int getPoolSize() {
378 synchronized (workers) {
379 return workers.size();
380 }
381 }
382
383 @Override
384 public long getTaskCount() {
385 return getCompletedTaskCount();
386 }
387
388 @Override
389 public boolean isTerminating() {
390 synchronized (workers) {
391 return isShutdown() && !isTerminated();
392 }
393 }
394
395 @Override
396 public int prestartAllCoreThreads() {
397 int answer = 0;
398 synchronized (workers) {
399 for (int i = corePoolSize - workers.size(); i > 0; i--) {
400 addWorker();
401 answer++;
402 }
403 }
404 return answer;
405 }
406
407 @Override
408 public boolean prestartCoreThread() {
409 synchronized (workers) {
410 if (workers.size() < corePoolSize) {
411 addWorker();
412 return true;
413 }
414
415 return false;
416 }
417 }
418
419 @Override
420 public void purge() {
421
422 }
423
424 @Override
425 public boolean remove(Runnable task) {
426 boolean removed = super.remove(task);
427 if (removed) {
428 getQueueHandler().polled(this, (IoEvent) task);
429 }
430 return removed;
431 }
432
433 @Override
434 public int getCorePoolSize() {
435 return corePoolSize;
436 }
437
438 @Override
439 public void setCorePoolSize(int corePoolSize) {
440 if (corePoolSize < 0) {
441 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
442 }
443 if (corePoolSize > maximumPoolSize) {
444 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
445 }
446
447 synchronized (workers) {
448 if (this.corePoolSize > corePoolSize) {
449 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) {
450 removeWorker();
451 }
452 }
453 this.corePoolSize = corePoolSize;
454 }
455 }
456
457 private class Worker implements Runnable {
458
459 private AtomicLong completedTaskCount = new AtomicLong(0);
460
461 private Thread thread;
462
463
464
465
466 @Override
467 public void run() {
468 thread = Thread.currentThread();
469
470 try {
471 for (;;) {
472 Runnable task = fetchTask();
473
474 idleWorkers.decrementAndGet();
475
476 if (task == null) {
477 synchronized (workers) {
478 if (workers.size() > corePoolSize) {
479
480 workers.remove(this);
481 break;
482 }
483 }
484 }
485
486 if (task == EXIT_SIGNAL) {
487 break;
488 }
489
490 try {
491 if (task != null) {
492 queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
493 runTask(task);
494 }
495 } finally {
496 idleWorkers.incrementAndGet();
497 }
498 }
499 } finally {
500 synchronized (workers) {
501 workers.remove(this);
502 UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
503 workers.notifyAll();
504 }
505 }
506 }
507
508 private Runnable fetchTask() {
509 Runnable task = null;
510 long currentTime = System.currentTimeMillis();
511 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
512
513 for (;;) {
514 try {
515 long waitTime = deadline - currentTime;
516
517 if (waitTime <= 0) {
518 break;
519 }
520
521 try {
522 task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
523 break;
524 } finally {
525 if (task == null) {
526 currentTime = System.currentTimeMillis();
527 }
528 }
529 } catch (InterruptedException e) {
530
531 continue;
532 }
533 }
534 return task;
535 }
536
537 private void runTask(Runnable task) {
538 beforeExecute(thread, task);
539 boolean ran = false;
540 try {
541 task.run();
542 ran = true;
543 afterExecute(task, null);
544 completedTaskCount.incrementAndGet();
545 } catch (RuntimeException e) {
546 if (!ran) {
547 afterExecute(task, e);
548 }
549 throw e;
550 }
551 }
552 }
553 }