001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.executor; 021 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Queue; 026import java.util.Set; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.Executors; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.RejectedExecutionHandler; 032import java.util.concurrent.SynchronousQueue; 033import java.util.concurrent.ThreadFactory; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038 039import org.apache.mina.core.session.AttributeKey; 040import org.apache.mina.core.session.DummySession; 041import org.apache.mina.core.session.IoEvent; 042import org.apache.mina.core.session.IoSession; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s. 048 * <p> 049 * If you don't need to maintain the order of events per session, please use 050 * {@link UnorderedThreadPoolExecutor}. 051 052 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 053 * @org.apache.xbean.XBean 054 */ 055public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { 056 /** A logger for this class (commented as it breaks MDCFlter tests) */ 057 private static final Logger LOGGER = LoggerFactory.getLogger(OrderedThreadPoolExecutor.class); 058 059 /** A default value for the initial pool size */ 060 private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0; 061 062 /** A default value for the maximum pool size */ 063 private static final int DEFAULT_MAX_THREAD_POOL = 16; 064 065 /** A default value for the KeepAlive delay */ 066 private static final int DEFAULT_KEEP_ALIVE = 30; 067 068 private static final IoSession EXIT_SIGNAL = new DummySession(); 069 070 /** A key stored into the session's attribute for the event tasks being queued */ 071 private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue"); 072 073 /** A queue used to store the available sessions */ 074 private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>(); 075 076 private final Set<Worker> workers = new HashSet<Worker>(); 077 078 private volatile int largestPoolSize; 079 080 private final AtomicInteger idleWorkers = new AtomicInteger(); 081 082 private long completedTaskCount; 083 084 private volatile boolean shutdown; 085 086 private final IoEventQueueHandler eventQueueHandler; 087 088 /** 089 * Creates a default ThreadPool, with default values : 090 * - minimum pool size is 0 091 * - maximum pool size is 16 092 * - keepAlive set to 30 seconds 093 * - A default ThreadFactory 094 * - All events are accepted 095 */ 096 public OrderedThreadPoolExecutor() { 097 this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors 098 .defaultThreadFactory(), null); 099 } 100 101 /** 102 * Creates a default ThreadPool, with default values : 103 * - minimum pool size is 0 104 * - keepAlive set to 30 seconds 105 * - A default ThreadFactory 106 * - All events are accepted 107 * 108 * @param maximumPoolSize The maximum pool size 109 */ 110 public OrderedThreadPoolExecutor(int maximumPoolSize) { 111 this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors 112 .defaultThreadFactory(), null); 113 } 114 115 /** 116 * Creates a default ThreadPool, with default values : 117 * - keepAlive set to 30 seconds 118 * - A default ThreadFactory 119 * - All events are accepted 120 * 121 * @param corePoolSize The initial pool sizePoolSize 122 * @param maximumPoolSize The maximum pool size 123 */ 124 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { 125 this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), 126 null); 127 } 128 129 /** 130 * Creates a default ThreadPool, with default values : 131 * - A default ThreadFactory 132 * - All events are accepted 133 * 134 * @param corePoolSize The initial pool sizePoolSize 135 * @param maximumPoolSize The maximum pool size 136 * @param keepAliveTime Default duration for a thread 137 * @param unit Time unit used for the keepAlive value 138 */ 139 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { 140 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null); 141 } 142 143 /** 144 * Creates a default ThreadPool, with default values : 145 * - A default ThreadFactory 146 * 147 * @param corePoolSize The initial pool sizePoolSize 148 * @param maximumPoolSize The maximum pool size 149 * @param keepAliveTime Default duration for a thread 150 * @param unit Time unit used for the keepAlive value 151 * @param eventQueueHandler The queue used to store events 152 */ 153 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 154 IoEventQueueHandler eventQueueHandler) { 155 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler); 156 } 157 158 /** 159 * Creates a default ThreadPool, with default values : 160 * - A default ThreadFactory 161 * 162 * @param corePoolSize The initial pool sizePoolSize 163 * @param maximumPoolSize The maximum pool size 164 * @param keepAliveTime Default duration for a thread 165 * @param unit Time unit used for the keepAlive value 166 * @param threadFactory The factory used to create threads 167 */ 168 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 169 ThreadFactory threadFactory) { 170 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null); 171 } 172 173 /** 174 * Creates a new instance of a OrderedThreadPoolExecutor. 175 * 176 * @param corePoolSize The initial pool sizePoolSize 177 * @param maximumPoolSize The maximum pool size 178 * @param keepAliveTime Default duration for a thread 179 * @param unit Time unit used for the keepAlive value 180 * @param threadFactory The factory used to create threads 181 * @param eventQueueHandler The queue used to store events 182 */ 183 public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 184 ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) { 185 // We have to initialize the pool with default values (0 and 1) in order to 186 // handle the exception in a better way. We can't add a try {} catch() {} 187 // around the super() call. 188 super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), 189 threadFactory, new AbortPolicy()); 190 191 if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { 192 throw new IllegalArgumentException("corePoolSize: " + corePoolSize); 193 } 194 195 if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) { 196 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); 197 } 198 199 // Now, we can setup the pool sizes 200 super.setCorePoolSize(corePoolSize); 201 super.setMaximumPoolSize(maximumPoolSize); 202 203 // The queueHandler might be null. 204 if (eventQueueHandler == null) { 205 this.eventQueueHandler = IoEventQueueHandler.NOOP; 206 } else { 207 this.eventQueueHandler = eventQueueHandler; 208 } 209 } 210 211 /** 212 * Get the session's tasks queue. 213 */ 214 private SessionTasksQueue getSessionTasksQueue(IoSession session) { 215 SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE); 216 217 if (queue == null) { 218 queue = new SessionTasksQueue(); 219 SessionTasksQueue oldQueue = (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); 220 221 if (oldQueue != null) { 222 queue = oldQueue; 223 } 224 } 225 226 return queue; 227 } 228 229 /** 230 * @return The associated queue handler. 231 */ 232 public IoEventQueueHandler getQueueHandler() { 233 return eventQueueHandler; 234 } 235 236 /** 237 * {@inheritDoc} 238 */ 239 @Override 240 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 241 // Ignore the request. It must always be AbortPolicy. 242 } 243 244 /** 245 * Add a new thread to execute a task, if needed and possible. 246 * It depends on the current pool size. If it's full, we do nothing. 247 */ 248 private void addWorker() { 249 synchronized (workers) { 250 if (workers.size() >= super.getMaximumPoolSize()) { 251 return; 252 } 253 254 // Create a new worker, and add it to the thread pool 255 Worker worker = new Worker(); 256 Thread thread = getThreadFactory().newThread(worker); 257 258 // As we have added a new thread, it's considered as idle. 259 idleWorkers.incrementAndGet(); 260 261 // Now, we can start it. 262 thread.start(); 263 workers.add(worker); 264 265 if (workers.size() > largestPoolSize) { 266 largestPoolSize = workers.size(); 267 } 268 } 269 } 270 271 /** 272 * Add a new Worker only if there are no idle worker. 273 */ 274 private void addWorkerIfNecessary() { 275 if (idleWorkers.get() == 0) { 276 synchronized (workers) { 277 if (workers.isEmpty() || (idleWorkers.get() == 0)) { 278 addWorker(); 279 } 280 } 281 } 282 } 283 284 private void removeWorker() { 285 synchronized (workers) { 286 if (workers.size() <= super.getCorePoolSize()) { 287 return; 288 } 289 waitingSessions.offer(EXIT_SIGNAL); 290 } 291 } 292 293 /** 294 * {@inheritDoc} 295 */ 296 @Override 297 public int getMaximumPoolSize() { 298 return super.getMaximumPoolSize(); 299 } 300 301 /** 302 * {@inheritDoc} 303 */ 304 @Override 305 public void setMaximumPoolSize(int maximumPoolSize) { 306 if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { 307 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); 308 } 309 310 synchronized (workers) { 311 super.setMaximumPoolSize(maximumPoolSize); 312 int difference = workers.size() - maximumPoolSize; 313 while (difference > 0) { 314 removeWorker(); 315 --difference; 316 } 317 } 318 } 319 320 /** 321 * {@inheritDoc} 322 */ 323 @Override 324 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 325 326 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 327 328 synchronized (workers) { 329 while (!isTerminated()) { 330 long waitTime = deadline - System.currentTimeMillis(); 331 if (waitTime <= 0) { 332 break; 333 } 334 335 workers.wait(waitTime); 336 } 337 } 338 return isTerminated(); 339 } 340 341 /** 342 * {@inheritDoc} 343 */ 344 @Override 345 public boolean isShutdown() { 346 return shutdown; 347 } 348 349 /** 350 * {@inheritDoc} 351 */ 352 @Override 353 public boolean isTerminated() { 354 if (!shutdown) { 355 return false; 356 } 357 358 synchronized (workers) { 359 return workers.isEmpty(); 360 } 361 } 362 363 /** 364 * {@inheritDoc} 365 */ 366 @Override 367 public void shutdown() { 368 if (shutdown) { 369 return; 370 } 371 372 shutdown = true; 373 374 synchronized (workers) { 375 for (int i = workers.size(); i > 0; i--) { 376 waitingSessions.offer(EXIT_SIGNAL); 377 } 378 } 379 } 380 381 /** 382 * {@inheritDoc} 383 */ 384 @Override 385 public List<Runnable> shutdownNow() { 386 shutdown(); 387 388 List<Runnable> answer = new ArrayList<Runnable>(); 389 IoSession session; 390 391 while ((session = waitingSessions.poll()) != null) { 392 if (session == EXIT_SIGNAL) { 393 waitingSessions.offer(EXIT_SIGNAL); 394 Thread.yield(); // Let others take the signal. 395 continue; 396 } 397 398 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE); 399 400 synchronized (sessionTasksQueue.tasksQueue) { 401 402 for (Runnable task : sessionTasksQueue.tasksQueue) { 403 getQueueHandler().polled(this, (IoEvent) task); 404 answer.add(task); 405 } 406 407 sessionTasksQueue.tasksQueue.clear(); 408 } 409 } 410 411 return answer; 412 } 413 414 /** 415 * A Helper class used to print the list of events being queued. 416 */ 417 private void print(Queue<Runnable> queue, IoEvent event) { 418 StringBuilder sb = new StringBuilder(); 419 sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId()); 420 boolean first = true; 421 sb.append("\nQueue : ["); 422 for (Runnable elem : queue) { 423 if (first) { 424 first = false; 425 } else { 426 sb.append(", "); 427 } 428 429 sb.append(((IoEvent) elem).getType()).append(", "); 430 } 431 sb.append("]\n"); 432 LOGGER.debug(sb.toString()); 433 } 434 435 /** 436 * {@inheritDoc} 437 */ 438 @Override 439 public void execute(Runnable task) { 440 if (shutdown) { 441 rejectTask(task); 442 } 443 444 // Check that it's a IoEvent task 445 checkTaskType(task); 446 447 IoEvent event = (IoEvent) task; 448 449 // Get the associated session 450 IoSession session = event.getSession(); 451 452 // Get the session's queue of events 453 SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session); 454 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; 455 456 boolean offerSession; 457 458 // propose the new event to the event queue handler. If we 459 // use a throttle queue handler, the message may be rejected 460 // if the maximum size has been reached. 461 boolean offerEvent = eventQueueHandler.accept(this, event); 462 463 if (offerEvent) { 464 // Ok, the message has been accepted 465 synchronized (tasksQueue) { 466 // Inject the event into the executor taskQueue 467 tasksQueue.offer(event); 468 469 if (sessionTasksQueue.processingCompleted) { 470 sessionTasksQueue.processingCompleted = false; 471 offerSession = true; 472 } else { 473 offerSession = false; 474 } 475 476 if (LOGGER.isDebugEnabled()) { 477 print(tasksQueue, event); 478 } 479 } 480 } else { 481 offerSession = false; 482 } 483 484 if (offerSession) { 485 // As the tasksQueue was empty, the task has been executed 486 // immediately, so we can move the session to the queue 487 // of sessions waiting for completion. 488 waitingSessions.offer(session); 489 } 490 491 addWorkerIfNecessary(); 492 493 if (offerEvent) { 494 eventQueueHandler.offered(this, event); 495 } 496 } 497 498 private void rejectTask(Runnable task) { 499 getRejectedExecutionHandler().rejectedExecution(task, this); 500 } 501 502 private void checkTaskType(Runnable task) { 503 if (!(task instanceof IoEvent)) { 504 throw new IllegalArgumentException("task must be an IoEvent or its subclass."); 505 } 506 } 507 508 /** 509 * {@inheritDoc} 510 */ 511 @Override 512 public int getActiveCount() { 513 synchronized (workers) { 514 return workers.size() - idleWorkers.get(); 515 } 516 } 517 518 /** 519 * {@inheritDoc} 520 */ 521 @Override 522 public long getCompletedTaskCount() { 523 synchronized (workers) { 524 long answer = completedTaskCount; 525 for (Worker w : workers) { 526 answer += w.completedTaskCount.get(); 527 } 528 529 return answer; 530 } 531 } 532 533 /** 534 * {@inheritDoc} 535 */ 536 @Override 537 public int getLargestPoolSize() { 538 return largestPoolSize; 539 } 540 541 /** 542 * {@inheritDoc} 543 */ 544 @Override 545 public int getPoolSize() { 546 synchronized (workers) { 547 return workers.size(); 548 } 549 } 550 551 /** 552 * {@inheritDoc} 553 */ 554 @Override 555 public long getTaskCount() { 556 return getCompletedTaskCount(); 557 } 558 559 /** 560 * {@inheritDoc} 561 */ 562 @Override 563 public boolean isTerminating() { 564 synchronized (workers) { 565 return isShutdown() && !isTerminated(); 566 } 567 } 568 569 /** 570 * {@inheritDoc} 571 */ 572 @Override 573 public int prestartAllCoreThreads() { 574 int answer = 0; 575 synchronized (workers) { 576 for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) { 577 addWorker(); 578 answer++; 579 } 580 } 581 return answer; 582 } 583 584 /** 585 * {@inheritDoc} 586 */ 587 @Override 588 public boolean prestartCoreThread() { 589 synchronized (workers) { 590 if (workers.size() < super.getCorePoolSize()) { 591 addWorker(); 592 return true; 593 } else { 594 return false; 595 } 596 } 597 } 598 599 /** 600 * {@inheritDoc} 601 */ 602 @Override 603 public BlockingQueue<Runnable> getQueue() { 604 throw new UnsupportedOperationException(); 605 } 606 607 /** 608 * {@inheritDoc} 609 */ 610 @Override 611 public void purge() { 612 // Nothing to purge in this implementation. 613 } 614 615 /** 616 * {@inheritDoc} 617 */ 618 @Override 619 public boolean remove(Runnable task) { 620 checkTaskType(task); 621 IoEvent event = (IoEvent) task; 622 IoSession session = event.getSession(); 623 SessionTasksQueue sessionTasksQueue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE); 624 625 if (sessionTasksQueue == null) { 626 return false; 627 } 628 629 boolean removed; 630 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; 631 632 synchronized (tasksQueue) { 633 removed = tasksQueue.remove(task); 634 } 635 636 if (removed) { 637 getQueueHandler().polled(this, event); 638 } 639 640 return removed; 641 } 642 643 /** 644 * {@inheritDoc} 645 */ 646 @Override 647 public int getCorePoolSize() { 648 return super.getCorePoolSize(); 649 } 650 651 /** 652 * {@inheritDoc} 653 */ 654 @Override 655 public void setCorePoolSize(int corePoolSize) { 656 if (corePoolSize < 0) { 657 throw new IllegalArgumentException("corePoolSize: " + corePoolSize); 658 } 659 if (corePoolSize > super.getMaximumPoolSize()) { 660 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); 661 } 662 663 synchronized (workers) { 664 if (super.getCorePoolSize() > corePoolSize) { 665 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) { 666 removeWorker(); 667 } 668 } 669 super.setCorePoolSize(corePoolSize); 670 } 671 } 672 673 private class Worker implements Runnable { 674 675 private AtomicLong completedTaskCount = new AtomicLong(0); 676 677 private Thread thread; 678 679 public void run() { 680 thread = Thread.currentThread(); 681 682 try { 683 for (;;) { 684 IoSession session = fetchSession(); 685 686 idleWorkers.decrementAndGet(); 687 688 if (session == null) { 689 synchronized (workers) { 690 if (workers.size() > getCorePoolSize()) { 691 // Remove now to prevent duplicate exit. 692 workers.remove(this); 693 break; 694 } 695 } 696 } 697 698 if (session == EXIT_SIGNAL) { 699 break; 700 } 701 702 try { 703 if (session != null) { 704 runTasks(getSessionTasksQueue(session)); 705 } 706 } finally { 707 idleWorkers.incrementAndGet(); 708 } 709 } 710 } finally { 711 synchronized (workers) { 712 workers.remove(this); 713 OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); 714 workers.notifyAll(); 715 } 716 } 717 } 718 719 private IoSession fetchSession() { 720 IoSession session = null; 721 long currentTime = System.currentTimeMillis(); 722 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); 723 for (;;) { 724 try { 725 long waitTime = deadline - currentTime; 726 if (waitTime <= 0) { 727 break; 728 } 729 730 try { 731 session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); 732 break; 733 } finally { 734 if (session == null) { 735 currentTime = System.currentTimeMillis(); 736 } 737 } 738 } catch (InterruptedException e) { 739 // Ignore. 740 continue; 741 } 742 } 743 return session; 744 } 745 746 private void runTasks(SessionTasksQueue sessionTasksQueue) { 747 for (;;) { 748 Runnable task; 749 Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; 750 751 synchronized (tasksQueue) { 752 task = tasksQueue.poll(); 753 754 if (task == null) { 755 sessionTasksQueue.processingCompleted = true; 756 break; 757 } 758 } 759 760 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task); 761 762 runTask(task); 763 } 764 } 765 766 private void runTask(Runnable task) { 767 beforeExecute(thread, task); 768 boolean ran = false; 769 try { 770 task.run(); 771 ran = true; 772 afterExecute(task, null); 773 completedTaskCount.incrementAndGet(); 774 } catch (RuntimeException e) { 775 if (!ran) { 776 afterExecute(task, e); 777 } 778 throw e; 779 } 780 } 781 } 782 783 /** 784 * A class used to store the ordered list of events to be processed by the 785 * session, and the current task state. 786 */ 787 private class SessionTasksQueue { 788 /** A queue of ordered event waiting to be processed */ 789 private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>(); 790 791 /** The current task state */ 792 private boolean processingCompleted = true; 793 } 794}