/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;

import org.apache.camel.CamelContext;
import org.apache.camel.NamedNode;
import org.apache.camel.StaticService;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.model.OptionalIdentifiedDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.concurrent.CamelThreadFactory;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link ExecutorServiceManager} which creates thread pools through a pluggable
 * {@link ThreadPoolFactory}, keeps a reference to every pool it created, notifies the
 * {@link LifecycleStrategy}s of the {@link CamelContext} when pools are added or removed,
 * and force-shuts down any pool still registered when the manager itself is shut down.
 */
public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);

    private final CamelContext camelContext;
    private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
    // every pool created by this manager which has not yet been shut down through it
    private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<ExecutorService>();
    private String threadNamePattern;
    // time in millis to wait for a pool to terminate during a graceful shutdown
    private long shutdownAwaitTermination = 10000;
    private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
    private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<String, ThreadPoolProfile>();
    // the built-in profile; custom default profiles are completed from it
    private ThreadPoolProfile defaultProfile;

    /**
     * Creates the manager and registers the built-in default thread pool profile
     * (pool size 10, max pool size 20, keep alive 60 seconds, max queue size 1000,
     * rejected policy {@link ThreadPoolRejectedPolicy#CallerRuns}).
     *
     * @param camelContext the camel context this manager belongs to
     */
    public DefaultExecutorServiceManager(CamelContext camelContext) {
        this.camelContext = camelContext;

        defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
        defaultProfile.setDefaultProfile(true);
        defaultProfile.setPoolSize(10);
        defaultProfile.setMaxPoolSize(20);
        defaultProfile.setKeepAliveTime(60L);
        defaultProfile.setTimeUnit(TimeUnit.SECONDS);
        defaultProfile.setMaxQueueSize(1000);
        defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);

        registerThreadPoolProfile(defaultProfile);
    }

    /**
     * Gets the factory used for creating the actual thread pools.
     *
     * @return the thread pool factory
     */
    @Override
    public ThreadPoolFactory getThreadPoolFactory() {
        return threadPoolFactory;
    }

    /**
     * Sets the factory used for creating the actual thread pools.
     *
     * @param threadPoolFactory the thread pool factory
     */
    @Override
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    /**
     * Registers the profile under its id, replacing any profile previously registered with the same id.
     *
     * @param profile the profile, must not be <tt>null</tt> and must have a non-empty id
     */
    @Override
    public void registerThreadPoolProfile(ThreadPoolProfile profile) {
        ObjectHelper.notNull(profile, "profile");
        ObjectHelper.notEmpty(profile.getId(), "id", profile);
        threadPoolProfiles.put(profile.getId(), profile);
    }

    /**
     * Looks up a registered profile.
     *
     * @param id the profile id
     * @return the profile, or <tt>null</tt> if no profile is registered with that id
     */
    @Override
    public ThreadPoolProfile getThreadPoolProfile(String id) {
        return threadPoolProfiles.get(id);
    }

    /**
     * Gets the profile currently registered as the default one.
     *
     * @return the default profile
     */
    @Override
    public ThreadPoolProfile getDefaultThreadPoolProfile() {
        return getThreadPoolProfile(defaultThreadPoolProfileId);
    }

    /**
     * Replaces the current default profile with the given one. The given profile is first
     * completed from the built-in default profile using {@link ThreadPoolProfile#addDefaults}
     * and then flagged and registered as the default profile.
     *
     * @param defaultThreadPoolProfile the new default profile, must have a non-empty id
     */
    @Override
    public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
        // validate the new profile before touching any state, so a rejected profile
        // does not leave this manager without a registered default profile
        defaultThreadPoolProfile.addDefaults(defaultProfile);
        ObjectHelper.notEmpty(defaultThreadPoolProfile.getId(), "id", defaultThreadPoolProfile);

        threadPoolProfiles.remove(defaultThreadPoolProfileId);

        LOG.info("Using custom DefaultThreadPoolProfile: {}", defaultThreadPoolProfile);

        this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
        defaultThreadPoolProfile.setDefaultProfile(true);
        registerThreadPoolProfile(defaultThreadPoolProfile);
    }

    /**
     * Gets the pattern used for naming the threads created by this manager.
     *
     * @return the pattern, may be <tt>null</tt> until one has been set or this manager has been started
     */
    @Override
    public String getThreadNamePattern() {
        return threadNamePattern;
    }

    /**
     * Sets the pattern used for naming the threads created by this manager.
     * The first <tt>#camelId#</tt> placeholder is replaced right away with the name of the camel context.
     *
     * @param threadNamePattern the pattern
     */
    @Override
    public void setThreadNamePattern(String threadNamePattern) {
        // must set camel id here in the pattern and let the other placeholders be resolved on demand
        // (quote the name as replaceFirst would otherwise interpret $ and \ in it as regex group references)
        String camelId = Matcher.quoteReplacement(this.camelContext.getName());
        this.threadNamePattern = threadNamePattern.replaceFirst("#camelId#", camelId);
    }

    /**
     * Gets the time to wait for a thread pool to terminate during a graceful shutdown.
     *
     * @return the timeout in millis
     */
    @Override
    public long getShutdownAwaitTermination() {
        return shutdownAwaitTermination;
    }

    /**
     * Sets the time to wait for a thread pool to terminate during a graceful shutdown.
     *
     * @param shutdownAwaitTermination the timeout in millis
     */
    @Override
    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
        this.shutdownAwaitTermination = shutdownAwaitTermination;
    }

    /**
     * Resolves the full thread name by delegating to {@link ThreadHelper#resolveThreadName}
     * with the configured thread name pattern.
     *
     * @param name the name to use in the pattern
     * @return the resolved thread name
     */
    @Override
    public String resolveThreadName(String name) {
        return ThreadHelper.resolveThreadName(threadNamePattern, name);
    }

    /**
     * Creates a new thread using a {@link CamelThreadFactory} created with the daemon flag enabled.
     *
     * @param name     the name passed to the thread factory
     * @param runnable the task the thread should run
     * @return the thread as created by the factory
     */
    @Override
    public Thread newThread(String name, Runnable runnable) {
        ThreadFactory factory = createThreadFactory(name, true);
        return factory.newThread(runnable);
    }

    /**
     * Creates a new thread pool using the default thread pool profile.
     *
     * @param source the source object which is going to use the thread pool
     * @param name   the name of the thread pool
     * @return the created thread pool
     */
    @Override
    public ExecutorService newDefaultThreadPool(Object source, String name) {
        return newThreadPool(source, name, getDefaultThreadPoolProfile());
    }

    /**
     * Creates a new scheduled thread pool using the default thread pool profile.
     *
     * @param source the source object which is going to use the thread pool
     * @param name   the name of the thread pool
     * @return the created thread pool
     */
    @Override
    public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
        return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
    }

    /**
     * Creates a new thread pool using the profile registered with the given id.
     *
     * @param source    the source object which is going to use the thread pool
     * @param name      the name of the thread pool
     * @param profileId the id of the registered profile to use
     * @return the created thread pool, or <tt>null</tt> if no profile is registered with that id
     */
    @Override
    public ExecutorService newThreadPool(Object source, String name, String profileId) {
        ThreadPoolProfile profile = getThreadPoolProfile(profileId);
        if (profile != null) {
            return newThreadPool(source, name, profile);
        } else {
            // no profile with that id
            return null;
        }
    }

    /**
     * Creates a new thread pool using the given profile.
     * <p/>
     * Note that {@link ThreadPoolProfile#addDefaults} is invoked on the given profile with the
     * current default profile, so the passed in profile instance may be modified.
     *
     * @param source  the source object which is going to use the thread pool
     * @param name    the name of the thread pool, sanitized using {@link URISupport#sanitizeUri} before use
     * @param profile the profile to use, must not be <tt>null</tt>
     * @return the created thread pool
     */
    @Override
    public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) {
        String sanitizedName = URISupport.sanitizeUri(name);
        ObjectHelper.notNull(profile, "ThreadPoolProfile");

        profile.addDefaults(getDefaultThreadPoolProfile());

        ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
        ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
        onThreadPoolCreated(executorService, source, profile.getId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService});
        }

        return executorService;
    }

    /**
     * Creates a new thread pool from an ad-hoc profile which uses the name as its id
     * and the given core and maximum pool sizes.
     *
     * @param source      the source object which is going to use the thread pool
     * @param name        the name of the thread pool
     * @param poolSize    the core pool size
     * @param maxPoolSize the maximum pool size
     * @return the created thread pool
     */
    @Override
    public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
        ThreadPoolProfile profile = new ThreadPoolProfile(name);
        profile.setPoolSize(poolSize);
        profile.setMaxPoolSize(maxPoolSize);
        return newThreadPool(source, name, profile);
    }

    /**
     * Creates a new fixed thread pool with a single thread.
     *
     * @param source the source object which is going to use the thread pool
     * @param name   the name of the thread pool
     * @return the created thread pool
     */
    @Override
    public ExecutorService newSingleThreadExecutor(Object source, String name) {
        return newFixedThreadPool(source, name, 1);
    }

    /**
     * Creates a new cached thread pool using the thread pool factory.
     * No thread pool profile is associated with the created pool.
     *
     * @param source the source object which is going to use the thread pool
     * @param name   the name of the thread pool, sanitized using {@link URISupport#sanitizeUri} before use
     * @return the created thread pool
     */
    @Override
    public ExecutorService newCachedThreadPool(Object source, String name) {
        String sanitizedName = URISupport.sanitizeUri(name);
        ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
        onThreadPoolCreated(answer, source, null);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
        }
        return answer;
    }

    /**
     * Creates a new thread pool from an ad-hoc profile where core and maximum pool size
     * are both the given size and the keep alive time is zero.
     *
     * @param source   the source object which is going to use the thread pool
     * @param name     the name of the thread pool
     * @param poolSize the number of threads
     * @return the created thread pool
     */
    @Override
    public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
        ThreadPoolProfile profile = new ThreadPoolProfile(name);
        profile.setPoolSize(poolSize);
        profile.setMaxPoolSize(poolSize);
        profile.setKeepAliveTime(0L);
        return newThreadPool(source, name, profile);
    }

    /**
     * Creates a new scheduled thread pool with a pool size of one.
     *
     * @param source the source object which is going to use the thread pool
     * @param name   the name of the thread pool
     * @return the created thread pool
     */
    @Override
    public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
        return newScheduledThreadPool(source, name, 1);
    }

    /**
     * Creates a new scheduled thread pool using the given profile.
     * <p/>
     * Note that {@link ThreadPoolProfile#addDefaults} is invoked on the given profile with the
     * current default profile, so the passed in profile instance may be modified. The lifecycle
     * strategies are notified without a thread pool profile id.
     *
     * @param source  the source object which is going to use the thread pool
     * @param name    the name of the thread pool, sanitized using {@link URISupport#sanitizeUri} before use
     * @param profile the profile to use
     * @return the created thread pool
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
        String sanitizedName = URISupport.sanitizeUri(name);
        profile.addDefaults(getDefaultThreadPoolProfile());
        ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true));
        onThreadPoolCreated(answer, source, null);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
        }
        return answer;
    }

    /**
     * Creates a new scheduled thread pool using the profile registered with the given id.
     *
     * @param source    the source object which is going to use the thread pool
     * @param name      the name of the thread pool
     * @param profileId the id of the registered profile to use
     * @return the created thread pool, or <tt>null</tt> if no profile is registered with that id
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) {
        ThreadPoolProfile profile = getThreadPoolProfile(profileId);
        if (profile != null) {
            return newScheduledThreadPool(source, name, profile);
        } else {
            // no profile with that id
            return null;
        }
    }

    /**
     * Creates a new scheduled thread pool from an ad-hoc profile which uses the name as its id
     * and the given pool size.
     *
     * @param source   the source object which is going to use the thread pool
     * @param name     the name of the thread pool
     * @param poolSize the pool size
     * @return the created thread pool
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
        ThreadPoolProfile profile = new ThreadPoolProfile(name);
        profile.setPoolSize(poolSize);
        return newScheduledThreadPool(source, name, profile);
    }

    /**
     * Shuts down the thread pool without waiting for it to terminate.
     *
     * @param executorService the thread pool, <tt>null</tt> is ignored
     */
    @Override
    public void shutdown(ExecutorService executorService) {
        doShutdown(executorService, 0, false);
    }

    /**
     * Shuts down the thread pool gracefully, waiting up to {@link #getShutdownAwaitTermination()}
     * millis before forcing the shutdown.
     *
     * @param executorService the thread pool, <tt>null</tt> is ignored
     */
    @Override
    public void shutdownGraceful(ExecutorService executorService) {
        doShutdown(executorService, getShutdownAwaitTermination(), false);
    }

    /**
     * Shuts down the thread pool gracefully, waiting up to the given time before forcing the shutdown.
     *
     * @param executorService          the thread pool, <tt>null</tt> is ignored
     * @param shutdownAwaitTermination timeout in millis to wait for termination
     */
    @Override
    public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
        doShutdown(executorService, shutdownAwaitTermination, false);
    }

    /**
     * Shuts down the thread pool, notifies the lifecycle strategies and (unless fail-safe)
     * removes the pool from the internal list.
     *
     * @param executorService          the thread pool, <tt>null</tt> is ignored
     * @param shutdownAwaitTermination timeout in millis to wait for termination, zero or negative to not wait
     * @param failSafe                 <tt>true</tt> when invoked as fail-safe while iterating the internal list,
     *                                 in which case the pool is not removed from that list
     * @return <tt>true</tt> if the shutdown had to be forced (a warning was logged), <tt>false</tt> otherwise
     */
    private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) {
        if (executorService == null) {
            return false;
        }

        boolean warned = false;

        // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
        // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
        // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
        // we ought to shutdown much faster)
        if (!executorService.isShutdown()) {
            StopWatch watch = new StopWatch();

            LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
            executorService.shutdown();

            if (shutdownAwaitTermination > 0) {
                try {
                    if (!awaitTermination(executorService, shutdownAwaitTermination)) {
                        warned = true;
                        LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
                        executorService.shutdownNow();
                        // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
                        if (!awaitTermination(executorService, shutdownAwaitTermination)) {
                            LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
                        }
                    }
                } catch (InterruptedException e) {
                    warned = true;
                    LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
                    // we were interrupted during shutdown, so force shutdown
                    executorService.shutdownNow();
                    // restore the interrupt status so the caller can see this thread was asked to stop
                    Thread.currentThread().interrupt();
                }
            }

            // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
            if (warned) {
                LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
            }
        }

        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
        notifyThreadPoolRemoved(executorService);

        // remove reference as its shutdown (do not remove if fail-safe)
        if (!failSafe) {
            executorServices.remove(executorService);
        }

        return warned;
    }

    /**
     * Shuts down the thread pool immediately using {@link ExecutorService#shutdownNow()}.
     *
     * @param executorService the thread pool, must not be <tt>null</tt>
     * @return the tasks returned by {@link ExecutorService#shutdownNow()},
     *         or <tt>null</tt> if the pool was already shut down
     */
    @Override
    public List<Runnable> shutdownNow(ExecutorService executorService) {
        return doShutdownNow(executorService, false);
    }

    /**
     * Forces the thread pool to shut down, notifies the lifecycle strategies and (unless fail-safe)
     * removes the pool from the internal list.
     *
     * @param executorService the thread pool, must not be <tt>null</tt>
     * @param failSafe        <tt>true</tt> to log the forced shutdown at WARN level and to keep the pool in the internal list
     * @return the tasks returned by {@link ExecutorService#shutdownNow()},
     *         or <tt>null</tt> if the pool was already shut down
     */
    private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
        ObjectHelper.notNull(executorService, "executorService");

        List<Runnable> answer = null;
        if (!executorService.isShutdown()) {
            if (failSafe) {
                // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
                LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
            } else {
                LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
            }
            answer = executorService.shutdownNow();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
            }
        }

        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
        notifyThreadPoolRemoved(executorService);

        // remove reference as its shutdown (do not remove if fail-safe)
        if (!failSafe) {
            executorServices.remove(executorService);
        }

        return answer;
    }

    /**
     * Waits for the thread pool to terminate, logging the progress at INFO level
     * after each waiting interval (at most 2 seconds) which elapsed without termination.
     *
     * @param executorService          the thread pool
     * @param shutdownAwaitTermination the maximum time in millis to wait
     * @return <tt>true</tt> if the pool terminated within the given time, <tt>false</tt> otherwise
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
        // log progress every 2nd second so end user is aware of we are shutting down
        StopWatch watch = new StopWatch();
        long interval = Math.min(2000, shutdownAwaitTermination);
        boolean done = false;
        while (!done && interval > 0) {
            if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
                done = true;
            } else {
                LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
                // recalculate interval (becomes zero or negative when the total time is used up, which ends the loop)
                interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
            }
        }

        return done;
    }

    /**
     * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
     *
     * @param executorService the created {@link java.util.concurrent.ExecutorService}
     */
    protected void onNewExecutorService(ExecutorService executorService) {
        // noop
    }

    @Override
    protected void doStart() throws Exception {
        if (threadNamePattern == null) {
            // set default name pattern which includes the camel context name
            threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#";
        }
    }

    @Override
    protected void doStop() throws Exception {
        // noop
    }

    @Override
    protected void doShutdown() throws Exception {
        // shutdown all remainder executor services by looping and doing this aggressively
        // as by normal all threads pool should have been shutdown using proper lifecycle
        // by their EIPs, components etc. This is acting as a fail-safe during shutdown
        // of CamelContext itself.
        Set<ExecutorService> forced = new LinkedHashSet<ExecutorService>();
        if (!executorServices.isEmpty()) {
            // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also
            LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size());
            for (ExecutorService executorService : executorServices) {
                try {
                    boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true);
                    // remember the thread pools that was forced to shutdown (eg warned)
                    if (warned) {
                        forced.add(executorService);
                    }
                } catch (Throwable e) {
                    // only log if something goes wrong as we want to shutdown them all
                    LOG.warn("Error occurred during shutdown of ExecutorService: "
                            + executorService + ". This exception will be ignored.", e);
                }
            }
        }

        // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his
        if (!forced.isEmpty()) {
            LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size());
            for (ExecutorService executorService : forced) {
                LOG.warn(" forced -> {}", executorService);
            }
        }
        forced.clear();

        // clear list
        executorServices.clear();

        // do not clear the default profile as we could potential be restarted
        Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator();
        while (it.hasNext()) {
            ThreadPoolProfile profile = it.next();
            if (!profile.isDefaultProfile()) {
                it.remove();
            }
        }
    }

    /**
     * Invoked when a new thread pool is created.
     * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext,
     * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method,
     * which for example will enlist the thread pool in JMX management.
     *
     * @param executorService     the thread pool
     * @param source              the source to use the thread pool
     * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile
     */
    private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) {
        // add to internal list of thread pools
        executorServices.add(executorService);

        String id;
        String sourceId = null;
        String routeId = null;

        // extract id from source
        if (source instanceof NamedNode) {
            if (source instanceof OptionalIdentifiedDefinition) {
                id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory());
            } else {
                // a named node which is not a model definition cannot create an id,
                // so use the simple class name with hashcode so its unique for this given source
                id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
            }
            // and let source be the short name of the pattern
            sourceId = ((NamedNode) source).getShortName();
        } else if (source instanceof String) {
            id = (String) source;
        } else if (source != null) {
            if (source instanceof StaticService) {
                // the source is static service so its name would be unique
                id = source.getClass().getSimpleName();
            } else {
                // fallback and use the simple class name with hashcode for the id so its unique for this given source
                id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
            }
        } else {
            // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique
            id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")";
        }

        // id is mandatory
        ObjectHelper.notEmpty(id, "id for thread pool " + executorService);

        // extract route id if possible
        if (source instanceof ProcessorDefinition) {
            RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
            if (route != null) {
                routeId = route.idOrCreate(this.camelContext.getNodeIdFactory());
            }
        }

        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
        ThreadPoolExecutor threadPool = asThreadPoolExecutor(executorService);
        if (threadPool != null) {
            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
                lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
            }
        }

        // now call strategy to allow custom logic
        onNewExecutorService(executorService);
    }

    /**
     * Notifies every lifecycle strategy of the camel context that the thread pool is being removed.
     * Does nothing if the executor service cannot be viewed as a {@link ThreadPoolExecutor}.
     *
     * @param executorService the thread pool being removed
     */
    private void notifyThreadPoolRemoved(ExecutorService executorService) {
        ThreadPoolExecutor threadPool = asThreadPoolExecutor(executorService);
        if (threadPool != null) {
            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
                lifecycle.onThreadPoolRemove(camelContext, threadPool);
            }
        }
    }

    /**
     * Gets the {@link ThreadPoolExecutor} behind the executor service.
     *
     * @param executorService the executor service
     * @return the executor service itself if it is a {@link ThreadPoolExecutor}, the wrapped scheduled thread pool
     *         if it is a {@link SizedScheduledExecutorService}, otherwise <tt>null</tt>
     */
    private static ThreadPoolExecutor asThreadPoolExecutor(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor) {
            return (ThreadPoolExecutor) executorService;
        } else if (executorService instanceof SizedScheduledExecutorService) {
            return ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
        }
        return null;
    }

    /**
     * Creates a {@link CamelThreadFactory} using the configured thread name pattern.
     *
     * @param name     the name passed to the thread factory
     * @param isDaemon the daemon flag passed to the thread factory
     * @return the thread factory
     */
    private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
        return new CamelThreadFactory(threadNamePattern, name, isDaemon);
    }

}