001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Comparator;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Locale;
025    import java.util.Set;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.TimeoutException;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    
033    import org.apache.camel.CamelContext;
034    import org.apache.camel.CamelContextAware;
035    import org.apache.camel.Consumer;
036    import org.apache.camel.Route;
037    import org.apache.camel.Service;
038    import org.apache.camel.ShutdownRoute;
039    import org.apache.camel.ShutdownRunningTask;
040    import org.apache.camel.SuspendableService;
041    import org.apache.camel.spi.RouteStartupOrder;
042    import org.apache.camel.spi.ShutdownAware;
043    import org.apache.camel.spi.ShutdownPrepared;
044    import org.apache.camel.spi.ShutdownStrategy;
045    import org.apache.camel.support.ServiceSupport;
046    import org.apache.camel.util.EventHelper;
047    import org.apache.camel.util.ObjectHelper;
048    import org.apache.camel.util.ServiceHelper;
049    import org.apache.camel.util.StopWatch;
050    import org.slf4j.Logger;
051    import org.slf4j.LoggerFactory;
052    
053    /**
054     * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown.
055     * <p/>
056     * Graceful shutdown ensures that any inflight and pending messages will be taken into account
057     * and it will wait until these exchanges has been completed.
058     * <p/>
059     * This strategy will perform graceful shutdown in two steps:
060     * <ul>
061     *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
062     *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
063     *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
064     * </ul>
065     * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages,
066     * and allow any existing inflight messages to complete. Then when there is no more inflight messages
067     * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have
068     * to wait for these messages to complete. If they do not complete after a period of time, then a
069     * timeout triggers. And then a more aggressive strategy takes over.
070     * <p/>
071     * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages.
072     * And force routes and its services to shutdown now. There is a risk when shutting down now,
073     * that some resources is not properly shutdown, which can cause side effects. The timeout value
074     * is by default 300 seconds, but can be customized.
075     * <p/>
076     * As this strategy will politely wait until all exchanges has been completed it can potential wait
077     * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
078     * specify whether the remainder consumers should be shutdown now or ignore.
079     * <p/>
080     * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers.
081     * This ensures that when shutting down Camel it at some point eventually will shutdown.
082     * This behavior can of course be configured using the {@link #setTimeout(long)} and
083     * {@link #setShutdownNowOnTimeout(boolean)} methods.
084     * <p/>
085     * Routes will by default be shutdown in the reverse order of which they where started.
086     * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
087     * <p/>
088     * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
089     * is being prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} which
090     * <tt>force=false</tt>.
091     * <p/>
092     * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
093     * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
094     * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
095     * on the services. This allows the services to know they should force shutdown now.
096     * <p/>
097     * When timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are
098     * still inflight which may be rejected continued being routed. By default this can cause WARN and ERRORs
099     * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these
100     * logs, so they are logged at TRACE level instead.
101     *
102     * @version
103     */
104    public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
105        private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);
106    
107        private CamelContext camelContext;
108        private ExecutorService executor;
109        private long timeout = 5 * 60;
110        private TimeUnit timeUnit = TimeUnit.SECONDS;
111        private boolean shutdownNowOnTimeout = true;
112        private boolean shutdownRoutesInReverseOrder = true;
113        private boolean suppressLoggingOnTimeout;
114        private volatile boolean forceShutdown;
115        private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
116        private volatile Future<?> currentShutdownTaskFuture;
117    
118        public DefaultShutdownStrategy() {
119        }
120    
121        public DefaultShutdownStrategy(CamelContext camelContext) {
122            this.camelContext = camelContext;
123        }
124    
125        public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
126            shutdown(context, routes, getTimeout(), getTimeUnit());
127        }
128    
129        @Override
130        public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
131            doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
132        }
133    
134        public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
135            doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
136        }
137    
138        public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
139            doShutdown(context, routes, timeout, timeUnit, false, false, false);
140        }
141    
142        public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
143            List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
144            routes.add(route);
145            return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
146        }
147    
148        public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
149            doShutdown(context, routes, timeout, timeUnit, true, false, false);
150        }
151    
152        protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
153                                     boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {
154    
155            // just return if no routes to shutdown
156            if (routes.isEmpty()) {
157                return true;
158            }
159    
160            StopWatch watch = new StopWatch();
161    
162            // at first sort according to route startup order
163            List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
164            Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
165                public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
166                    return o1.getStartupOrder() - o2.getStartupOrder();
167                }
168            });
169            if (shutdownRoutesInReverseOrder) {
170                Collections.reverse(routesOrdered);
171            }
172    
173            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
174    
175            // use another thread to perform the shutdowns so we can support timeout
176            timeoutOccurred.set(false);
177            currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
178            try {
179                currentShutdownTaskFuture.get(timeout, timeUnit);
180            } catch (ExecutionException e) {
181                // unwrap execution exception
182                throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
183            } catch (Exception e) {
184                // either timeout or interrupted exception was thrown so this is okay
185                // as interrupted would mean cancel was called on the currentShutdownTaskFuture to signal a forced timeout
186    
187                // we hit a timeout, so set the flag
188                timeoutOccurred.set(true);
189    
190                // timeout then cancel the task
191                currentShutdownTaskFuture.cancel(true);
192    
193                // signal we are forcing shutdown now, since timeout occurred
194                this.forceShutdown = forceShutdown;
195    
196                // if set, stop processing and return false to indicate that the shutdown is aborting
197                if (!forceShutdown && abortAfterTimeout) {
198                    LOG.warn("Timeout occurred. Aborting the shutdown now.");
199                    return false;
200                } else {
201                    if (forceShutdown || shutdownNowOnTimeout) {
202                        LOG.warn("Timeout occurred. Forcing the routes to be shutdown now.");
203                        // force the routes to shutdown now
204                        shutdownRoutesNow(routesOrdered);
205    
206                        // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
207                        for (RouteStartupOrder order : routes) {
208                            for (Service service : order.getServices()) {
209                                prepareShutdown(service, true, true, isSuppressLoggingOnTimeout());
210                            }
211                        }
212                    } else {
213                        LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
214                    }
215                }
216            } finally {
217                currentShutdownTaskFuture = null;
218            }
219    
220            // convert to seconds as its easier to read than a big milli seconds number
221            long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);
222    
223            LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
224            return true;
225        }
226    
227        @Override
228        public boolean forceShutdown(Service service) {
229            return forceShutdown;
230        }
231    
232        @Override
233        public boolean hasTimeoutOccurred() {
234            return timeoutOccurred.get();
235        }
236    
237        public void setTimeout(long timeout) {
238            if (timeout <= 0) {
239                throw new IllegalArgumentException("Timeout must be a positive value");
240            }
241            this.timeout = timeout;
242        }
243    
244        public long getTimeout() {
245            return timeout;
246        }
247    
248        public void setTimeUnit(TimeUnit timeUnit) {
249            this.timeUnit = timeUnit;
250        }
251    
252        public TimeUnit getTimeUnit() {
253            return timeUnit;
254        }
255    
256        public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
257            this.shutdownNowOnTimeout = shutdownNowOnTimeout;
258        }
259    
260        public boolean isShutdownNowOnTimeout() {
261            return shutdownNowOnTimeout;
262        }
263    
264        public boolean isShutdownRoutesInReverseOrder() {
265            return shutdownRoutesInReverseOrder;
266        }
267    
268        public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
269            this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
270        }
271    
272        public boolean isSuppressLoggingOnTimeout() {
273            return suppressLoggingOnTimeout;
274        }
275    
276        public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) {
277            this.suppressLoggingOnTimeout = suppressLoggingOnTimeout;
278        }
279    
280        public CamelContext getCamelContext() {
281            return camelContext;
282        }
283    
284        public void setCamelContext(CamelContext camelContext) {
285            this.camelContext = camelContext;
286        }
287    
288        public Future<?> getCurrentShutdownTaskFuture() {
289            return currentShutdownTaskFuture;
290        }
291    
292        /**
293         * Shutdown all the consumers immediately.
294         *
295         * @param routes the routes to shutdown
296         */
297        protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
298            for (RouteStartupOrder order : routes) {
299    
300                // set the route to shutdown as fast as possible by stopping after
301                // it has completed its current task
302                ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
303                if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
304                    LOG.debug("Changing shutdownRunningTask from {} to " +  ShutdownRunningTask.CompleteCurrentTaskOnly
305                        + " on route {} to shutdown faster", current, order.getRoute().getId());
306                    order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
307                }
308    
309                for (Consumer consumer : order.getInputs()) {
310                    shutdownNow(consumer);
311                }
312            }
313        }
314    
315        /**
316         * Shutdown all the consumers immediately.
317         *
318         * @param consumers the consumers to shutdown
319         */
320        protected void shutdownNow(List<Consumer> consumers) {
321            for (Consumer consumer : consumers) {
322                shutdownNow(consumer);
323            }
324        }
325    
326        /**
327         * Shutdown the consumer immediately.
328         *
329         * @param consumer the consumer to shutdown
330         */
331        protected static void shutdownNow(Consumer consumer) {
332            LOG.trace("Shutting down: {}", consumer);
333    
334            // allow us to do custom work before delegating to service helper
335            try {
336                ServiceHelper.stopService(consumer);
337            } catch (Throwable e) {
338                LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
339                // fire event
340                EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
341            }
342    
343            LOG.trace("Shutdown complete for: {}", consumer);
344        }
345    
346        /**
347         * Suspends/stops the consumer immediately.
348         *
349         * @param consumer the consumer to suspend
350         */
351        protected static void suspendNow(Consumer consumer) {
352            LOG.trace("Suspending: {}", consumer);
353    
354            // allow us to do custom work before delegating to service helper
355            try {
356                ServiceHelper.suspendService(consumer);
357            } catch (Throwable e) {
358                LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
359                // fire event
360                EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
361            }
362    
363            LOG.trace("Suspend complete for: {}", consumer);
364        }
365    
366        private ExecutorService getExecutorService() {
367            if (executor == null) {
368                executor = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "ShutdownTask");
369            }
370            return executor;
371        }
372    
373        @Override
374        protected void doStart() throws Exception {
375            ObjectHelper.notNull(camelContext, "CamelContext");
376            // reset option
377            forceShutdown = false;
378            timeoutOccurred.set(false);
379        }
380    
381        @Override
382        protected void doStop() throws Exception {
383            // noop
384        }
385    
386        @Override
387        protected void doShutdown() throws Exception {
388            if (executor != null) {
389                // force shutting down as we are shutting down Camel
390                camelContext.getExecutorServiceManager().shutdownNow(executor);
391                // should clear executor so we can restart by creating a new thread pool
392                executor = null;
393            }
394        }
395    
396        /**
397         * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
398         * on the service if it implement this interface.
399         *
400         * @param service the service
401         * @param forced  whether to force shutdown
402         * @param includeChildren whether to prepare the child of the service as well
403         */
404        private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) {
405            Set<Service> list;
406            if (includeChildren) {
407                // include error handlers as we want to prepare them for shutdown as well
408                list = ServiceHelper.getChildServices(service, true);
409            } else {
410                list = new LinkedHashSet<Service>(1);
411                list.add(service);
412            }
413    
414            for (Service child : list) {
415                if (child instanceof ShutdownPrepared) {
416                    try {
417                        LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
418                        ((ShutdownPrepared) child).prepareShutdown(forced);
419                    } catch (Exception e) {
420                        if (suppressLogging) {
421                            LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
422                        } else {
423                            LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
424                        }
425                    }
426                }
427            }
428        }
429    
430        /**
431         * Holder for deferred consumers
432         */
433        static class ShutdownDeferredConsumer {
434            private final Route route;
435            private final Consumer consumer;
436    
437            ShutdownDeferredConsumer(Route route, Consumer consumer) {
438                this.route = route;
439                this.consumer = consumer;
440            }
441    
442            Route getRoute() {
443                return route;
444            }
445    
446            Consumer getConsumer() {
447                return consumer;
448            }
449        }
450    
451        /**
452         * Shutdown task which shutdown all the routes in a graceful manner.
453         */
454        static class ShutdownTask implements Runnable {
455    
456            private final CamelContext context;
457            private final List<RouteStartupOrder> routes;
458            private final boolean suspendOnly;
459            private final boolean abortAfterTimeout;
460            private final long timeout;
461            private final TimeUnit timeUnit;
462            private final AtomicBoolean timeoutOccurred;
463    
464            public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
465                                boolean suspendOnly, boolean abortAfterTimeout, AtomicBoolean timeoutOccurred) {
466                this.context = context;
467                this.routes = routes;
468                this.suspendOnly = suspendOnly;
469                this.abortAfterTimeout = abortAfterTimeout;
470                this.timeout = timeout;
471                this.timeUnit = timeUnit;
472                this.timeoutOccurred = timeoutOccurred;
473            }
474    
475            public void run() {
476                // the strategy in this run method is to
477                // 1) go over the routes and shutdown those routes which can be shutdown asap
478                //    some routes will be deferred to shutdown at the end, as they are needed
479                //    by other routes so they can complete their tasks
480                // 2) wait until all inflight and pending exchanges has been completed
481                // 3) shutdown the deferred routes
482    
483                LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");
484    
485                // list of deferred consumers to shutdown when all exchanges has been completed routed
486                // and thus there are no more inflight exchanges so they can be safely shutdown at that time
487                List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
488                for (RouteStartupOrder order : routes) {
489    
490                    ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
491                    ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
492    
493                    if (LOG.isTraceEnabled()) {
494                        LOG.trace("{}{} with options [{},{}]",
495                                new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
496                                    order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
497                    }
498    
499                    for (Consumer consumer : order.getInputs()) {
500    
501                        boolean suspend = false;
502    
503                        // assume we should shutdown if we are not deferred
504                        boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
505    
506                        if (shutdown) {
507                            // if we are to shutdown then check whether we can suspend instead as its a more
508                            // gentle way to graceful shutdown
509    
510                            // some consumers do not support shutting down so let them decide
511                            // if a consumer is suspendable then prefer to use that and then shutdown later
512                            if (consumer instanceof ShutdownAware) {
513                                shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
514                            }
515                            if (shutdown && consumer instanceof SuspendableService) {
516                                // we prefer to suspend over shutdown
517                                suspend = true;
518                            }
519                        }
520    
521                        // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
522                        if (suspend) {
523                            // only suspend it and then later shutdown it
524                            suspendNow(consumer);
525                            // add it to the deferred list so the route will be shutdown later
526                            deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
527                            LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
528                        } else if (shutdown) {
529                            shutdownNow(consumer);
530                            LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
531                        } else {
532                            // we will stop it later, but for now it must run to be able to help all inflight messages
533                            // be safely completed
534                            deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
535                            LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
536                        }
537                    }
538                }
539    
540                // notify the services we intend to shutdown
541                for (RouteStartupOrder order : routes) {
542                    for (Service service : order.getServices()) {
543                        // skip the consumer as we handle that specially
544                        if (service instanceof Consumer) {
545                            continue;
546                        }
547                        prepareShutdown(service, false, true, false);
548                    }
549                }
550    
551                // wait till there are no more pending and inflight messages
552                boolean done = false;
553                long loopDelaySeconds = 1;
554                long loopCount = 0;
555                while (!done && !timeoutOccurred.get()) {
556                    int size = 0;
557                    for (RouteStartupOrder order : routes) {
558                        int inflight = context.getInflightRepository().size(order.getRoute().getId());
559                        for (Consumer consumer : order.getInputs()) {
560                            // include any additional pending exchanges on some consumers which may have internal
561                            // memory queues such as seda
562                            if (consumer instanceof ShutdownAware) {
563                                inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
564                            }
565                        }
566                        if (inflight > 0) {
567                            size += inflight;
568                            LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId());
569                        }
570                    }
571                    if (size > 0) {
572                        try {
573                            LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
574                                 + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds.");
575                            Thread.sleep(loopDelaySeconds * 1000);
576                        } catch (InterruptedException e) {
577                            if (abortAfterTimeout) {
578                                LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
579                                return;
580                            } else {
581                                LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
582                                break;
583                            }
584                        }
585                    } else {
586                        done = true;
587                    }
588                }
589    
590                // prepare for shutdown
591                for (ShutdownDeferredConsumer deferred : deferredConsumers) {
592                    Consumer consumer = deferred.getConsumer();
593                    if (consumer instanceof ShutdownAware) {
594                        LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
595                        boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
596                        boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
597                        prepareShutdown(consumer, forced, false, suppress);
598                        LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
599                    }
600                }
601    
602                // now all messages has been completed then stop the deferred consumers
603                for (ShutdownDeferredConsumer deferred : deferredConsumers) {
604                    Consumer consumer = deferred.getConsumer();
605                    if (suspendOnly) {
606                        suspendNow(consumer);
607                        LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
608                    } else {
609                        shutdownNow(consumer);
610                        LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
611                    }
612                }
613    
614                // now the route consumers has been shutdown, then prepare route services for shutdown
615                for (RouteStartupOrder order : routes) {
616                    for (Service service : order.getServices()) {
617                        boolean forced = context.getShutdownStrategy().forceShutdown(service);
618                        boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
619                        prepareShutdown(service, forced, true, suppress);
620                    }
621                }
622            }
623    
624        }
625    
626    }