001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Comparator; 022 import java.util.LinkedHashSet; 023 import java.util.List; 024 import java.util.Locale; 025 import java.util.Set; 026 import java.util.concurrent.ExecutionException; 027 import java.util.concurrent.ExecutorService; 028 import java.util.concurrent.Future; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.TimeoutException; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 033 import org.apache.camel.CamelContext; 034 import org.apache.camel.CamelContextAware; 035 import org.apache.camel.Consumer; 036 import org.apache.camel.Route; 037 import org.apache.camel.Service; 038 import org.apache.camel.ShutdownRoute; 039 import org.apache.camel.ShutdownRunningTask; 040 import org.apache.camel.SuspendableService; 041 import org.apache.camel.spi.RouteStartupOrder; 042 import org.apache.camel.spi.ShutdownAware; 043 import org.apache.camel.spi.ShutdownPrepared; 044 import org.apache.camel.spi.ShutdownStrategy; 045 import org.apache.camel.support.ServiceSupport; 046 import org.apache.camel.util.EventHelper; 047 import org.apache.camel.util.ObjectHelper; 048 import org.apache.camel.util.ServiceHelper; 049 import org.apache.camel.util.StopWatch; 050 import org.slf4j.Logger; 051 import org.slf4j.LoggerFactory; 052 053 /** 054 * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown. 055 * <p/> 056 * Graceful shutdown ensures that any inflight and pending messages will be taken into account 057 * and it will wait until these exchanges has been completed. 058 * <p/> 059 * This strategy will perform graceful shutdown in two steps: 060 * <ul> 061 * <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li> 062 * <li>Forced - After a given period of time, a timeout occurred and if there are still pending 063 * exchanges to complete, then a more aggressive forced strategy is performed.</li> 064 * </ul> 065 * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages, 066 * and allow any existing inflight messages to complete. Then when there is no more inflight messages 067 * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have 068 * to wait for these messages to complete. If they do not complete after a period of time, then a 069 * timeout triggers. And then a more aggressive strategy takes over. 070 * <p/> 071 * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages. 072 * And force routes and its services to shutdown now. There is a risk when shutting down now, 073 * that some resources is not properly shutdown, which can cause side effects. The timeout value 074 * is by default 300 seconds, but can be customized. 075 * <p/> 076 * As this strategy will politely wait until all exchanges has been completed it can potential wait 077 * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also 078 * specify whether the remainder consumers should be shutdown now or ignore. 079 * <p/> 080 * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers. 081 * This ensures that when shutting down Camel it at some point eventually will shutdown. 082 * This behavior can of course be configured using the {@link #setTimeout(long)} and 083 * {@link #setShutdownNowOnTimeout(boolean)} methods. 084 * <p/> 085 * Routes will by default be shutdown in the reverse order of which they where started. 086 * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method. 087 * <p/> 088 * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes 089 * is being prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} which 090 * <tt>force=false</tt>. 091 * <p/> 092 * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then 093 * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown 094 * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt> 095 * on the services. This allows the services to know they should force shutdown now. 096 * <p/> 097 * When timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are 098 * still inflight which may be rejected continued being routed. By default this can cause WARN and ERRORs 099 * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these 100 * logs, so they are logged at TRACE level instead. 101 * 102 * @version 103 */ 104 public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware { 105 private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class); 106 107 private CamelContext camelContext; 108 private ExecutorService executor; 109 private long timeout = 5 * 60; 110 private TimeUnit timeUnit = TimeUnit.SECONDS; 111 private boolean shutdownNowOnTimeout = true; 112 private boolean shutdownRoutesInReverseOrder = true; 113 private boolean suppressLoggingOnTimeout; 114 private volatile boolean forceShutdown; 115 private final AtomicBoolean timeoutOccurred = new AtomicBoolean(); 116 private volatile Future<?> currentShutdownTaskFuture; 117 118 public DefaultShutdownStrategy() { 119 } 120 121 public DefaultShutdownStrategy(CamelContext camelContext) { 122 this.camelContext = camelContext; 123 } 124 125 public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception { 126 shutdown(context, routes, getTimeout(), getTimeUnit()); 127 } 128 129 @Override 130 public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception { 131 doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true); 132 } 133 134 public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception { 135 doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false); 136 } 137 138 public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { 139 doShutdown(context, routes, timeout, timeUnit, false, false, false); 140 } 141 142 public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { 143 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 144 routes.add(route); 145 return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false); 146 } 147 148 public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { 149 doShutdown(context, routes, timeout, timeUnit, true, false, false); 150 } 151 152 protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit, 153 boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception { 154 155 // just return if no routes to shutdown 156 if (routes.isEmpty()) { 157 return true; 158 } 159 160 StopWatch watch = new StopWatch(); 161 162 // at first sort according to route startup order 163 List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes); 164 Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() { 165 public int compare(RouteStartupOrder o1, RouteStartupOrder o2) { 166 return o1.getStartupOrder() - o2.getStartupOrder(); 167 } 168 }); 169 if (shutdownRoutesInReverseOrder) { 170 Collections.reverse(routesOrdered); 171 } 172 173 LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")"); 174 175 // use another thread to perform the shutdowns so we can support timeout 176 timeoutOccurred.set(false); 177 currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred)); 178 try { 179 currentShutdownTaskFuture.get(timeout, timeUnit); 180 } catch (ExecutionException e) { 181 // unwrap execution exception 182 throw ObjectHelper.wrapRuntimeCamelException(e.getCause()); 183 } catch (Exception e) { 184 // either timeout or interrupted exception was thrown so this is okay 185 // as interrupted would mean cancel was called on the currentShutdownTaskFuture to signal a forced timeout 186 187 // we hit a timeout, so set the flag 188 timeoutOccurred.set(true); 189 190 // timeout then cancel the task 191 currentShutdownTaskFuture.cancel(true); 192 193 // signal we are forcing shutdown now, since timeout occurred 194 this.forceShutdown = forceShutdown; 195 196 // if set, stop processing and return false to indicate that the shutdown is aborting 197 if (!forceShutdown && abortAfterTimeout) { 198 LOG.warn("Timeout occurred. Aborting the shutdown now."); 199 return false; 200 } else { 201 if (forceShutdown || shutdownNowOnTimeout) { 202 LOG.warn("Timeout occurred. Forcing the routes to be shutdown now."); 203 // force the routes to shutdown now 204 shutdownRoutesNow(routesOrdered); 205 206 // now the route consumers has been shutdown, then prepare route services for shutdown now (forced) 207 for (RouteStartupOrder order : routes) { 208 for (Service service : order.getServices()) { 209 prepareShutdown(service, true, true, isSuppressLoggingOnTimeout()); 210 } 211 } 212 } else { 213 LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes."); 214 } 215 } 216 } finally { 217 currentShutdownTaskFuture = null; 218 } 219 220 // convert to seconds as its easier to read than a big milli seconds number 221 long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS); 222 223 LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds"); 224 return true; 225 } 226 227 @Override 228 public boolean forceShutdown(Service service) { 229 return forceShutdown; 230 } 231 232 @Override 233 public boolean hasTimeoutOccurred() { 234 return timeoutOccurred.get(); 235 } 236 237 public void setTimeout(long timeout) { 238 if (timeout <= 0) { 239 throw new IllegalArgumentException("Timeout must be a positive value"); 240 } 241 this.timeout = timeout; 242 } 243 244 public long getTimeout() { 245 return timeout; 246 } 247 248 public void setTimeUnit(TimeUnit timeUnit) { 249 this.timeUnit = timeUnit; 250 } 251 252 public TimeUnit getTimeUnit() { 253 return timeUnit; 254 } 255 256 public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) { 257 this.shutdownNowOnTimeout = shutdownNowOnTimeout; 258 } 259 260 public boolean isShutdownNowOnTimeout() { 261 return shutdownNowOnTimeout; 262 } 263 264 public boolean isShutdownRoutesInReverseOrder() { 265 return shutdownRoutesInReverseOrder; 266 } 267 268 public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) { 269 this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder; 270 } 271 272 public boolean isSuppressLoggingOnTimeout() { 273 return suppressLoggingOnTimeout; 274 } 275 276 public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) { 277 this.suppressLoggingOnTimeout = suppressLoggingOnTimeout; 278 } 279 280 public CamelContext getCamelContext() { 281 return camelContext; 282 } 283 284 public void setCamelContext(CamelContext camelContext) { 285 this.camelContext = camelContext; 286 } 287 288 public Future<?> getCurrentShutdownTaskFuture() { 289 return currentShutdownTaskFuture; 290 } 291 292 /** 293 * Shutdown all the consumers immediately. 294 * 295 * @param routes the routes to shutdown 296 */ 297 protected void shutdownRoutesNow(List<RouteStartupOrder> routes) { 298 for (RouteStartupOrder order : routes) { 299 300 // set the route to shutdown as fast as possible by stopping after 301 // it has completed its current task 302 ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask(); 303 if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) { 304 LOG.debug("Changing shutdownRunningTask from {} to " + ShutdownRunningTask.CompleteCurrentTaskOnly 305 + " on route {} to shutdown faster", current, order.getRoute().getId()); 306 order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly); 307 } 308 309 for (Consumer consumer : order.getInputs()) { 310 shutdownNow(consumer); 311 } 312 } 313 } 314 315 /** 316 * Shutdown all the consumers immediately. 317 * 318 * @param consumers the consumers to shutdown 319 */ 320 protected void shutdownNow(List<Consumer> consumers) { 321 for (Consumer consumer : consumers) { 322 shutdownNow(consumer); 323 } 324 } 325 326 /** 327 * Shutdown the consumer immediately. 328 * 329 * @param consumer the consumer to shutdown 330 */ 331 protected static void shutdownNow(Consumer consumer) { 332 LOG.trace("Shutting down: {}", consumer); 333 334 // allow us to do custom work before delegating to service helper 335 try { 336 ServiceHelper.stopService(consumer); 337 } catch (Throwable e) { 338 LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e); 339 // fire event 340 EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e); 341 } 342 343 LOG.trace("Shutdown complete for: {}", consumer); 344 } 345 346 /** 347 * Suspends/stops the consumer immediately. 348 * 349 * @param consumer the consumer to suspend 350 */ 351 protected static void suspendNow(Consumer consumer) { 352 LOG.trace("Suspending: {}", consumer); 353 354 // allow us to do custom work before delegating to service helper 355 try { 356 ServiceHelper.suspendService(consumer); 357 } catch (Throwable e) { 358 LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e); 359 // fire event 360 EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e); 361 } 362 363 LOG.trace("Suspend complete for: {}", consumer); 364 } 365 366 private ExecutorService getExecutorService() { 367 if (executor == null) { 368 executor = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "ShutdownTask"); 369 } 370 return executor; 371 } 372 373 @Override 374 protected void doStart() throws Exception { 375 ObjectHelper.notNull(camelContext, "CamelContext"); 376 // reset option 377 forceShutdown = false; 378 timeoutOccurred.set(false); 379 } 380 381 @Override 382 protected void doStop() throws Exception { 383 // noop 384 } 385 386 @Override 387 protected void doShutdown() throws Exception { 388 if (executor != null) { 389 // force shutting down as we are shutting down Camel 390 camelContext.getExecutorServiceManager().shutdownNow(executor); 391 // should clear executor so we can restart by creating a new thread pool 392 executor = null; 393 } 394 } 395 396 /** 397 * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method 398 * on the service if it implement this interface. 399 * 400 * @param service the service 401 * @param forced whether to force shutdown 402 * @param includeChildren whether to prepare the child of the service as well 403 */ 404 private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) { 405 Set<Service> list; 406 if (includeChildren) { 407 // include error handlers as we want to prepare them for shutdown as well 408 list = ServiceHelper.getChildServices(service, true); 409 } else { 410 list = new LinkedHashSet<Service>(1); 411 list.add(service); 412 } 413 414 for (Service child : list) { 415 if (child instanceof ShutdownPrepared) { 416 try { 417 LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child); 418 ((ShutdownPrepared) child).prepareShutdown(forced); 419 } catch (Exception e) { 420 if (suppressLogging) { 421 LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); 422 } else { 423 LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); 424 } 425 } 426 } 427 } 428 } 429 430 /** 431 * Holder for deferred consumers 432 */ 433 static class ShutdownDeferredConsumer { 434 private final Route route; 435 private final Consumer consumer; 436 437 ShutdownDeferredConsumer(Route route, Consumer consumer) { 438 this.route = route; 439 this.consumer = consumer; 440 } 441 442 Route getRoute() { 443 return route; 444 } 445 446 Consumer getConsumer() { 447 return consumer; 448 } 449 } 450 451 /** 452 * Shutdown task which shutdown all the routes in a graceful manner. 453 */ 454 static class ShutdownTask implements Runnable { 455 456 private final CamelContext context; 457 private final List<RouteStartupOrder> routes; 458 private final boolean suspendOnly; 459 private final boolean abortAfterTimeout; 460 private final long timeout; 461 private final TimeUnit timeUnit; 462 private final AtomicBoolean timeoutOccurred; 463 464 public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit, 465 boolean suspendOnly, boolean abortAfterTimeout, AtomicBoolean timeoutOccurred) { 466 this.context = context; 467 this.routes = routes; 468 this.suspendOnly = suspendOnly; 469 this.abortAfterTimeout = abortAfterTimeout; 470 this.timeout = timeout; 471 this.timeUnit = timeUnit; 472 this.timeoutOccurred = timeoutOccurred; 473 } 474 475 public void run() { 476 // the strategy in this run method is to 477 // 1) go over the routes and shutdown those routes which can be shutdown asap 478 // some routes will be deferred to shutdown at the end, as they are needed 479 // by other routes so they can complete their tasks 480 // 2) wait until all inflight and pending exchanges has been completed 481 // 3) shutdown the deferred routes 482 483 LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown"); 484 485 // list of deferred consumers to shutdown when all exchanges has been completed routed 486 // and thus there are no more inflight exchanges so they can be safely shutdown at that time 487 List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>(); 488 for (RouteStartupOrder order : routes) { 489 490 ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute(); 491 ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask(); 492 493 if (LOG.isTraceEnabled()) { 494 LOG.trace("{}{} with options [{},{}]", 495 new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ", 496 order.getRoute().getId(), shutdownRoute, shutdownRunningTask}); 497 } 498 499 for (Consumer consumer : order.getInputs()) { 500 501 boolean suspend = false; 502 503 // assume we should shutdown if we are not deferred 504 boolean shutdown = shutdownRoute != ShutdownRoute.Defer; 505 506 if (shutdown) { 507 // if we are to shutdown then check whether we can suspend instead as its a more 508 // gentle way to graceful shutdown 509 510 // some consumers do not support shutting down so let them decide 511 // if a consumer is suspendable then prefer to use that and then shutdown later 512 if (consumer instanceof ShutdownAware) { 513 shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask); 514 } 515 if (shutdown && consumer instanceof SuspendableService) { 516 // we prefer to suspend over shutdown 517 suspend = true; 518 } 519 } 520 521 // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy) 522 if (suspend) { 523 // only suspend it and then later shutdown it 524 suspendNow(consumer); 525 // add it to the deferred list so the route will be shutdown later 526 deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer)); 527 LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint()); 528 } else if (shutdown) { 529 shutdownNow(consumer); 530 LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint()); 531 } else { 532 // we will stop it later, but for now it must run to be able to help all inflight messages 533 // be safely completed 534 deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer)); 535 LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " shutdown deferred." : " suspension deferred.")); 536 } 537 } 538 } 539 540 // notify the services we intend to shutdown 541 for (RouteStartupOrder order : routes) { 542 for (Service service : order.getServices()) { 543 // skip the consumer as we handle that specially 544 if (service instanceof Consumer) { 545 continue; 546 } 547 prepareShutdown(service, false, true, false); 548 } 549 } 550 551 // wait till there are no more pending and inflight messages 552 boolean done = false; 553 long loopDelaySeconds = 1; 554 long loopCount = 0; 555 while (!done && !timeoutOccurred.get()) { 556 int size = 0; 557 for (RouteStartupOrder order : routes) { 558 int inflight = context.getInflightRepository().size(order.getRoute().getId()); 559 for (Consumer consumer : order.getInputs()) { 560 // include any additional pending exchanges on some consumers which may have internal 561 // memory queues such as seda 562 if (consumer instanceof ShutdownAware) { 563 inflight += ((ShutdownAware) consumer).getPendingExchangesSize(); 564 } 565 } 566 if (inflight > 0) { 567 size += inflight; 568 LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId()); 569 } 570 } 571 if (size > 0) { 572 try { 573 LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in " 574 + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds."); 575 Thread.sleep(loopDelaySeconds * 1000); 576 } catch (InterruptedException e) { 577 if (abortAfterTimeout) { 578 LOG.warn("Interrupted while waiting during graceful shutdown, will abort."); 579 return; 580 } else { 581 LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now."); 582 break; 583 } 584 } 585 } else { 586 done = true; 587 } 588 } 589 590 // prepare for shutdown 591 for (ShutdownDeferredConsumer deferred : deferredConsumers) { 592 Consumer consumer = deferred.getConsumer(); 593 if (consumer instanceof ShutdownAware) { 594 LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId()); 595 boolean forced = context.getShutdownStrategy().forceShutdown(consumer); 596 boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); 597 prepareShutdown(consumer, forced, false, suppress); 598 LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId()); 599 } 600 } 601 602 // now all messages has been completed then stop the deferred consumers 603 for (ShutdownDeferredConsumer deferred : deferredConsumers) { 604 Consumer consumer = deferred.getConsumer(); 605 if (suspendOnly) { 606 suspendNow(consumer); 607 LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint()); 608 } else { 609 shutdownNow(consumer); 610 LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint()); 611 } 612 } 613 614 // now the route consumers has been shutdown, then prepare route services for shutdown 615 for (RouteStartupOrder order : routes) { 616 for (Service service : order.getServices()) { 617 boolean forced = context.getShutdownStrategy().forceShutdown(service); 618 boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); 619 prepareShutdown(service, forced, true, suppress); 620 } 621 } 622 } 623 624 } 625 626 }