001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.Callable; 020 import java.util.concurrent.RejectedExecutionException; 021 import java.util.concurrent.ScheduledExecutorService; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.AsyncCallback; 025 import org.apache.camel.AsyncProcessor; 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.LoggingLevel; 029 import org.apache.camel.Message; 030 import org.apache.camel.Predicate; 031 import org.apache.camel.Processor; 032 import org.apache.camel.model.OnExceptionDefinition; 033 import org.apache.camel.spi.ExchangeFormatter; 034 import org.apache.camel.spi.ShutdownPrepared; 035 import org.apache.camel.spi.SubUnitOfWorkCallback; 036 import org.apache.camel.spi.UnitOfWork; 037 import org.apache.camel.util.AsyncProcessorConverterHelper; 038 import org.apache.camel.util.AsyncProcessorHelper; 039 import org.apache.camel.util.CamelContextHelper; 040 import org.apache.camel.util.CamelLogger; 041 import org.apache.camel.util.EventHelper; 042 import org.apache.camel.util.ExchangeHelper; 043 import org.apache.camel.util.MessageHelper; 044 import org.apache.camel.util.ObjectHelper; 045 import org.apache.camel.util.ServiceHelper; 046 047 /** 048 * Base redeliverable error handler that also supports a final dead letter queue in case 049 * all redelivery attempts fail. 050 * <p/> 051 * This implementation should contain all the error handling logic and the sub classes 052 * should only configure it according to what they support. 053 * 054 * @version 055 */ 056 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared { 057 058 protected ScheduledExecutorService executorService; 059 protected final CamelContext camelContext; 060 protected final Processor deadLetter; 061 protected final String deadLetterUri; 062 protected final Processor output; 063 protected final AsyncProcessor outputAsync; 064 protected final Processor redeliveryProcessor; 065 protected final RedeliveryPolicy redeliveryPolicy; 066 protected final Predicate retryWhilePolicy; 067 protected final CamelLogger logger; 068 protected final boolean useOriginalMessagePolicy; 069 protected boolean redeliveryEnabled; 070 protected volatile boolean preparingShutdown; 071 protected final ExchangeFormatter exchangeFormatter; 072 073 /** 074 * Contains the current redelivery data 075 */ 076 protected class RedeliveryData { 077 Exchange original; 078 boolean sync = true; 079 int redeliveryCounter; 080 long redeliveryDelay; 081 Predicate retryWhilePredicate = retryWhilePolicy; 082 boolean redeliverFromSync; 083 084 // default behavior which can be overloaded on a per exception basis 085 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 086 Processor deadLetterProcessor = deadLetter; 087 Processor failureProcessor; 088 Processor onRedeliveryProcessor = redeliveryProcessor; 089 Predicate handledPredicate = getDefaultHandledPredicate(); 090 Predicate continuedPredicate; 091 boolean useOriginalInMessage = useOriginalMessagePolicy; 092 } 093 094 /** 095 * Tasks which performs asynchronous redelivery attempts, and being triggered by a 096 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task 097 * has to be delayed before a redelivery attempt is performed. 098 */ 099 private class AsyncRedeliveryTask implements Callable<Boolean> { 100 101 private final Exchange exchange; 102 private final AsyncCallback callback; 103 private final RedeliveryData data; 104 105 public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 106 this.exchange = exchange; 107 this.callback = callback; 108 this.data = data; 109 } 110 111 public Boolean call() throws Exception { 112 // prepare for redelivery 113 prepareExchangeForRedelivery(exchange, data); 114 115 // letting onRedeliver be executed at first 116 deliverToOnRedeliveryProcessor(exchange, data); 117 118 if (log.isTraceEnabled()) { 119 log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); 120 } 121 122 // emmit event we are doing redelivery 123 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 124 125 // process the exchange (also redelivery) 126 boolean sync; 127 if (data.redeliverFromSync) { 128 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from 129 // this error handler, which means we have to invoke the callback with false, to have the callback 130 // be notified when we are done 131 sync = outputAsync.process(exchange, new AsyncCallback() { 132 public void done(boolean doneSync) { 133 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 134 135 // mark we are in sync mode now 136 data.sync = false; 137 138 // only process if the exchange hasn't failed 139 // and it has not been handled by the error processor 140 if (isDone(exchange)) { 141 callback.done(false); 142 return; 143 } 144 145 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 146 processAsyncErrorHandler(exchange, callback, data); 147 } 148 }); 149 } else { 150 // this redelivery task was scheduled from asynchronous, which means we should only 151 // handle when the asynchronous task was done 152 sync = outputAsync.process(exchange, new AsyncCallback() { 153 public void done(boolean doneSync) { 154 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 155 156 // this callback should only handle the async case 157 if (doneSync) { 158 return; 159 } 160 161 // mark we are in async mode now 162 data.sync = false; 163 164 // only process if the exchange hasn't failed 165 // and it has not been handled by the error processor 166 if (isDone(exchange)) { 167 callback.done(doneSync); 168 return; 169 } 170 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 171 processAsyncErrorHandler(exchange, callback, data); 172 } 173 }); 174 } 175 176 return sync; 177 } 178 } 179 180 public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 181 Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 182 String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) { 183 184 ObjectHelper.notNull(camelContext, "CamelContext", this); 185 ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); 186 187 this.camelContext = camelContext; 188 this.redeliveryProcessor = redeliveryProcessor; 189 this.deadLetter = deadLetter; 190 this.output = output; 191 this.outputAsync = AsyncProcessorConverterHelper.convert(output); 192 this.redeliveryPolicy = redeliveryPolicy; 193 this.logger = logger; 194 this.deadLetterUri = deadLetterUri; 195 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 196 this.retryWhilePolicy = retryWhile; 197 this.executorService = executorService; 198 199 // setup exchange formatter to be used for message history dump 200 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 201 formatter.setShowExchangeId(true); 202 formatter.setMultiline(true); 203 formatter.setShowHeaders(true); 204 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 205 this.exchangeFormatter = formatter; 206 } 207 208 public boolean supportTransacted() { 209 return false; 210 } 211 212 protected boolean isRunAllowed(RedeliveryData data) { 213 // if camel context is forcing a shutdown then do not allow running 214 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 215 if (forceShutdown) { 216 log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)"); 217 return false; 218 } 219 220 // redelivery policy can control if redelivery is allowed during stopping/shutdown 221 // but this only applies during a redelivery (counter must > 0) 222 if (data.redeliveryCounter > 0) { 223 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 224 log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)"); 225 return true; 226 } else if (preparingShutdown) { 227 // we are preparing for shutdown, now determine if we can still run 228 boolean answer = isRunAllowedOnPreparingShutdown(); 229 log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer); 230 return answer; 231 } 232 } 233 234 // we cannot run if we are stopping/stopped 235 boolean answer = !isStoppingOrStopped(); 236 log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer); 237 return answer; 238 } 239 240 protected boolean isRunAllowedOnPreparingShutdown() { 241 return false; 242 } 243 244 protected boolean isRedeliveryAllowed(RedeliveryData data) { 245 // redelivery policy can control if redelivery is allowed during stopping/shutdown 246 // but this only applies during a redelivery (counter must > 0) 247 if (data.redeliveryCounter > 0) { 248 boolean stopping = isStoppingOrStopped(); 249 if (!preparingShutdown && !stopping) { 250 log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)"); 251 return true; 252 } else { 253 // we are stopping or preparing to shutdown 254 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 255 log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)"); 256 return true; 257 } else { 258 log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)"); 259 return false; 260 } 261 } 262 } 263 264 return true; 265 } 266 267 @Override 268 public void prepareShutdown(boolean forced) { 269 // prepare for shutdown, eg do not allow redelivery if configured 270 log.trace("Prepare shutdown on error handler {}", this); 271 preparingShutdown = true; 272 } 273 274 public void process(Exchange exchange) throws Exception { 275 if (output == null) { 276 // no output then just return 277 return; 278 } 279 AsyncProcessorHelper.process(this, exchange); 280 } 281 282 /** 283 * Process the exchange using redelivery error handling. 284 */ 285 public boolean process(final Exchange exchange, final AsyncCallback callback) { 286 final RedeliveryData data = new RedeliveryData(); 287 288 // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the 289 // original Exchange is being redelivered, and not a mutated Exchange 290 data.original = defensiveCopyExchangeIfNeeded(exchange); 291 292 // use looping to have redelivery attempts 293 while (true) { 294 295 // can we still run 296 if (!isRunAllowed(data)) { 297 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 298 if (exchange.getException() == null) { 299 exchange.setException(new RejectedExecutionException()); 300 } 301 // we cannot process so invoke callback 302 callback.done(data.sync); 303 return data.sync; 304 } 305 306 // did previous processing cause an exception? 307 boolean handle = shouldHandleException(exchange); 308 if (handle) { 309 handleException(exchange, data); 310 } 311 312 // compute if we are exhausted, and whether redelivery is allowed 313 boolean exhausted = isExhausted(exchange, data); 314 boolean redeliverAllowed = isRedeliveryAllowed(data); 315 316 // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) 317 if (!redeliverAllowed || exhausted) { 318 Processor target = null; 319 boolean deliver = true; 320 321 // the unit of work may have an optional callback associated we need to leverage 322 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 323 if (uowCallback != null) { 324 // signal to the callback we are exhausted 325 uowCallback.onExhausted(exchange); 326 // do not deliver to the failure processor as its been handled by the callback instead 327 deliver = false; 328 } 329 330 if (deliver) { 331 // should deliver to failure processor (either from onException or the dead letter channel) 332 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 333 } 334 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 335 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 336 boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; 337 boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 338 // we are breaking out 339 return sync; 340 } 341 342 if (data.redeliveryCounter > 0) { 343 // calculate delay 344 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 345 346 if (data.redeliveryDelay > 0) { 347 // okay there is a delay so create a scheduled task to have it executed in the future 348 349 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { 350 351 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 352 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 353 354 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 355 // have it being executed in the future, or immediately 356 // we are continuing asynchronously 357 358 // mark we are routing async from now and that this redelivery task came from a synchronous routing 359 data.sync = false; 360 data.redeliverFromSync = true; 361 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 362 363 // schedule the redelivery task 364 if (log.isTraceEnabled()) { 365 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 366 } 367 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 368 369 return false; 370 } else { 371 // async delayed redelivery was disabled or we are transacted so we must be synchronous 372 // as the transaction manager requires to execute in the same thread context 373 try { 374 data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 375 } catch (InterruptedException e) { 376 // we was interrupted so break out 377 exchange.setException(e); 378 // mark the exchange to stop continue routing when interrupted 379 // as we do not want to continue routing (for example a task has been cancelled) 380 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 381 callback.done(data.sync); 382 return data.sync; 383 } 384 } 385 } 386 387 // prepare for redelivery 388 prepareExchangeForRedelivery(exchange, data); 389 390 // letting onRedeliver be executed 391 deliverToOnRedeliveryProcessor(exchange, data); 392 393 // emmit event we are doing redelivery 394 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 395 } 396 397 // process the exchange (also redelivery) 398 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 399 public void done(boolean sync) { 400 // this callback should only handle the async case 401 if (sync) { 402 return; 403 } 404 405 // mark we are in async mode now 406 data.sync = false; 407 408 // if we are done then notify callback and exit 409 if (isDone(exchange)) { 410 callback.done(sync); 411 return; 412 } 413 414 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 415 // method which takes care of this in a asynchronous manner 416 processAsyncErrorHandler(exchange, callback, data); 417 } 418 }); 419 420 if (!sync) { 421 // the remainder of the Exchange is being processed asynchronously so we should return 422 return false; 423 } 424 // we continue to route synchronously 425 426 // if we are done then notify callback and exit 427 boolean done = isDone(exchange); 428 if (done) { 429 callback.done(true); 430 return true; 431 } 432 433 // error occurred so loop back around..... 434 } 435 } 436 437 /** 438 * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} 439 * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> 440 * 441 * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} 442 * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> 443 * 444 * @param exchange The current exchange in question. 445 * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. 446 * @param redeliveryDelay The default redelivery delay from RedeliveryData 447 * @param redeliveryCounter The redeliveryCounter 448 * @return The time to wait before the next redelivery. 449 */ 450 protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) { 451 Message message = exchange.getIn(); 452 Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); 453 if (delay == null) { 454 delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); 455 log.debug("Redelivery delay calculated as {}", delay); 456 } else { 457 log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY); 458 } 459 return delay; 460 } 461 462 /** 463 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. 464 * <p/> 465 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use 466 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> 467 * in terms of logic. 468 */ 469 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 470 // can we still run 471 if (!isRunAllowed(data)) { 472 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 473 if (exchange.getException() == null) { 474 exchange.setException(new RejectedExecutionException()); 475 } 476 callback.done(data.sync); 477 return; 478 } 479 480 // did previous processing cause an exception? 481 boolean handle = shouldHandleException(exchange); 482 if (handle) { 483 handleException(exchange, data); 484 } 485 486 // compute if we are exhausted or not 487 boolean exhausted = isExhausted(exchange, data); 488 if (exhausted) { 489 Processor target = null; 490 boolean deliver = true; 491 492 // the unit of work may have an optional callback associated we need to leverage 493 UnitOfWork uow = exchange.getUnitOfWork(); 494 if (uow != null) { 495 SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); 496 if (uowCallback != null) { 497 // signal to the callback we are exhausted 498 uowCallback.onExhausted(exchange); 499 // do not deliver to the failure processor as its been handled by the callback instead 500 deliver = false; 501 } 502 } 503 504 if (deliver) { 505 // should deliver to failure processor (either from onException or the dead letter channel) 506 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 507 } 508 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 509 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 510 boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; 511 deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 512 // we are breaking out 513 return; 514 } 515 516 if (data.redeliveryCounter > 0) { 517 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 518 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 519 520 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 521 // have it being executed in the future, or immediately 522 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously 523 // to ensure the callback will continue routing from where we left 524 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 525 526 // calculate the redelivery delay 527 data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); 528 if (data.redeliveryDelay > 0) { 529 // schedule the redelivery task 530 if (log.isTraceEnabled()) { 531 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 532 } 533 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 534 } else { 535 // execute the task immediately 536 executorService.submit(task); 537 } 538 } 539 } 540 541 /** 542 * Performs a defensive copy of the exchange if needed 543 * 544 * @param exchange the exchange 545 * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). 546 */ 547 protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { 548 // only do a defensive copy if redelivery is enabled 549 if (redeliveryEnabled) { 550 return ExchangeHelper.createCopy(exchange, true); 551 } else { 552 return null; 553 } 554 } 555 556 /** 557 * Strategy whether the exchange has an exception that we should try to handle. 558 * <p/> 559 * Standard implementations should just look for an exception. 560 */ 561 protected boolean shouldHandleException(Exchange exchange) { 562 return exchange.getException() != null; 563 } 564 565 /** 566 * Strategy to determine if the exchange is done so we can continue 567 */ 568 protected boolean isDone(Exchange exchange) { 569 boolean answer = isCancelledOrInterrupted(exchange); 570 571 // only done if the exchange hasn't failed 572 // and it has not been handled by the failure processor 573 // or we are exhausted 574 if (!answer) { 575 answer = exchange.getException() == null 576 || ExchangeHelper.isFailureHandled(exchange) 577 || ExchangeHelper.isRedeliveryExhausted(exchange); 578 } 579 580 log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); 581 return answer; 582 } 583 584 /** 585 * Strategy to determine if the exchange was cancelled or interrupted 586 */ 587 protected boolean isCancelledOrInterrupted(Exchange exchange) { 588 boolean answer = false; 589 590 if (ExchangeHelper.isInterrupted(exchange)) { 591 // mark the exchange to stop continue routing when interrupted 592 // as we do not want to continue routing (for example a task has been cancelled) 593 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 594 answer = true; 595 } 596 597 log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); 598 return answer; 599 } 600 601 /** 602 * Returns the output processor 603 */ 604 public Processor getOutput() { 605 return output; 606 } 607 608 /** 609 * Returns the dead letter that message exchanges will be sent to if the 610 * redelivery attempts fail 611 */ 612 public Processor getDeadLetter() { 613 return deadLetter; 614 } 615 616 public String getDeadLetterUri() { 617 return deadLetterUri; 618 } 619 620 public boolean isUseOriginalMessagePolicy() { 621 return useOriginalMessagePolicy; 622 } 623 624 public RedeliveryPolicy getRedeliveryPolicy() { 625 return redeliveryPolicy; 626 } 627 628 public CamelLogger getLogger() { 629 return logger; 630 } 631 632 protected Predicate getDefaultHandledPredicate() { 633 // Default is not not handle errors 634 return null; 635 } 636 637 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) { 638 Exception caught = exchange.getException(); 639 640 // we continue so clear any exceptions 641 exchange.setException(null); 642 // clear rollback flags 643 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 644 // reset cached streams so they can be read again 645 MessageHelper.resetStreamCache(exchange.getIn()); 646 647 // its continued then remove traces of redelivery attempted and caught exception 648 exchange.getIn().removeHeader(Exchange.REDELIVERED); 649 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 650 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 651 exchange.removeProperty(Exchange.FAILURE_HANDLED); 652 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception 653 654 // create log message 655 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 656 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 657 msg = msg + ". Handled and continue routing."; 658 659 // log that we failed but want to continue 660 logFailedDelivery(false, false, true, exchange, msg, data, null); 661 } 662 663 protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { 664 if (!redeliveryEnabled) { 665 throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); 666 } 667 // there must be a defensive copy of the exchange 668 ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); 669 670 // okay we will give it another go so clear the exception so we can try again 671 exchange.setException(null); 672 673 // clear rollback flags 674 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 675 676 // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange 677 // and then put these on the exchange when doing a redelivery / fault processor 678 679 // preserve these headers 680 Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 681 Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); 682 Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); 683 684 // we are redelivering so copy from original back to exchange 685 exchange.getIn().copyFrom(data.original.getIn()); 686 exchange.setOut(null); 687 // reset cached streams so they can be read again 688 MessageHelper.resetStreamCache(exchange.getIn()); 689 690 // put back headers 691 if (redeliveryCounter != null) { 692 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 693 } 694 if (redeliveryMaxCounter != null) { 695 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 696 } 697 if (redelivered != null) { 698 exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); 699 } 700 } 701 702 protected void handleException(Exchange exchange, RedeliveryData data) { 703 Exception e = exchange.getException(); 704 705 // store the original caused exception in a property, so we can restore it later 706 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 707 708 // find the error handler to use (if any) 709 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 710 if (exceptionPolicy != null) { 711 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 712 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 713 data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); 714 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); 715 data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage(); 716 717 // route specific failure handler? 718 Processor processor = null; 719 UnitOfWork uow = exchange.getUnitOfWork(); 720 if (uow != null && uow.getRouteContext() != null) { 721 String routeId = uow.getRouteContext().getRoute().getId(); 722 processor = exceptionPolicy.getErrorHandler(routeId); 723 } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) { 724 // note this should really not happen, but we have this code as a fail safe 725 // to be backwards compatible with the old behavior 726 log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId()); 727 processor = exceptionPolicy.getErrorHandlers().iterator().next(); 728 } 729 if (processor != null) { 730 data.failureProcessor = processor; 731 } 732 733 // route specific on redelivery? 734 processor = exceptionPolicy.getOnRedelivery(); 735 if (processor != null) { 736 data.onRedeliveryProcessor = processor; 737 } 738 } 739 740 // only log if not failure handled or not an exhausted unit of work 741 if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { 742 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange) 743 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 744 logFailedDelivery(true, false, false, exchange, msg, data, e); 745 } 746 747 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); 748 } 749 750 /** 751 * Gives an optional configure redelivery processor a chance to process before the Exchange 752 * will be redelivered. This can be used to alter the Exchange. 753 */ 754 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 755 if (data.onRedeliveryProcessor == null) { 756 return; 757 } 758 759 if (log.isTraceEnabled()) { 760 log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", 761 data.onRedeliveryProcessor, exchange); 762 } 763 764 // run this synchronously as its just a Processor 765 try { 766 data.onRedeliveryProcessor.process(exchange); 767 } catch (Throwable e) { 768 exchange.setException(e); 769 } 770 log.trace("Redelivery processor done"); 771 } 772 773 /** 774 * All redelivery attempts failed so move the exchange to the dead letter queue 775 */ 776 protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange, 777 final RedeliveryData data, final AsyncCallback callback) { 778 boolean sync = true; 779 780 Exception caught = exchange.getException(); 781 782 // we did not success with the redelivery so now we let the failure processor handle it 783 // clear exception as we let the failure processor handle it 784 exchange.setException(null); 785 786 // always handle if dead letter channel 787 final boolean shouldHandle = isDeadLetterChannel || shouldHandled(exchange, data); 788 final boolean shouldContinue = shouldContinue(exchange, data); 789 // regard both handled or continued as being handled 790 boolean handled = false; 791 792 if (shouldHandle || shouldContinue) { 793 // its handled then remove traces of redelivery attempted 794 exchange.getIn().removeHeader(Exchange.REDELIVERED); 795 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 796 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 797 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 798 799 // and remove traces of rollback only and uow exhausted markers 800 exchange.removeProperty(Exchange.ROLLBACK_ONLY); 801 exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); 802 803 handled = true; 804 } else { 805 // must decrement the redelivery counter as we didn't process the redelivery but is 806 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 807 decrementRedeliveryCounter(exchange); 808 } 809 810 // is the a failure processor to process the Exchange 811 if (processor != null) { 812 813 // prepare original IN body if it should be moved instead of current body 814 if (data.useOriginalInMessage) { 815 log.trace("Using the original IN message instead of current"); 816 Message original = exchange.getUnitOfWork().getOriginalInMessage(); 817 exchange.setIn(original); 818 if (exchange.hasOut()) { 819 log.trace("Removing the out message to avoid some uncertain behavior"); 820 exchange.setOut(null); 821 } 822 } 823 824 // reset cached streams so they can be read again 825 MessageHelper.resetStreamCache(exchange.getIn()); 826 827 log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); 828 829 // store the last to endpoint as the failure endpoint 830 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 831 // and store the route id so we know in which route we failed 832 UnitOfWork uow = exchange.getUnitOfWork(); 833 if (uow != null && uow.getRouteContext() != null) { 834 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 835 } 836 837 // the failure processor could also be asynchronous 838 AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); 839 sync = afp.process(exchange, new AsyncCallback() { 840 public void done(boolean sync) { 841 log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); 842 try { 843 prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue); 844 // fire event as we had a failure processor to handle it, which there is a event for 845 boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null; 846 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel); 847 } finally { 848 // if the fault was handled asynchronously, this should be reflected in the callback as well 849 data.sync &= sync; 850 callback.done(data.sync); 851 } 852 } 853 }); 854 } else { 855 try { 856 // no processor but we need to prepare after failure as well 857 prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue); 858 } finally { 859 // callback we are done 860 callback.done(data.sync); 861 } 862 } 863 864 // create log message 865 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 866 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 867 if (processor != null) { 868 msg = msg + ". Processed by failure processor: " + processor; 869 } 870 871 // log that we failed delivery as we are exhausted 872 logFailedDelivery(false, handled, false, exchange, msg, data, null); 873 874 return sync; 875 } 876 877 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, 878 final boolean shouldHandle, final boolean shouldContinue) { 879 // we could not process the exchange so we let the failure processor handled it 880 ExchangeHelper.setFailureHandled(exchange); 881 882 // honor if already set a handling 883 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; 884 if (alreadySet) { 885 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); 886 log.trace("This exchange has already been marked for handling: {}", handled); 887 if (handled) { 888 exchange.setException(null); 889 } else { 890 // exception not handled, put exception back in the exchange 891 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 892 // and put failure endpoint back as well 893 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 894 } 895 return; 896 } 897 898 if (shouldHandle) { 899 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 900 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 901 } else if (shouldContinue) { 902 log.trace("This exchange is continued: {}", exchange); 903 // okay we want to continue then prepare the exchange for that as well 904 prepareExchangeForContinue(exchange, data); 905 } else { 906 log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); 907 // exception not handled, put exception back in the exchange 908 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); 909 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 910 // and put failure endpoint back as well 911 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 912 // and store the route id so we know in which route we failed 913 UnitOfWork uow = exchange.getUnitOfWork(); 914 if (uow != null && uow.getRouteContext() != null) { 915 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 916 } 917 } 918 } 919 920 private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) { 921 if (logger == null) { 922 return; 923 } 924 925 if (!exchange.isRollbackOnly()) { 926 // if we should not rollback, then check whether logging is enabled 927 if (handled && !data.currentRedeliveryPolicy.isLogHandled()) { 928 // do not log handled 929 return; 930 } 931 932 if (continued && !data.currentRedeliveryPolicy.isLogContinued()) { 933 // do not log handled 934 return; 935 } 936 937 if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { 938 // do not log retry attempts 939 return; 940 } 941 942 if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { 943 // do not log exhausted 944 return; 945 } 946 } 947 948 LoggingLevel newLogLevel; 949 boolean logStackTrace; 950 if (exchange.isRollbackOnly()) { 951 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 952 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 953 } else if (shouldRedeliver) { 954 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 955 logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); 956 } else { 957 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 958 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 959 } 960 if (e == null) { 961 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); 962 } 963 964 if (exchange.isRollbackOnly()) { 965 String msg = "Rollback " + ExchangeHelper.logIds(exchange); 966 Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 967 if (cause != null) { 968 msg = msg + " due: " + cause.getMessage(); 969 } 970 971 // should we include message history 972 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 973 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false); 974 if (routeStackTrace != null) { 975 msg = msg + "\n" + routeStackTrace; 976 } 977 } 978 979 if (newLogLevel == LoggingLevel.ERROR) { 980 // log intended rollback on maximum WARN level (no ERROR) 981 logger.log(msg, LoggingLevel.WARN); 982 } else { 983 // otherwise use the desired logging level 984 logger.log(msg, newLogLevel); 985 } 986 } else { 987 String msg = message; 988 // should we include message history 989 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 990 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, e != null && logStackTrace); 991 if (routeStackTrace != null) { 992 msg = msg + "\n" + routeStackTrace; 993 } 994 } 995 996 if (e != null && logStackTrace) { 997 logger.log(msg, e, newLogLevel); 998 } else { 999 logger.log(msg, newLogLevel); 1000 } 1001 } 1002 } 1003 1004 /** 1005 * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). 1006 * <p/> 1007 * If the exchange is exhausted, then we will not continue processing, but let the 1008 * failure processor deal with the exchange. 1009 * 1010 * @param exchange the current exchange 1011 * @param data the redelivery data 1012 * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. 1013 */ 1014 private boolean isExhausted(Exchange exchange, RedeliveryData data) { 1015 // if marked as rollback only then do not continue/redeliver 1016 boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 1017 if (exhausted) { 1018 log.trace("This exchange is marked as redelivery exhausted: {}", exchange); 1019 return true; 1020 } 1021 1022 // if marked as rollback only then do not continue/redeliver 1023 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); 1024 if (rollbackOnly) { 1025 log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); 1026 return true; 1027 } 1028 // its the first original call so continue 1029 if (data.redeliveryCounter == 0) { 1030 return false; 1031 } 1032 // its a potential redelivery so determine if we should redeliver or not 1033 boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); 1034 return !redeliver; 1035 } 1036 1037 /** 1038 * Determines whether or not to continue if we are exhausted. 1039 * 1040 * @param exchange the current exchange 1041 * @param data the redelivery data 1042 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. 1043 */ 1044 private boolean shouldContinue(Exchange exchange, RedeliveryData data) { 1045 if (data.continuedPredicate != null) { 1046 return data.continuedPredicate.matches(exchange); 1047 } 1048 // do not continue by default 1049 return false; 1050 } 1051 1052 /** 1053 * Determines whether or not to handle if we are exhausted. 1054 * 1055 * @param exchange the current exchange 1056 * @param data the redelivery data 1057 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. 1058 */ 1059 private boolean shouldHandled(Exchange exchange, RedeliveryData data) { 1060 if (data.handledPredicate != null) { 1061 return data.handledPredicate.matches(exchange); 1062 } 1063 // do not handle by default 1064 return false; 1065 } 1066 1067 /** 1068 * Increments the redelivery counter and adds the redelivered flag if the 1069 * message has been redelivered 1070 */ 1071 private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { 1072 Message in = exchange.getIn(); 1073 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1074 int next = 1; 1075 if (counter != null) { 1076 next = counter + 1; 1077 } 1078 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 1079 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1080 // if maximum redeliveries is used, then provide that information as well 1081 if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { 1082 in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); 1083 } 1084 return next; 1085 } 1086 1087 /** 1088 * Prepares the redelivery counter and boolean flag for the failure handle processor 1089 */ 1090 private void decrementRedeliveryCounter(Exchange exchange) { 1091 Message in = exchange.getIn(); 1092 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1093 if (counter != null) { 1094 int prev = counter - 1; 1095 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 1096 // set boolean flag according to counter 1097 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 1098 } else { 1099 // not redelivered 1100 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 1101 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 1102 } 1103 } 1104 1105 /** 1106 * Determines if redelivery is enabled by checking if any of the redelivery policy 1107 * settings may allow redeliveries. 1108 * 1109 * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise 1110 * @throws Exception can be thrown 1111 */ 1112 private boolean determineIfRedeliveryIsEnabled() throws Exception { 1113 // determine if redeliver is enabled either on error handler 1114 if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { 1115 // must check for != 0 as (-1 means redeliver forever) 1116 return true; 1117 } 1118 if (retryWhilePolicy != null) { 1119 return true; 1120 } 1121 1122 // or on the exception policies 1123 if (!exceptionPolicies.isEmpty()) { 1124 // walk them to see if any of them have a maximum redeliveries > 0 or retry until set 1125 for (OnExceptionDefinition def : exceptionPolicies.values()) { 1126 1127 String ref = def.getRedeliveryPolicyRef(); 1128 if (ref != null) { 1129 // lookup in registry if ref provided 1130 RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); 1131 if (policy.getMaximumRedeliveries() != 0) { 1132 // must check for != 0 as (-1 means redeliver forever) 1133 return true; 1134 } 1135 } else if (def.getRedeliveryPolicy() != null) { 1136 Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); 1137 if (max != null && max != 0) { 1138 // must check for != 0 as (-1 means redeliver forever) 1139 return true; 1140 } 1141 } 1142 1143 if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { 1144 return true; 1145 } 1146 } 1147 } 1148 1149 return false; 1150 } 1151 1152 @Override 1153 protected void doStart() throws Exception { 1154 ServiceHelper.startServices(output, outputAsync, deadLetter); 1155 1156 // determine if redeliver is enabled or not 1157 redeliveryEnabled = determineIfRedeliveryIsEnabled(); 1158 if (log.isDebugEnabled()) { 1159 log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); 1160 } 1161 1162 // we only need thread pool if redelivery is enabled 1163 if (redeliveryEnabled) { 1164 if (executorService == null) { 1165 // use default shared executor service 1166 executorService = camelContext.getErrorHandlerExecutorService(); 1167 } 1168 if (log.isTraceEnabled()) { 1169 log.trace("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this); 1170 } 1171 } 1172 1173 // reset flag when starting 1174 preparingShutdown = false; 1175 } 1176 1177 @Override 1178 protected void doStop() throws Exception { 1179 // noop, do not stop any services which we only do when shutting down 1180 // as the error handler can be context scoped, and should not stop in case 1181 // a route stops 1182 } 1183 1184 @Override 1185 protected void doShutdown() throws Exception { 1186 ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); 1187 } 1188 }