001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.executor; 021 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import java.util.concurrent.Executors; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.RejectedExecutionHandler; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.mina.core.session.IoEvent; 036 037/** 038 * A {@link ThreadPoolExecutor} that does not maintain the order of {@link IoEvent}s. 039 * This means more than one event handler methods can be invoked at the same 040 * time with mixed order. For example, let's assume that messageReceived, messageSent, 041 * and sessionClosed events are fired. 042 * <ul> 043 * <li>All event handler methods can be called simultaneously. 044 * (e.g. messageReceived and messageSent can be invoked at the same time.)</li> 045 * <li>The event order can be mixed up. 046 * (e.g. sessionClosed or messageSent can be invoked before messageReceived 047 * is invoked.)</li> 048 * </ul> 049 * If you need to maintain the order of events per session, please use 050 * {@link OrderedThreadPoolExecutor}. 051 * 052 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 053 * @org.apache.xbean.XBean 054 */ 055public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { 056 057 private static final Runnable EXIT_SIGNAL = new Runnable() { 058 public void run() { 059 throw new Error("This method shouldn't be called. " + "Please file a bug report."); 060 } 061 }; 062 063 private final Set<Worker> workers = new HashSet<Worker>(); 064 065 private volatile int corePoolSize; 066 067 private volatile int maximumPoolSize; 068 069 private volatile int largestPoolSize; 070 071 private final AtomicInteger idleWorkers = new AtomicInteger(); 072 073 private long completedTaskCount; 074 075 private volatile boolean shutdown; 076 077 private final IoEventQueueHandler queueHandler; 078 079 public UnorderedThreadPoolExecutor() { 080 this(16); 081 } 082 083 public UnorderedThreadPoolExecutor(int maximumPoolSize) { 084 this(0, maximumPoolSize); 085 } 086 087 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { 088 this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS); 089 } 090 091 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { 092 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory()); 093 } 094 095 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 096 IoEventQueueHandler queueHandler) { 097 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler); 098 } 099 100 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 101 ThreadFactory threadFactory) { 102 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null); 103 } 104 105 public UnorderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 106 ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { 107 super(0, 1, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, new AbortPolicy()); 108 if (corePoolSize < 0) { 109 throw new IllegalArgumentException("corePoolSize: " + corePoolSize); 110 } 111 112 if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) { 113 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); 114 } 115 116 if (queueHandler == null) { 117 queueHandler = IoEventQueueHandler.NOOP; 118 } 119 120 this.corePoolSize = corePoolSize; 121 this.maximumPoolSize = maximumPoolSize; 122 this.queueHandler = queueHandler; 123 } 124 125 public IoEventQueueHandler getQueueHandler() { 126 return queueHandler; 127 } 128 129 @Override 130 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 131 // Ignore the request. It must always be AbortPolicy. 132 } 133 134 private void addWorker() { 135 synchronized (workers) { 136 if (workers.size() >= maximumPoolSize) { 137 return; 138 } 139 140 Worker worker = new Worker(); 141 Thread thread = getThreadFactory().newThread(worker); 142 idleWorkers.incrementAndGet(); 143 thread.start(); 144 workers.add(worker); 145 146 if (workers.size() > largestPoolSize) { 147 largestPoolSize = workers.size(); 148 } 149 } 150 } 151 152 private void addWorkerIfNecessary() { 153 if (idleWorkers.get() == 0) { 154 synchronized (workers) { 155 if (workers.isEmpty() || idleWorkers.get() == 0) { 156 addWorker(); 157 } 158 } 159 } 160 } 161 162 private void removeWorker() { 163 synchronized (workers) { 164 if (workers.size() <= corePoolSize) { 165 return; 166 } 167 getQueue().offer(EXIT_SIGNAL); 168 } 169 } 170 171 @Override 172 public int getMaximumPoolSize() { 173 return maximumPoolSize; 174 } 175 176 @Override 177 public void setMaximumPoolSize(int maximumPoolSize) { 178 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) { 179 throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); 180 } 181 182 synchronized (workers) { 183 this.maximumPoolSize = maximumPoolSize; 184 int difference = workers.size() - maximumPoolSize; 185 while (difference > 0) { 186 removeWorker(); 187 --difference; 188 } 189 } 190 } 191 192 @Override 193 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 194 195 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 196 197 synchronized (workers) { 198 while (!isTerminated()) { 199 long waitTime = deadline - System.currentTimeMillis(); 200 if (waitTime <= 0) { 201 break; 202 } 203 204 workers.wait(waitTime); 205 } 206 } 207 return isTerminated(); 208 } 209 210 @Override 211 public boolean isShutdown() { 212 return shutdown; 213 } 214 215 @Override 216 public boolean isTerminated() { 217 if (!shutdown) { 218 return false; 219 } 220 221 synchronized (workers) { 222 return workers.isEmpty(); 223 } 224 } 225 226 @Override 227 public void shutdown() { 228 if (shutdown) { 229 return; 230 } 231 232 shutdown = true; 233 234 synchronized (workers) { 235 for (int i = workers.size(); i > 0; i--) { 236 getQueue().offer(EXIT_SIGNAL); 237 } 238 } 239 } 240 241 @Override 242 public List<Runnable> shutdownNow() { 243 shutdown(); 244 245 List<Runnable> answer = new ArrayList<Runnable>(); 246 Runnable task; 247 while ((task = getQueue().poll()) != null) { 248 if (task == EXIT_SIGNAL) { 249 getQueue().offer(EXIT_SIGNAL); 250 Thread.yield(); // Let others take the signal. 251 continue; 252 } 253 254 getQueueHandler().polled(this, (IoEvent) task); 255 answer.add(task); 256 } 257 258 return answer; 259 } 260 261 @Override 262 public void execute(Runnable task) { 263 if (shutdown) { 264 rejectTask(task); 265 } 266 267 checkTaskType(task); 268 269 IoEvent e = (IoEvent) task; 270 boolean offeredEvent = queueHandler.accept(this, e); 271 272 if (offeredEvent) { 273 getQueue().offer(e); 274 } 275 276 addWorkerIfNecessary(); 277 278 if (offeredEvent) { 279 queueHandler.offered(this, e); 280 } 281 } 282 283 private void rejectTask(Runnable task) { 284 getRejectedExecutionHandler().rejectedExecution(task, this); 285 } 286 287 private void checkTaskType(Runnable task) { 288 if (!(task instanceof IoEvent)) { 289 throw new IllegalArgumentException("task must be an IoEvent or its subclass."); 290 } 291 } 292 293 @Override 294 public int getActiveCount() { 295 synchronized (workers) { 296 return workers.size() - idleWorkers.get(); 297 } 298 } 299 300 @Override 301 public long getCompletedTaskCount() { 302 synchronized (workers) { 303 long answer = completedTaskCount; 304 for (Worker w : workers) { 305 answer += w.completedTaskCount.get(); 306 } 307 308 return answer; 309 } 310 } 311 312 @Override 313 public int getLargestPoolSize() { 314 return largestPoolSize; 315 } 316 317 @Override 318 public int getPoolSize() { 319 synchronized (workers) { 320 return workers.size(); 321 } 322 } 323 324 @Override 325 public long getTaskCount() { 326 return getCompletedTaskCount(); 327 } 328 329 @Override 330 public boolean isTerminating() { 331 synchronized (workers) { 332 return isShutdown() && !isTerminated(); 333 } 334 } 335 336 @Override 337 public int prestartAllCoreThreads() { 338 int answer = 0; 339 synchronized (workers) { 340 for (int i = corePoolSize - workers.size(); i > 0; i--) { 341 addWorker(); 342 answer++; 343 } 344 } 345 return answer; 346 } 347 348 @Override 349 public boolean prestartCoreThread() { 350 synchronized (workers) { 351 if (workers.size() < corePoolSize) { 352 addWorker(); 353 return true; 354 } 355 356 return false; 357 } 358 } 359 360 @Override 361 public void purge() { 362 // Nothing to purge in this implementation. 363 } 364 365 @Override 366 public boolean remove(Runnable task) { 367 boolean removed = super.remove(task); 368 if (removed) { 369 getQueueHandler().polled(this, (IoEvent) task); 370 } 371 return removed; 372 } 373 374 @Override 375 public int getCorePoolSize() { 376 return corePoolSize; 377 } 378 379 @Override 380 public void setCorePoolSize(int corePoolSize) { 381 if (corePoolSize < 0) { 382 throw new IllegalArgumentException("corePoolSize: " + corePoolSize); 383 } 384 if (corePoolSize > maximumPoolSize) { 385 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); 386 } 387 388 synchronized (workers) { 389 if (this.corePoolSize > corePoolSize) { 390 for (int i = this.corePoolSize - corePoolSize; i > 0; i--) { 391 removeWorker(); 392 } 393 } 394 this.corePoolSize = corePoolSize; 395 } 396 } 397 398 private class Worker implements Runnable { 399 400 private AtomicLong completedTaskCount = new AtomicLong(0); 401 402 private Thread thread; 403 404 public void run() { 405 thread = Thread.currentThread(); 406 407 try { 408 for (;;) { 409 Runnable task = fetchTask(); 410 411 idleWorkers.decrementAndGet(); 412 413 if (task == null) { 414 synchronized (workers) { 415 if (workers.size() > corePoolSize) { 416 // Remove now to prevent duplicate exit. 417 workers.remove(this); 418 break; 419 } 420 } 421 } 422 423 if (task == EXIT_SIGNAL) { 424 break; 425 } 426 427 try { 428 if (task != null) { 429 queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); 430 runTask(task); 431 } 432 } finally { 433 idleWorkers.incrementAndGet(); 434 } 435 } 436 } finally { 437 synchronized (workers) { 438 workers.remove(this); 439 UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); 440 workers.notifyAll(); 441 } 442 } 443 } 444 445 private Runnable fetchTask() { 446 Runnable task = null; 447 long currentTime = System.currentTimeMillis(); 448 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); 449 for (;;) { 450 try { 451 long waitTime = deadline - currentTime; 452 if (waitTime <= 0) { 453 break; 454 } 455 456 try { 457 task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS); 458 break; 459 } finally { 460 if (task == null) { 461 currentTime = System.currentTimeMillis(); 462 } 463 } 464 } catch (InterruptedException e) { 465 // Ignore. 466 continue; 467 } 468 } 469 return task; 470 } 471 472 private void runTask(Runnable task) { 473 beforeExecute(thread, task); 474 boolean ran = false; 475 try { 476 task.run(); 477 ran = true; 478 afterExecute(task, null); 479 completedTaskCount.incrementAndGet(); 480 } catch (RuntimeException e) { 481 if (!ran) { 482 afterExecute(task, e); 483 } 484 throw e; 485 } 486 } 487 } 488}