001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.AsyncProcessor;
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.LoggingLevel;
029    import org.apache.camel.Message;
030    import org.apache.camel.Predicate;
031    import org.apache.camel.Processor;
032    import org.apache.camel.model.OnExceptionDefinition;
033    import org.apache.camel.spi.ExchangeFormatter;
034    import org.apache.camel.spi.ShutdownPrepared;
035    import org.apache.camel.spi.SubUnitOfWorkCallback;
036    import org.apache.camel.spi.UnitOfWork;
037    import org.apache.camel.util.AsyncProcessorConverterHelper;
038    import org.apache.camel.util.AsyncProcessorHelper;
039    import org.apache.camel.util.CamelContextHelper;
040    import org.apache.camel.util.CamelLogger;
041    import org.apache.camel.util.EventHelper;
042    import org.apache.camel.util.ExchangeHelper;
043    import org.apache.camel.util.MessageHelper;
044    import org.apache.camel.util.ObjectHelper;
045    import org.apache.camel.util.ServiceHelper;
046    
047    /**
048     * Base redeliverable error handler that also supports a final dead letter queue in case
049     * all redelivery attempts fail.
050     * <p/>
051     * This implementation should contain all the error handling logic and the sub classes
052     * should only configure it according to what they support.
053     *
054     * @version
055     */
056    public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared {
057    
058        protected ScheduledExecutorService executorService;
059        protected final CamelContext camelContext;
060        protected final Processor deadLetter;
061        protected final String deadLetterUri;
062        protected final Processor output;
063        protected final AsyncProcessor outputAsync;
064        protected final Processor redeliveryProcessor;
065        protected final RedeliveryPolicy redeliveryPolicy;
066        protected final Predicate retryWhilePolicy;
067        protected final CamelLogger logger;
068        protected final boolean useOriginalMessagePolicy;
069        protected boolean redeliveryEnabled;
070        protected volatile boolean preparingShutdown;
071        protected final ExchangeFormatter exchangeFormatter;
072    
073        /**
074         * Contains the current redelivery data
075         */
076        protected class RedeliveryData {
077            Exchange original;
078            boolean sync = true;
079            int redeliveryCounter;
080            long redeliveryDelay;
081            Predicate retryWhilePredicate = retryWhilePolicy;
082            boolean redeliverFromSync;
083    
084            // default behavior which can be overloaded on a per exception basis
085            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
086            Processor deadLetterProcessor = deadLetter;
087            Processor failureProcessor;
088            Processor onRedeliveryProcessor = redeliveryProcessor;
089            Predicate handledPredicate = getDefaultHandledPredicate();
090            Predicate continuedPredicate;
091            boolean useOriginalInMessage = useOriginalMessagePolicy;
092        }
093    
094        /**
095         * Tasks which performs asynchronous redelivery attempts, and being triggered by a
096         * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
097         * has to be delayed before a redelivery attempt is performed.
098         */
099        private class AsyncRedeliveryTask implements Callable<Boolean> {
100    
101            private final Exchange exchange;
102            private final AsyncCallback callback;
103            private final RedeliveryData data;
104    
105            public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
106                this.exchange = exchange;
107                this.callback = callback;
108                this.data = data;
109            }
110    
111            public Boolean call() throws Exception {
112                // prepare for redelivery
113                prepareExchangeForRedelivery(exchange, data);
114    
115                // letting onRedeliver be executed at first
116                deliverToOnRedeliveryProcessor(exchange, data);
117    
118                if (log.isTraceEnabled()) {
119                    log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange});
120                }
121    
122                // emmit event we are doing redelivery
123                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
124    
125                // process the exchange (also redelivery)
126                boolean sync;
127                if (data.redeliverFromSync) {
128                    // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
129                    // this error handler, which means we have to invoke the callback with false, to have the callback
130                    // be notified when we are done
131                    sync = outputAsync.process(exchange, new AsyncCallback() {
132                        public void done(boolean doneSync) {
133                            log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
134    
135                            // mark we are in sync mode now
136                            data.sync = false;
137    
138                            // only process if the exchange hasn't failed
139                            // and it has not been handled by the error processor
140                            if (isDone(exchange)) {
141                                callback.done(false);
142                                return;
143                            }
144    
145                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
146                            processAsyncErrorHandler(exchange, callback, data);
147                        }
148                    });
149                } else {
150                    // this redelivery task was scheduled from asynchronous, which means we should only
151                    // handle when the asynchronous task was done
152                    sync = outputAsync.process(exchange, new AsyncCallback() {
153                        public void done(boolean doneSync) {
154                            log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
155    
156                            // this callback should only handle the async case
157                            if (doneSync) {
158                                return;
159                            }
160    
161                            // mark we are in async mode now
162                            data.sync = false;
163    
164                            // only process if the exchange hasn't failed
165                            // and it has not been handled by the error processor
166                            if (isDone(exchange)) {
167                                callback.done(doneSync);
168                                return;
169                            }
170                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
171                            processAsyncErrorHandler(exchange, callback, data);
172                        }
173                    });
174                }
175    
176                return sync;
177            }
178        }
179    
180        public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
181                Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
182                String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) {
183    
184            ObjectHelper.notNull(camelContext, "CamelContext", this);
185            ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
186    
187            this.camelContext = camelContext;
188            this.redeliveryProcessor = redeliveryProcessor;
189            this.deadLetter = deadLetter;
190            this.output = output;
191            this.outputAsync = AsyncProcessorConverterHelper.convert(output);
192            this.redeliveryPolicy = redeliveryPolicy;
193            this.logger = logger;
194            this.deadLetterUri = deadLetterUri;
195            this.useOriginalMessagePolicy = useOriginalMessagePolicy;
196            this.retryWhilePolicy = retryWhile;
197            this.executorService = executorService;
198    
199            // setup exchange formatter to be used for message history dump
200            DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
201            formatter.setShowExchangeId(true);
202            formatter.setMultiline(true);
203            formatter.setShowHeaders(true);
204            formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
205            this.exchangeFormatter = formatter;
206        }
207    
208        public boolean supportTransacted() {
209            return false;
210        }
211    
212        protected boolean isRunAllowed(RedeliveryData data) {
213            // if camel context is forcing a shutdown then do not allow running
214            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
215            if (forceShutdown) {
216                log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
217                return false;
218            }
219    
220            // redelivery policy can control if redelivery is allowed during stopping/shutdown
221            // but this only applies during a redelivery (counter must > 0)
222            if (data.redeliveryCounter > 0) {
223                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
224                    log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
225                    return true;
226                } else if (preparingShutdown) {
227                    // we are preparing for shutdown, now determine if we can still run
228                    boolean answer = isRunAllowedOnPreparingShutdown();
229                    log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
230                    return answer;
231                }
232            }
233    
234            // we cannot run if we are stopping/stopped
235            boolean answer = !isStoppingOrStopped();
236            log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
237            return answer;
238        }
239    
240        protected boolean isRunAllowedOnPreparingShutdown() {
241            return false;
242        }
243    
244        protected boolean isRedeliveryAllowed(RedeliveryData data) {
245            // redelivery policy can control if redelivery is allowed during stopping/shutdown
246            // but this only applies during a redelivery (counter must > 0)
247            if (data.redeliveryCounter > 0) {
248                boolean stopping = isStoppingOrStopped();
249                if (!preparingShutdown && !stopping) {
250                    log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
251                    return true;
252                } else {
253                    // we are stopping or preparing to shutdown
254                    if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
255                        log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
256                        return true;
257                    } else {
258                        log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
259                        return false;
260                    }
261                }
262            }
263    
264            return true;
265        }
266    
267        @Override
268        public void prepareShutdown(boolean forced) {
269            // prepare for shutdown, eg do not allow redelivery if configured
270            log.trace("Prepare shutdown on error handler {}", this);
271            preparingShutdown = true;
272        }
273    
274        public void process(Exchange exchange) throws Exception {
275            if (output == null) {
276                // no output then just return
277                return;
278            }
279            AsyncProcessorHelper.process(this, exchange);
280        }
281    
282        /**
283         * Process the exchange using redelivery error handling.
284         */
285        public boolean process(final Exchange exchange, final AsyncCallback callback) {
286            final RedeliveryData data = new RedeliveryData();
287    
288            // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
289            // original Exchange is being redelivered, and not a mutated Exchange
290            data.original = defensiveCopyExchangeIfNeeded(exchange);
291    
292            // use looping to have redelivery attempts
293            while (true) {
294    
295                // can we still run
296                if (!isRunAllowed(data)) {
297                    log.trace("Run not allowed, will reject executing exchange: {}", exchange);
298                    if (exchange.getException() == null) {
299                        exchange.setException(new RejectedExecutionException());
300                    }
301                    // we cannot process so invoke callback
302                    callback.done(data.sync);
303                    return data.sync;
304                }
305    
306                // did previous processing cause an exception?
307                boolean handle = shouldHandleException(exchange);
308                if (handle) {
309                    handleException(exchange, data);
310                }
311    
312                // compute if we are exhausted, and whether redelivery is allowed
313                boolean exhausted = isExhausted(exchange, data);
314                boolean redeliverAllowed = isRedeliveryAllowed(data);
315    
316                // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
317                if (!redeliverAllowed || exhausted) {
318                    Processor target = null;
319                    boolean deliver = true;
320    
321                    // the unit of work may have an optional callback associated we need to leverage
322                    SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
323                    if (uowCallback != null) {
324                        // signal to the callback we are exhausted
325                        uowCallback.onExhausted(exchange);
326                        // do not deliver to the failure processor as its been handled by the callback instead
327                        deliver = false;
328                    }
329    
330                    if (deliver) {
331                        // should deliver to failure processor (either from onException or the dead letter channel)
332                        target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
333                    }
334                    // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
335                    // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
336                    boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor;
337                    boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
338                    // we are breaking out
339                    return sync;
340                }
341    
342                if (data.redeliveryCounter > 0) {
343                    // calculate delay
344                    data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
345    
346                    if (data.redeliveryDelay > 0) {
347                        // okay there is a delay so create a scheduled task to have it executed in the future
348    
349                        if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
350    
351                            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
352                            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
353    
354                            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
355                            // have it being executed in the future, or immediately
356                            // we are continuing asynchronously
357    
358                            // mark we are routing async from now and that this redelivery task came from a synchronous routing
359                            data.sync = false;
360                            data.redeliverFromSync = true;
361                            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
362    
363                            // schedule the redelivery task
364                            if (log.isTraceEnabled()) {
365                                log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
366                            }
367                            executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
368    
369                            return false;
370                        } else {
371                            // async delayed redelivery was disabled or we are transacted so we must be synchronous
372                            // as the transaction manager requires to execute in the same thread context
373                            try {
374                                data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
375                            } catch (InterruptedException e) {
376                                // we was interrupted so break out
377                                exchange.setException(e);
378                                // mark the exchange to stop continue routing when interrupted
379                                // as we do not want to continue routing (for example a task has been cancelled)
380                                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
381                                callback.done(data.sync);
382                                return data.sync;
383                            }
384                        }
385                    }
386    
387                    // prepare for redelivery
388                    prepareExchangeForRedelivery(exchange, data);
389    
390                    // letting onRedeliver be executed
391                    deliverToOnRedeliveryProcessor(exchange, data);
392    
393                    // emmit event we are doing redelivery
394                    EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
395                }
396    
397                // process the exchange (also redelivery)
398                boolean sync = outputAsync.process(exchange, new AsyncCallback() {
399                    public void done(boolean sync) {
400                        // this callback should only handle the async case
401                        if (sync) {
402                            return;
403                        }
404    
405                        // mark we are in async mode now
406                        data.sync = false;
407    
408                        // if we are done then notify callback and exit
409                        if (isDone(exchange)) {
410                            callback.done(sync);
411                            return;
412                        }
413    
414                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
415                        // method which takes care of this in a asynchronous manner
416                        processAsyncErrorHandler(exchange, callback, data);
417                    }
418                });
419    
420                if (!sync) {
421                    // the remainder of the Exchange is being processed asynchronously so we should return
422                    return false;
423                }
424                // we continue to route synchronously
425    
426                // if we are done then notify callback and exit
427                boolean done = isDone(exchange);
428                if (done) {
429                    callback.done(true);
430                    return true;
431                }
432    
433                // error occurred so loop back around.....
434            }
435        }
436    
437        /**
438         * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
439         * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
440         *
441         * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
442         * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
443         *
444         * @param exchange The current exchange in question.
445         * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
446         * @param redeliveryDelay The default redelivery delay from RedeliveryData
447         * @param redeliveryCounter The redeliveryCounter
448         * @return The time to wait before the next redelivery.
449         */
450        protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
451            Message message = exchange.getIn();
452            Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class);
453            if (delay == null) {
454                delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
455                log.debug("Redelivery delay calculated as {}", delay);
456            } else {
457                log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY);
458            }
459            return delay;
460        }
461    
462        /**
463         * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
464         * <p/>
465         * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
466         * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
467         * in terms of logic.
468         */
469        protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
470            // can we still run
471            if (!isRunAllowed(data)) {
472                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
473                if (exchange.getException() == null) {
474                    exchange.setException(new RejectedExecutionException());
475                }
476                callback.done(data.sync);
477                return;
478            }
479    
480            // did previous processing cause an exception?
481            boolean handle = shouldHandleException(exchange);
482            if (handle) {
483                handleException(exchange, data);
484            }
485    
486            // compute if we are exhausted or not
487            boolean exhausted = isExhausted(exchange, data);
488            if (exhausted) {
489                Processor target = null;
490                boolean deliver = true;
491    
492                // the unit of work may have an optional callback associated we need to leverage
493                UnitOfWork uow = exchange.getUnitOfWork();
494                if (uow != null) {
495                    SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
496                    if (uowCallback != null) {
497                        // signal to the callback we are exhausted
498                        uowCallback.onExhausted(exchange);
499                        // do not deliver to the failure processor as its been handled by the callback instead
500                        deliver = false;
501                    }
502                }
503    
504                if (deliver) {
505                    // should deliver to failure processor (either from onException or the dead letter channel)
506                    target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
507                }
508                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
509                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
510                boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor;
511                deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
512                // we are breaking out
513                return;
514            }
515    
516            if (data.redeliveryCounter > 0) {
517                // we are doing a redelivery then a thread pool must be configured (see the doStart method)
518                ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
519    
520                // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
521                // have it being executed in the future, or immediately
522                // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
523                // to ensure the callback will continue routing from where we left
524                AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
525    
526                // calculate the redelivery delay
527                data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
528                if (data.redeliveryDelay > 0) {
529                    // schedule the redelivery task
530                    if (log.isTraceEnabled()) {
531                        log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
532                    }
533                    executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
534                } else {
535                    // execute the task immediately
536                    executorService.submit(task);
537                }
538            }
539        }
540    
541        /**
542         * Performs a defensive copy of the exchange if needed
543         *
544         * @param exchange the exchange
545         * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
546         */
547        protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
548            // only do a defensive copy if redelivery is enabled
549            if (redeliveryEnabled) {
550                return ExchangeHelper.createCopy(exchange, true);
551            } else {
552                return null;
553            }
554        }
555    
556        /**
557         * Strategy whether the exchange has an exception that we should try to handle.
558         * <p/>
559         * Standard implementations should just look for an exception.
560         */
561        protected boolean shouldHandleException(Exchange exchange) {
562            return exchange.getException() != null;
563        }
564    
565        /**
566         * Strategy to determine if the exchange is done so we can continue
567         */
568        protected boolean isDone(Exchange exchange) {
569            boolean answer = isCancelledOrInterrupted(exchange);
570    
571            // only done if the exchange hasn't failed
572            // and it has not been handled by the failure processor
573            // or we are exhausted
574            if (!answer) {
575                answer = exchange.getException() == null
576                    || ExchangeHelper.isFailureHandled(exchange)
577                    || ExchangeHelper.isRedeliveryExhausted(exchange);
578            }
579    
580            log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
581            return answer;
582        }
583    
584        /**
585         * Strategy to determine if the exchange was cancelled or interrupted
586         */
587        protected boolean isCancelledOrInterrupted(Exchange exchange) {
588            boolean answer = false;
589    
590            if (ExchangeHelper.isInterrupted(exchange)) {
591                // mark the exchange to stop continue routing when interrupted
592                // as we do not want to continue routing (for example a task has been cancelled)
593                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
594                answer = true;
595            }
596    
597            log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
598            return answer;
599        }
600    
601        /**
602         * Returns the output processor
603         */
604        public Processor getOutput() {
605            return output;
606        }
607    
608        /**
609         * Returns the dead letter that message exchanges will be sent to if the
610         * redelivery attempts fail
611         */
612        public Processor getDeadLetter() {
613            return deadLetter;
614        }
615    
    /**
     * Returns the dead letter uri held by this error handler
     * (presumably the endpoint uri of the dead letter processor; confirm against the constructor)
     *
     * @return the value of the <tt>deadLetterUri</tt> field
     */
    public String getDeadLetterUri() {
        return deadLetterUri;
    }
