001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Date; 021 import java.util.List; 022 import java.util.Queue; 023 import java.util.concurrent.RejectedExecutionException; 024 025 import org.apache.camel.AsyncCallback; 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.MessageHistory; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Route; 031 import org.apache.camel.StatefulService; 032 import org.apache.camel.StreamCache; 033 import org.apache.camel.api.management.PerformanceCounter; 034 import org.apache.camel.impl.DefaultMessageHistory; 035 import org.apache.camel.management.DelegatePerformanceCounter; 036 import org.apache.camel.management.mbean.ManagedPerformanceCounter; 037 import org.apache.camel.model.ProcessorDefinition; 038 import org.apache.camel.model.ProcessorDefinitionHelper; 039 import org.apache.camel.processor.interceptor.BacklogDebugger; 040 import org.apache.camel.processor.interceptor.BacklogTracer; 041 import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage; 042 import org.apache.camel.spi.InflightRepository; 043 import org.apache.camel.spi.RouteContext; 044 import org.apache.camel.spi.RoutePolicy; 045 import org.apache.camel.spi.StreamCachingStrategy; 046 import org.apache.camel.spi.UnitOfWork; 047 import org.apache.camel.util.MessageHelper; 048 import org.apache.camel.util.StopWatch; 049 import org.apache.camel.util.UnitOfWorkHelper; 050 import org.slf4j.Logger; 051 import org.slf4j.LoggerFactory; 052 053 /** 054 * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as: 055 * <ul> 056 * <li>Execute {@link UnitOfWork}</li> 057 * <li>Keeping track which route currently is being routed</li> 058 * <li>Execute {@link RoutePolicy}</li> 059 * <li>Gather JMX performance statics</li> 060 * <li>Tracing</li> 061 * <li>Debugging</li> 062 * <li>Message History</li> 063 * <li>Stream Caching</li> 064 * </ul> 065 * ... and more. 066 * <p/> 067 * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice) 068 * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and 069 * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing. 070 * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well 071 * makes debugging the routing engine easier for end users. 072 * <p/> 073 * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to 074 * read the source code of this class about the debugging tips, which you can find in the 075 * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method. 076 */ 077 public class CamelInternalProcessor extends DelegateAsyncProcessor { 078 079 private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class); 080 private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>(); 081 082 public CamelInternalProcessor() { 083 } 084 085 public CamelInternalProcessor(Processor processor) { 086 super(processor); 087 } 088 089 /** 090 * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor. 091 * 092 * @param advice the advice to add 093 */ 094 public void addAdvice(CamelInternalProcessorAdvice advice) { 095 advices.add(advice); 096 } 097 098 /** 099 * Gets the advice with the given type. 100 * 101 * @param type the type of the advice 102 * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type. 103 */ 104 public <T> T getAdvice(Class<T> type) { 105 for (CamelInternalProcessorAdvice task : advices) { 106 if (type.isInstance(task)) { 107 return type.cast(task); 108 } 109 } 110 return null; 111 } 112 113 @Override 114 public boolean process(Exchange exchange, AsyncCallback callback) { 115 // ---------------------------------------------------------- 116 // CAMEL END USER - READ ME FOR DEBUGGING TIPS 117 // ---------------------------------------------------------- 118 // If you want to debug the Camel routing engine, then there is a lot of internal functionality 119 // the routing engine executes during routing messages. You can skip debugging this internal 120 // functionality and instead debug where the routing engine continues routing to the next node 121 // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its 122 // being used in between the nodes. As an end user you can just debug the code in this class 123 // in between the: 124 // CAMEL END USER - DEBUG ME HERE +++ START +++ 125 // CAMEL END USER - DEBUG ME HERE +++ END +++ 126 // you can see in the code below. 127 // ---------------------------------------------------------- 128 129 130 if (processor == null || !continueProcessing(exchange)) { 131 // no processor or we should not continue then we are done 132 callback.done(true); 133 return true; 134 } 135 136 final List<Object> states = new ArrayList<Object>(advices.size()); 137 for (CamelInternalProcessorAdvice task : advices) { 138 try { 139 Object state = task.before(exchange); 140 states.add(state); 141 } catch (Throwable e) { 142 exchange.setException(e); 143 callback.done(true); 144 return true; 145 } 146 } 147 148 // create internal callback which will execute the advices in reverse order when done 149 callback = new InternalCallback(states, exchange, callback); 150 151 // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0 152 Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC); 153 if (exchange.isTransacted() || synchronous != null) { 154 // must be synchronized for transacted exchanges 155 if (LOG.isTraceEnabled()) { 156 if (exchange.isTransacted()) { 157 LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 158 } else { 159 LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 160 } 161 } 162 // ---------------------------------------------------------- 163 // CAMEL END USER - DEBUG ME HERE +++ START +++ 164 // ---------------------------------------------------------- 165 try { 166 processor.process(exchange); 167 } catch (Throwable e) { 168 exchange.setException(e); 169 } 170 // ---------------------------------------------------------- 171 // CAMEL END USER - DEBUG ME HERE +++ END +++ 172 // ---------------------------------------------------------- 173 callback.done(true); 174 return true; 175 } else { 176 final UnitOfWork uow = exchange.getUnitOfWork(); 177 178 // allow unit of work to wrap callback in case it need to do some special work 179 // for example the MDCUnitOfWork 180 AsyncCallback async = callback; 181 if (uow != null) { 182 async = uow.beforeProcess(processor, exchange, callback); 183 } 184 185 // ---------------------------------------------------------- 186 // CAMEL END USER - DEBUG ME HERE +++ START +++ 187 // ---------------------------------------------------------- 188 if (LOG.isTraceEnabled()) { 189 LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 190 } 191 boolean sync = processor.process(exchange, async); 192 // ---------------------------------------------------------- 193 // CAMEL END USER - DEBUG ME HERE +++ END +++ 194 // ---------------------------------------------------------- 195 196 // execute any after processor work (in current thread, not in the callback) 197 if (uow != null) { 198 uow.afterProcess(processor, exchange, callback, sync); 199 } 200 201 if (LOG.isTraceEnabled()) { 202 LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", 203 new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange}); 204 } 205 return sync; 206 } 207 } 208 209 @Override 210 public String toString() { 211 return processor != null ? processor.toString() : super.toString(); 212 } 213 214 /** 215 * Internal callback that executes the after advices. 216 */ 217 private final class InternalCallback implements AsyncCallback { 218 219 private final List<Object> states; 220 private final Exchange exchange; 221 private final AsyncCallback callback; 222 223 private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) { 224 this.states = states; 225 this.exchange = exchange; 226 this.callback = callback; 227 } 228 229 @Override 230 public void done(boolean doneSync) { 231 // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only 232 // so you can step straight to the finally block and invoke the callback 233 234 // we should call after in reverse order 235 try { 236 for (int i = advices.size() - 1; i >= 0; i--) { 237 CamelInternalProcessorAdvice task = advices.get(i); 238 Object state = states.get(i); 239 try { 240 task.after(exchange, state); 241 } catch (Exception e) { 242 exchange.setException(e); 243 // allow all advices to complete even if there was an exception 244 } 245 } 246 } finally { 247 // ---------------------------------------------------------- 248 // CAMEL END USER - DEBUG ME HERE +++ START +++ 249 // ---------------------------------------------------------- 250 // callback must be called 251 callback.done(doneSync); 252 // ---------------------------------------------------------- 253 // CAMEL END USER - DEBUG ME HERE +++ END +++ 254 // ---------------------------------------------------------- 255 } 256 } 257 } 258 259 /** 260 * Strategy to determine if we should continue processing the {@link Exchange}. 261 */ 262 protected boolean continueProcessing(Exchange exchange) { 263 Object stop = exchange.getProperty(Exchange.ROUTE_STOP); 264 if (stop != null) { 265 boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); 266 if (doStop) { 267 LOG.debug("Exchange is marked to stop routing: {}", exchange); 268 return false; 269 } 270 } 271 272 // determine if we can still run, or the camel context is forcing a shutdown 273 boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this); 274 if (forceShutdown) { 275 LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange); 276 if (exchange.getException() == null) { 277 exchange.setException(new RejectedExecutionException()); 278 } 279 return false; 280 } 281 282 // yes we can continue 283 return true; 284 } 285 286 /** 287 * Advice for JMX instrumentation of the process being invoked. 288 * <p/> 289 * This advice keeps track of JMX metrics for performance statistics. 290 * <p/> 291 * The current implementation of this advice is only used for route level statistics. For processor levels 292 * they are still wrapped in the route processor chains. 293 */ 294 public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> { 295 296 private PerformanceCounter counter; 297 private String type; 298 299 public InstrumentationAdvice(String type) { 300 this.type = type; 301 } 302 303 public void setCounter(Object counter) { 304 ManagedPerformanceCounter mpc = null; 305 if (counter instanceof ManagedPerformanceCounter) { 306 mpc = (ManagedPerformanceCounter) counter; 307 } 308 309 if (this.counter instanceof DelegatePerformanceCounter) { 310 ((DelegatePerformanceCounter) this.counter).setCounter(mpc); 311 } else if (mpc != null) { 312 this.counter = mpc; 313 } else if (counter instanceof PerformanceCounter) { 314 this.counter = (PerformanceCounter) counter; 315 } 316 } 317 318 protected void recordTime(Exchange exchange, long duration) { 319 if (LOG.isTraceEnabled()) { 320 LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange}); 321 } 322 323 if (!exchange.isFailed() && exchange.getException() == null) { 324 counter.completedExchange(exchange, duration); 325 } else { 326 counter.failedExchange(exchange); 327 } 328 } 329 330 public String getType() { 331 return type; 332 } 333 334 public void setType(String type) { 335 this.type = type; 336 } 337 338 @Override 339 public StopWatch before(Exchange exchange) throws Exception { 340 // only record time if stats is enabled 341 return (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; 342 } 343 344 @Override 345 public void after(Exchange exchange, StopWatch watch) throws Exception { 346 // record end time 347 if (watch != null) { 348 recordTime(exchange, watch.stop()); 349 } 350 } 351 } 352 353 /** 354 * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange} 355 */ 356 public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> { 357 358 private final RouteContext routeContext; 359 360 public RouteContextAdvice(RouteContext routeContext) { 361 this.routeContext = routeContext; 362 } 363 364 @Override 365 public UnitOfWork before(Exchange exchange) throws Exception { 366 // push the current route context 367 final UnitOfWork unitOfWork = exchange.getUnitOfWork(); 368 if (unitOfWork != null) { 369 unitOfWork.pushRouteContext(routeContext); 370 } 371 return unitOfWork; 372 } 373 374 @Override 375 public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception { 376 if (unitOfWork != null) { 377 unitOfWork.popRouteContext(); 378 } 379 } 380 } 381 382 /** 383 * Advice to keep the {@link InflightRepository} up to date. 384 */ 385 public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice { 386 387 private final InflightRepository inflightRepository; 388 private final String id; 389 390 public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) { 391 this.inflightRepository = inflightRepository; 392 this.id = id; 393 } 394 395 @Override 396 public Object before(Exchange exchange) throws Exception { 397 inflightRepository.add(exchange, id); 398 return null; 399 } 400 401 @Override 402 public void after(Exchange exchange, Object state) throws Exception { 403 inflightRepository.remove(exchange, id); 404 } 405 } 406 407 /** 408 * Advice to execute any {@link RoutePolicy} a route may have been configured with. 409 */ 410 public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice { 411 412 private final List<RoutePolicy> routePolicies; 413 private Route route; 414 415 public RoutePolicyAdvice(List<RoutePolicy> routePolicies) { 416 this.routePolicies = routePolicies; 417 } 418 419 public void setRoute(Route route) { 420 this.route = route; 421 } 422 423 /** 424 * Strategy to determine if this policy is allowed to run 425 * 426 * @param policy the policy 427 * @return <tt>true</tt> to run 428 */ 429 protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) { 430 if (policy instanceof StatefulService) { 431 StatefulService ss = (StatefulService) policy; 432 return ss.isRunAllowed(); 433 } 434 return true; 435 } 436 437 @Override 438 public Object before(Exchange exchange) throws Exception { 439 // invoke begin 440 for (RoutePolicy policy : routePolicies) { 441 try { 442 if (isRoutePolicyRunAllowed(policy)) { 443 policy.onExchangeBegin(route, exchange); 444 } 445 } catch (Exception e) { 446 LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy 447 + ". This exception will be ignored", e); 448 } 449 } 450 return null; 451 } 452 453 @Override 454 public void after(Exchange exchange, Object data) throws Exception { 455 // do not invoke it if Camel is stopping as we don't want 456 // the policy to start a consumer during Camel is stopping 457 if (isCamelStopping(exchange.getContext())) { 458 return; 459 } 460 461 for (RoutePolicy policy : routePolicies) { 462 try { 463 if (isRoutePolicyRunAllowed(policy)) { 464 policy.onExchangeDone(route, exchange); 465 } 466 } catch (Exception e) { 467 LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy 468 + ". This exception will be ignored", e); 469 } 470 } 471 } 472 473 private static boolean isCamelStopping(CamelContext context) { 474 if (context instanceof StatefulService) { 475 StatefulService ss = (StatefulService) context; 476 return ss.isStopping() || ss.isStopped(); 477 } 478 return false; 479 } 480 } 481 482 /** 483 * Advice to execute the {@link BacklogTracer} if enabled. 484 */ 485 public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice { 486 487 private final Queue<DefaultBacklogTracerEventMessage> queue; 488 private final BacklogTracer backlogTracer; 489 private final ProcessorDefinition<?> processorDefinition; 490 private final ProcessorDefinition<?> routeDefinition; 491 private final boolean first; 492 493 public BacklogTracerAdvice(Queue<DefaultBacklogTracerEventMessage> queue, BacklogTracer backlogTracer, 494 ProcessorDefinition<?> processorDefinition, ProcessorDefinition<?> routeDefinition, boolean first) { 495 this.queue = queue; 496 this.backlogTracer = backlogTracer; 497 this.processorDefinition = processorDefinition; 498 this.routeDefinition = routeDefinition; 499 this.first = first; 500 } 501 502 @Override 503 public Object before(Exchange exchange) throws Exception { 504 if (backlogTracer.shouldTrace(processorDefinition, exchange)) { 505 // ensure there is space on the queue 506 int drain = queue.size() - backlogTracer.getBacklogSize(); 507 // and we need room for ourselves and possible also a first pseudo message as well 508 drain += first ? 2 : 1; 509 if (drain > 0) { 510 for (int i = 0; i < drain; i++) { 511 queue.poll(); 512 } 513 } 514 515 Date timestamp = new Date(); 516 String toNode = processorDefinition.getId(); 517 String exchangeId = exchange.getExchangeId(); 518 String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4, 519 backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars()); 520 521 // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route) 522 String routeId = routeDefinition.getId(); 523 if (first) { 524 Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class); 525 DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml); 526 queue.add(pseudo); 527 } 528 DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml); 529 queue.add(event); 530 } 531 532 return null; 533 } 534 535 @Override 536 public void after(Exchange exchange, Object data) throws Exception { 537 // noop 538 } 539 } 540 541 /** 542 * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled. 543 */ 544 public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch> { 545 546 private final BacklogDebugger backlogDebugger; 547 private final Processor target; 548 private final ProcessorDefinition<?> definition; 549 private final String nodeId; 550 551 public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) { 552 this.backlogDebugger = backlogDebugger; 553 this.target = target; 554 this.definition = definition; 555 this.nodeId = definition.getId(); 556 } 557 558 @Override 559 public StopWatch before(Exchange exchange) throws Exception { 560 if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) { 561 StopWatch watch = new StopWatch(); 562 backlogDebugger.beforeProcess(exchange, target, definition); 563 return watch; 564 } else { 565 return null; 566 } 567 } 568 569 @Override 570 public void after(Exchange exchange, StopWatch stopWatch) throws Exception { 571 if (stopWatch != null) { 572 backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop()); 573 } 574 } 575 } 576 577 /** 578 * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure 579 * the {@link UnitOfWork} is done and stopped. 580 */ 581 public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> { 582 583 private final String routeId; 584 585 public UnitOfWorkProcessorAdvice(String routeId) { 586 this.routeId = routeId; 587 } 588 589 @Override 590 public UnitOfWork before(Exchange exchange) throws Exception { 591 // if the exchange doesn't have from route id set, then set it if it originated 592 // from this unit of work 593 if (routeId != null && exchange.getFromRouteId() == null) { 594 exchange.setFromRouteId(routeId); 595 } 596 597 if (exchange.getUnitOfWork() == null) { 598 // If there is no existing UoW, then we should start one and 599 // terminate it once processing is completed for the exchange. 600 UnitOfWork uow = createUnitOfWork(exchange); 601 exchange.setUnitOfWork(uow); 602 uow.start(); 603 return uow; 604 } 605 606 return null; 607 } 608 609 @Override 610 public void after(Exchange exchange, UnitOfWork uow) throws Exception { 611 // execute done on uow if we created it, and the consumer is not doing it 612 if (uow != null) { 613 UnitOfWorkHelper.doneUow(uow, exchange); 614 } 615 } 616 617 protected UnitOfWork createUnitOfWork(Exchange exchange) { 618 return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange); 619 } 620 621 } 622 623 /** 624 * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality. 625 */ 626 public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice { 627 628 private final UnitOfWork parent; 629 630 public ChildUnitOfWorkProcessorAdvice(String routeId, UnitOfWork parent) { 631 super(routeId); 632 this.parent = parent; 633 } 634 635 @Override 636 protected UnitOfWork createUnitOfWork(Exchange exchange) { 637 // let the parent create a child unit of work to be used 638 return parent.createChildUnitOfWork(exchange); 639 } 640 641 } 642 643 /** 644 * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality. 645 */ 646 public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> { 647 648 @Override 649 public UnitOfWork before(Exchange exchange) throws Exception { 650 // begin savepoint 651 exchange.getUnitOfWork().beginSubUnitOfWork(exchange); 652 return exchange.getUnitOfWork(); 653 } 654 655 @Override 656 public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception { 657 // end sub unit of work 658 unitOfWork.endSubUnitOfWork(exchange); 659 } 660 } 661 662 /** 663 * Advice when Message History has been enabled. 664 */ 665 @SuppressWarnings("unchecked") 666 public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> { 667 668 private final ProcessorDefinition<?> definition; 669 private final String routeId; 670 671 public MessageHistoryAdvice(ProcessorDefinition<?> definition) { 672 this.definition = definition; 673 this.routeId = ProcessorDefinitionHelper.getRouteId(definition); 674 } 675 676 @Override 677 public MessageHistory before(Exchange exchange) throws Exception { 678 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); 679 if (list == null) { 680 list = new ArrayList<MessageHistory>(); 681 exchange.setProperty(Exchange.MESSAGE_HISTORY, list); 682 } 683 MessageHistory history = new DefaultMessageHistory(routeId, definition, new Date()); 684 list.add(history); 685 return history; 686 } 687 688 @Override 689 public void after(Exchange exchange, MessageHistory history) throws Exception { 690 if (history != null) { 691 history.nodeProcessingDone(); 692 } 693 } 694 } 695 696 /** 697 * Advice for {@link org.apache.camel.spi.StreamCachingStrategy} 698 */ 699 public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache> { 700 701 private final StreamCachingStrategy strategy; 702 703 public StreamCachingAdvice(StreamCachingStrategy strategy) { 704 this.strategy = strategy; 705 } 706 707 @Override 708 public StreamCache before(Exchange exchange) throws Exception { 709 // check if body is already cached 710 Object body = exchange.getIn().getBody(); 711 if (body == null) { 712 return null; 713 } else if (body instanceof StreamCache) { 714 StreamCache sc = (StreamCache) body; 715 // reset so the cache is ready to be used before processing 716 sc.reset(); 717 return sc; 718 } 719 // cache the body and if we could do that replace it as the new body 720 StreamCache sc = strategy.cache(exchange); 721 if (sc != null) { 722 exchange.getIn().setBody(sc); 723 } 724 return sc; 725 } 726 727 @Override 728 public void after(Exchange exchange, StreamCache sc) throws Exception { 729 Object body = exchange.getIn().getBody(); 730 if (body != null && body instanceof StreamCache) { 731 // reset so the cache is ready to be reused after processing 732 ((StreamCache) body).reset(); 733 } 734 } 735 } 736 737 /** 738 * Advice for delaying 739 */ 740 public static class DelayerAdvice implements CamelInternalProcessorAdvice { 741 742 private final long delay; 743 744 public DelayerAdvice(long delay) { 745 this.delay = delay; 746 } 747 748 @Override 749 public Object before(Exchange exchange) throws Exception { 750 try { 751 LOG.trace("Sleeping for: {} millis", delay); 752 Thread.sleep(delay); 753 } catch (InterruptedException e) { 754 LOG.debug("Sleep interrupted"); 755 Thread.currentThread().interrupt(); 756 throw e; 757 } 758 return null; 759 } 760 761 @Override 762 public void after(Exchange exchange, Object data) throws Exception { 763 // noop 764 } 765 } 766 767 }