619    
    /**
     * Returns the use original message policy flag held by this error handler
     * (presumably whether the original IN message is used when handing over to the
     * failure processor; confirm against where the field is assigned)
     *
     * @return the value of the <tt>useOriginalMessagePolicy</tt> field
     */
    public boolean isUseOriginalMessagePolicy() {
        return useOriginalMessagePolicy;
    }
623    
    /**
     * Returns the redelivery policy configured on this error handler
     *
     * @return the value of the <tt>redeliveryPolicy</tt> field
     */
    public RedeliveryPolicy getRedeliveryPolicy() {
        return redeliveryPolicy;
    }
627    
    /**
     * Returns the logger this error handler uses to log failed deliveries
     *
     * @return the value of the <tt>logger</tt> field, may be <tt>null</tt> in which case
     *         failed deliveries are not logged
     */
    public CamelLogger getLogger() {
        return logger;
    }
631    
    /**
     * Returns the default handled predicate.
     * <p/>
     * This implementation has no default predicate and returns <tt>null</tt>,
     * as errors are not handled by default.
     *
     * @return always <tt>null</tt> in this implementation
     */
    protected Predicate getDefaultHandledPredicate() {
        // Default is to not handle errors
        return null;
    }
636    
637        protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) {
638            Exception caught = exchange.getException();
639    
640            // we continue so clear any exceptions
641            exchange.setException(null);
642            // clear rollback flags
643            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
644            // reset cached streams so they can be read again
645            MessageHelper.resetStreamCache(exchange.getIn());
646    
647            // its continued then remove traces of redelivery attempted and caught exception
648            exchange.getIn().removeHeader(Exchange.REDELIVERED);
649            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
650            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
651            exchange.removeProperty(Exchange.FAILURE_HANDLED);
652            // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
653    
654            // create log message
655            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
656            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
657            msg = msg + ". Handled and continue routing.";
658    
659            // log that we failed but want to continue
660            logFailedDelivery(false, false, true, exchange, msg, data, null);
661        }
662    
663        protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
664            if (!redeliveryEnabled) {
665                throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
666            }
667            // there must be a defensive copy of the exchange
668            ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
669    
670            // okay we will give it another go so clear the exception so we can try again
671            exchange.setException(null);
672    
673            // clear rollback flags
674            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
675    
676            // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
677            // and then put these on the exchange when doing a redelivery / fault processor
678    
679            // preserve these headers
680            Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
681            Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
682            Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
683    
684            // we are redelivering so copy from original back to exchange
685            exchange.getIn().copyFrom(data.original.getIn());
686            exchange.setOut(null);
687            // reset cached streams so they can be read again
688            MessageHelper.resetStreamCache(exchange.getIn());
689    
690            // put back headers
691            if (redeliveryCounter != null) {
692                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
693            }
694            if (redeliveryMaxCounter != null) {
695                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
696            }
697            if (redelivered != null) {
698                exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
699            }
700        }
701    
702        protected void handleException(Exchange exchange, RedeliveryData data) {
703            Exception e = exchange.getException();
704    
705            // store the original caused exception in a property, so we can restore it later
706            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
707    
708            // find the error handler to use (if any)
709            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
710            if (exceptionPolicy != null) {
711                data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
712                data.handledPredicate = exceptionPolicy.getHandledPolicy();
713                data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
714                data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
715                data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage();
716    
717                // route specific failure handler?
718                Processor processor = null;
719                UnitOfWork uow = exchange.getUnitOfWork();
720                if (uow != null && uow.getRouteContext() != null) {
721                    String routeId = uow.getRouteContext().getRoute().getId();
722                    processor = exceptionPolicy.getErrorHandler(routeId);
723                } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
724                    // note this should really not happen, but we have this code as a fail safe
725                    // to be backwards compatible with the old behavior
726                    log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
727                    processor = exceptionPolicy.getErrorHandlers().iterator().next();
728                }
729                if (processor != null) {
730                    data.failureProcessor = processor;
731                }
732    
733                // route specific on redelivery?
734                processor = exceptionPolicy.getOnRedelivery();
735                if (processor != null) {
736                    data.onRedeliveryProcessor = processor;
737                }
738            }
739    
740            // only log if not failure handled or not an exhausted unit of work
741            if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
742                String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
743                        + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
744                logFailedDelivery(true, false, false, exchange, msg, data, e);
745            }
746    
747            data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
748        }
749    
750        /**
751         * Gives an optional configure redelivery processor a chance to process before the Exchange
752         * will be redelivered. This can be used to alter the Exchange.
753         */
754        protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
755            if (data.onRedeliveryProcessor == null) {
756                return;
757            }
758    
759            if (log.isTraceEnabled()) {
760                log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
761                        data.onRedeliveryProcessor, exchange);
762            }
763    
764            // run this synchronously as its just a Processor
765            try {
766                data.onRedeliveryProcessor.process(exchange);
767            } catch (Throwable e) {
768                exchange.setException(e);
769            }
770            log.trace("Redelivery processor done");
771        }
772    
773        /**
774         * All redelivery attempts failed so move the exchange to the dead letter queue
775         */
776        protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
777                                                    final RedeliveryData data, final AsyncCallback callback) {
778            boolean sync = true;
779    
780            Exception caught = exchange.getException();
781    
782            // we did not success with the redelivery so now we let the failure processor handle it
783            // clear exception as we let the failure processor handle it
784            exchange.setException(null);
785    
786            // always handle if dead letter channel
787            final boolean shouldHandle = isDeadLetterChannel || shouldHandled(exchange, data);
788            final boolean shouldContinue = shouldContinue(exchange, data);
789            // regard both handled or continued as being handled
790            boolean handled = false;
791    
792            if (shouldHandle || shouldContinue) {
793                // its handled then remove traces of redelivery attempted
794                exchange.getIn().removeHeader(Exchange.REDELIVERED);
795                exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
796                exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
797                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
798    
799                // and remove traces of rollback only and uow exhausted markers
800                exchange.removeProperty(Exchange.ROLLBACK_ONLY);
801                exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
802    
803                handled = true;
804            } else {
805                // must decrement the redelivery counter as we didn't process the redelivery but is
806                // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
807                decrementRedeliveryCounter(exchange);
808            }
809    
810            // is the a failure processor to process the Exchange
811            if (processor != null) {
812    
813                // prepare original IN body if it should be moved instead of current body
814                if (data.useOriginalInMessage) {
815                    log.trace("Using the original IN message instead of current");
816                    Message original = exchange.getUnitOfWork().getOriginalInMessage();
817                    exchange.setIn(original);
818                    if (exchange.hasOut()) {
819                        log.trace("Removing the out message to avoid some uncertain behavior");
820                        exchange.setOut(null);
821                    }
822                }
823    
824                // reset cached streams so they can be read again
825                MessageHelper.resetStreamCache(exchange.getIn());
826    
827                log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
828    
829                // store the last to endpoint as the failure endpoint
830                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
831                // and store the route id so we know in which route we failed
832                UnitOfWork uow = exchange.getUnitOfWork();
833                if (uow != null && uow.getRouteContext() != null) {
834                    exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
835                }
836    
837                // the failure processor could also be asynchronous
838                AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
839                sync = afp.process(exchange, new AsyncCallback() {
840                    public void done(boolean sync) {
841                        log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
842                        try {
843                            prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue);
844                            // fire event as we had a failure processor to handle it, which there is a event for
845                            boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
846                            EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
847                        } finally {
848                            // if the fault was handled asynchronously, this should be reflected in the callback as well
849                            data.sync &= sync;
850                            callback.done(data.sync);
851                        }
852                    }
853                });
854            } else {
855                try {
856                    // no processor but we need to prepare after failure as well
857                    prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue);
858                } finally {
859                    // callback we are done
860                    callback.done(data.sync);
861                }
862            }
863    
864            // create log message
865            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
866            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
867            if (processor != null) {
868                msg = msg + ". Processed by failure processor: " + processor;
869            }
870    
871            // log that we failed delivery as we are exhausted
872            logFailedDelivery(false, handled, false, exchange, msg, data, null);
873    
874            return sync;
875        }
876    
877        protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data,
878                                                   final boolean shouldHandle, final boolean shouldContinue) {
879            // we could not process the exchange so we let the failure processor handled it
880            ExchangeHelper.setFailureHandled(exchange);
881    
882            // honor if already set a handling
883            boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
884            if (alreadySet) {
885                boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
886                log.trace("This exchange has already been marked for handling: {}", handled);
887                if (handled) {
888                    exchange.setException(null);
889                } else {
890                    // exception not handled, put exception back in the exchange
891                    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
892                    // and put failure endpoint back as well
893                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
894                }
895                return;
896            }
897    
898            if (shouldHandle) {
899                log.trace("This exchange is handled so its marked as not failed: {}", exchange);
900                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
901            } else if (shouldContinue) {
902                log.trace("This exchange is continued: {}", exchange);
903                // okay we want to continue then prepare the exchange for that as well
904                prepareExchangeForContinue(exchange, data);
905            } else {
906                log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
907                // exception not handled, put exception back in the exchange
908                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
909                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
910                // and put failure endpoint back as well
911                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
912                // and store the route id so we know in which route we failed
913                UnitOfWork uow = exchange.getUnitOfWork();
914                if (uow != null && uow.getRouteContext() != null) {
915                    exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
916                }
917            }
918        }
919    
920        private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) {
921            if (logger == null) {
922                return;
923            }
924    
925            if (!exchange.isRollbackOnly()) {
926                // if we should not rollback, then check whether logging is enabled
927                if (handled && !data.currentRedeliveryPolicy.isLogHandled()) {
928                    // do not log handled
929                    return;
930                }
931    
932                if (continued && !data.currentRedeliveryPolicy.isLogContinued()) {
933                    // do not log handled
934                    return;
935                }
936    
937                if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
938                    // do not log retry attempts
939                    return;
940                }
941    
942                if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
943                    // do not log exhausted
944                    return;
945                }
946            }
947    
948            LoggingLevel newLogLevel;
949            boolean logStackTrace;
950            if (exchange.isRollbackOnly()) {
951                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
952                logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
953            } else if (shouldRedeliver) {
954                newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
955                logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
956            } else {
957                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
958                logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
959            }
960            if (e == null) {
961                e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
962            }
963    
964            if (exchange.isRollbackOnly()) {
965                String msg = "Rollback " + ExchangeHelper.logIds(exchange);
966                Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
967                if (cause != null) {
968                    msg = msg + " due: " + cause.getMessage();
969                }
970    
971                // should we include message history
972                if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
973                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
974                    if (routeStackTrace != null) {
975                        msg = msg + "\n" + routeStackTrace;
976                    }
977                }
978    
979                if (newLogLevel == LoggingLevel.ERROR) {
980                    // log intended rollback on maximum WARN level (no ERROR)
981                    logger.log(msg, LoggingLevel.WARN);
982                } else {
983                    // otherwise use the desired logging level
984                    logger.log(msg, newLogLevel);
985                }
986            } else {
987                String msg = message;
988                // should we include message history
989                if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
990                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, e != null && logStackTrace);
991                    if (routeStackTrace != null) {
992                        msg = msg + "\n" + routeStackTrace;
993                    }
994                }
995    
996                if (e != null && logStackTrace) {
997                    logger.log(msg, e, newLogLevel);
998                } else {
999                    logger.log(msg, newLogLevel);
1000                }
1001            }
1002        }
1003    
1004        /**
1005         * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
1006         * <p/>
1007         * If the exchange is exhausted, then we will not continue processing, but let the
1008         * failure processor deal with the exchange.
1009         *
1010         * @param exchange the current exchange
1011         * @param data     the redelivery data
1012         * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
1013         */
1014        private boolean isExhausted(Exchange exchange, RedeliveryData data) {
1015            // if marked as rollback only then do not continue/redeliver
1016            boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
1017            if (exhausted) {
1018                log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
1019                return true;
1020            }
1021    
1022            // if marked as rollback only then do not continue/redeliver
1023            boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
1024            if (rollbackOnly) {
1025                log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
1026                return true;
1027            }
1028            // its the first original call so continue
1029            if (data.redeliveryCounter == 0) {
1030                return false;
1031            }
1032            // its a potential redelivery so determine if we should redeliver or not
1033            boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
1034            return !redeliver;
1035        }
1036    
1037        /**
1038         * Determines whether or not to continue if we are exhausted.
1039         *
1040         * @param exchange the current exchange
1041         * @param data     the redelivery data
1042         * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
1043         */
1044        private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
1045            if (data.continuedPredicate != null) {
1046                return data.continuedPredicate.matches(exchange);
1047            }
1048            // do not continue by default
1049            return false;
1050        }
1051    
1052        /**
1053         * Determines whether or not to handle if we are exhausted.
1054         *
1055         * @param exchange the current exchange
1056         * @param data     the redelivery data
1057         * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
1058         */
1059        private boolean shouldHandled(Exchange exchange, RedeliveryData data) {
1060            if (data.handledPredicate != null) {
1061                return data.handledPredicate.matches(exchange);
1062            }
1063            // do not handle by default
1064            return false;
1065        }
1066    
1067        /**
1068         * Increments the redelivery counter and adds the redelivered flag if the
1069         * message has been redelivered
1070         */
1071        private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
1072            Message in = exchange.getIn();
1073            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1074            int next = 1;
1075            if (counter != null) {
1076                next = counter + 1;
1077            }
1078            in.setHeader(Exchange.REDELIVERY_COUNTER, next);
1079            in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1080            // if maximum redeliveries is used, then provide that information as well
1081            if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
1082                in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
1083            }
1084            return next;
1085        }
1086    
1087        /**
1088         * Prepares the redelivery counter and boolean flag for the failure handle processor
1089         */
1090        private void decrementRedeliveryCounter(Exchange exchange) {
1091            Message in = exchange.getIn();
1092            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1093            if (counter != null) {
1094                int prev = counter - 1;
1095                in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
1096                // set boolean flag according to counter
1097                in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
1098            } else {
1099                // not redelivered
1100                in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
1101                in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
1102            }
1103        }
1104    
1105        /**
1106         * Determines if redelivery is enabled by checking if any of the redelivery policy
1107         * settings may allow redeliveries.
1108         *
1109         * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
1110         * @throws Exception can be thrown
1111         */
1112        private boolean determineIfRedeliveryIsEnabled() throws Exception {
1113            // determine if redeliver is enabled either on error handler
1114            if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
1115                // must check for != 0 as (-1 means redeliver forever)
1116                return true;
1117            }
1118            if (retryWhilePolicy != null) {
1119                return true;
1120            }
1121    
1122            // or on the exception policies
1123            if (!exceptionPolicies.isEmpty()) {
1124                // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
1125                for (OnExceptionDefinition def : exceptionPolicies.values()) {
1126    
1127                    String ref = def.getRedeliveryPolicyRef();
1128                    if (ref != null) {
1129                        // lookup in registry if ref provided
1130                        RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
1131                        if (policy.getMaximumRedeliveries() != 0) {
1132                            // must check for != 0 as (-1 means redeliver forever)
1133                            return true;
1134                        }
1135                    } else if (def.getRedeliveryPolicy() != null) {
1136                        Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
1137                        if (max != null && max != 0) {
1138                            // must check for != 0 as (-1 means redeliver forever)
1139                            return true;
1140                        }
1141                    }
1142    
1143                    if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
1144                        return true;
1145                    }
1146                }
1147            }
1148    
1149            return false;
1150        }
1151    
1152        @Override
1153        protected void doStart() throws Exception {
1154            ServiceHelper.startServices(output, outputAsync, deadLetter);
1155    
1156            // determine if redeliver is enabled or not
1157            redeliveryEnabled = determineIfRedeliveryIsEnabled();
1158            if (log.isDebugEnabled()) {
1159                log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
1160            }
1161    
1162            // we only need thread pool if redelivery is enabled
1163            if (redeliveryEnabled) {
1164                if (executorService == null) {
1165                    // use default shared executor service
1166                    executorService = camelContext.getErrorHandlerExecutorService();
1167                }
1168                if (log.isTraceEnabled()) {
1169                    log.trace("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this);
1170                }
1171            }
1172    
1173            // reset flag when starting
1174            preparingShutdown = false;
1175        }
1176    
    @Override
    protected void doStop() throws Exception {
        // noop: the services are intentionally not stopped here, but only when shutting down
        // (see doShutdown), as the error handler can be context scoped, and should not stop
        // in case a route stops
    }
1183    
    @Override
    protected void doShutdown() throws Exception {
        // stop and shutdown the dead letter and output processors; this is done here
        // and not in doStop, which is a noop
        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
    }
1188    }