001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Date;
021    import java.util.List;
022    import java.util.Queue;
023    import java.util.concurrent.RejectedExecutionException;
024    
025    import org.apache.camel.AsyncCallback;
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.MessageHistory;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Route;
031    import org.apache.camel.StatefulService;
032    import org.apache.camel.StreamCache;
033    import org.apache.camel.api.management.PerformanceCounter;
034    import org.apache.camel.impl.DefaultMessageHistory;
035    import org.apache.camel.management.DelegatePerformanceCounter;
036    import org.apache.camel.management.mbean.ManagedPerformanceCounter;
037    import org.apache.camel.model.ProcessorDefinition;
038    import org.apache.camel.model.ProcessorDefinitionHelper;
039    import org.apache.camel.processor.interceptor.BacklogDebugger;
040    import org.apache.camel.processor.interceptor.BacklogTracer;
041    import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
042    import org.apache.camel.spi.InflightRepository;
043    import org.apache.camel.spi.RouteContext;
044    import org.apache.camel.spi.RoutePolicy;
045    import org.apache.camel.spi.StreamCachingStrategy;
046    import org.apache.camel.spi.UnitOfWork;
047    import org.apache.camel.util.MessageHelper;
048    import org.apache.camel.util.StopWatch;
049    import org.apache.camel.util.UnitOfWorkHelper;
050    import org.slf4j.Logger;
051    import org.slf4j.LoggerFactory;
052    
053    /**
054     * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
055     * <ul>
056     *     <li>Execute {@link UnitOfWork}</li>
057     *     <li>Keeping track which route currently is being routed</li>
058     *     <li>Execute {@link RoutePolicy}</li>
059     *     <li>Gather JMX performance statics</li>
060     *     <li>Tracing</li>
061     *     <li>Debugging</li>
062     *     <li>Message History</li>
063     *     <li>Stream Caching</li>
064     * </ul>
065     * ... and more.
066     * <p/>
067     * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
068     * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and
069     * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing.
070     * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well
071     * makes debugging the routing engine easier for end users.
072     * <p/>
073     * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to
074     * read the source code of this class about the debugging tips, which you can find in the
075     * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
076     */
077    public class CamelInternalProcessor extends DelegateAsyncProcessor {
078    
079        private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
080        private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();
081    
082        public CamelInternalProcessor() {
083        }
084    
085        public CamelInternalProcessor(Processor processor) {
086            super(processor);
087        }
088    
089        /**
090         * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
091         *
092         * @param advice  the advice to add
093         */
094        public void addAdvice(CamelInternalProcessorAdvice advice) {
095            advices.add(advice);
096        }
097    
098        /**
099         * Gets the advice with the given type.
100         *
101         * @param type  the type of the advice
102         * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
103         */
104        public <T> T getAdvice(Class<T> type) {
105            for (CamelInternalProcessorAdvice task : advices) {
106                if (type.isInstance(task)) {
107                    return type.cast(task);
108                }
109            }
110            return null;
111        }
112    
113        @Override
114        public boolean process(Exchange exchange, AsyncCallback callback) {
115            // ----------------------------------------------------------
116            // CAMEL END USER - READ ME FOR DEBUGGING TIPS
117            // ----------------------------------------------------------
118            // If you want to debug the Camel routing engine, then there is a lot of internal functionality
119            // the routing engine executes during routing messages. You can skip debugging this internal
120            // functionality and instead debug where the routing engine continues routing to the next node
121            // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
122            // being used in between the nodes. As an end user you can just debug the code in this class
123            // in between the:
124            //   CAMEL END USER - DEBUG ME HERE +++ START +++
125            //   CAMEL END USER - DEBUG ME HERE +++ END +++
126            // you can see in the code below.
127            // ----------------------------------------------------------
128    
129    
130            if (processor == null || !continueProcessing(exchange)) {
131                // no processor or we should not continue then we are done
132                callback.done(true);
133                return true;
134            }
135    
136            final List<Object> states = new ArrayList<Object>(advices.size());
137            for (CamelInternalProcessorAdvice task : advices) {
138                try {
139                    Object state = task.before(exchange);
140                    states.add(state);
141                } catch (Throwable e) {
142                    exchange.setException(e);
143                    callback.done(true);
144                    return true;
145                }
146            }
147    
148            // create internal callback which will execute the advices in reverse order when done
149            callback = new InternalCallback(states, exchange, callback);
150    
151            // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
152            Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
153            if (exchange.isTransacted() || synchronous != null) {
154                // must be synchronized for transacted exchanges
155                if (LOG.isTraceEnabled()) {
156                    if (exchange.isTransacted()) {
157                        LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
158                    } else {
159                        LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
160                    }
161                }
162                // ----------------------------------------------------------
163                // CAMEL END USER - DEBUG ME HERE +++ START +++
164                // ----------------------------------------------------------
165                try {
166                    processor.process(exchange);
167                } catch (Throwable e) {
168                    exchange.setException(e);
169                }
170                // ----------------------------------------------------------
171                // CAMEL END USER - DEBUG ME HERE +++ END +++
172                // ----------------------------------------------------------
173                callback.done(true);
174                return true;
175            } else {
176                final UnitOfWork uow = exchange.getUnitOfWork();
177    
178                // allow unit of work to wrap callback in case it need to do some special work
179                // for example the MDCUnitOfWork
180                AsyncCallback async = callback;
181                if (uow != null) {
182                    async = uow.beforeProcess(processor, exchange, callback);
183                }
184    
185                // ----------------------------------------------------------
186                // CAMEL END USER - DEBUG ME HERE +++ START +++
187                // ----------------------------------------------------------
188                if (LOG.isTraceEnabled()) {
189                    LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
190                }
191                boolean sync = processor.process(exchange, async);
192                // ----------------------------------------------------------
193                // CAMEL END USER - DEBUG ME HERE +++ END +++
194                // ----------------------------------------------------------
195    
196                // execute any after processor work (in current thread, not in the callback)
197                if (uow != null) {
198                    uow.afterProcess(processor, exchange, callback, sync);
199                }
200    
201                if (LOG.isTraceEnabled()) {
202                    LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
203                            new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
204                }
205                return sync;
206            }
207        }
208    
209        @Override
210        public String toString() {
211            return processor != null ? processor.toString() : super.toString();
212        }
213    
214        /**
215         * Internal callback that executes the after advices.
216         */
217        private final class InternalCallback implements AsyncCallback {
218    
219            private final List<Object> states;
220            private final Exchange exchange;
221            private final AsyncCallback callback;
222    
223            private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) {
224                this.states = states;
225                this.exchange = exchange;
226                this.callback = callback;
227            }
228    
229            @Override
230            public void done(boolean doneSync) {
231                // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
232                // so you can step straight to the finally block and invoke the callback
233    
234                // we should call after in reverse order
235                try {
236                    for (int i = advices.size() - 1; i >= 0; i--) {
237                        CamelInternalProcessorAdvice task = advices.get(i);
238                        Object state = states.get(i);
239                        try {
240                            task.after(exchange, state);
241                        } catch (Exception e) {
242                            exchange.setException(e);
243                            // allow all advices to complete even if there was an exception
244                        }
245                    }
246                } finally {
247                    // ----------------------------------------------------------
248                    // CAMEL END USER - DEBUG ME HERE +++ START +++
249                    // ----------------------------------------------------------
250                    // callback must be called
251                    callback.done(doneSync);
252                    // ----------------------------------------------------------
253                    // CAMEL END USER - DEBUG ME HERE +++ END +++
254                    // ----------------------------------------------------------
255                }
256            }
257        }
258    
259        /**
260         * Strategy to determine if we should continue processing the {@link Exchange}.
261         */
262        protected boolean continueProcessing(Exchange exchange) {
263            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
264            if (stop != null) {
265                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
266                if (doStop) {
267                    LOG.debug("Exchange is marked to stop routing: {}", exchange);
268                    return false;
269                }
270            }
271    
272            // determine if we can still run, or the camel context is forcing a shutdown
273            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
274            if (forceShutdown) {
275                LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
276                if (exchange.getException() == null) {
277                    exchange.setException(new RejectedExecutionException());
278                }
279                return false;
280            }
281    
282            // yes we can continue
283            return true;
284        }
285    
286        /**
287         * Advice for JMX instrumentation of the process being invoked.
288         * <p/>
289         * This advice keeps track of JMX metrics for performance statistics.
290         * <p/>
291         * The current implementation of this advice is only used for route level statistics. For processor levels
292         * they are still wrapped in the route processor chains.
293         */
294        public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> {
295    
296            private PerformanceCounter counter;
297            private String type;
298    
299            public InstrumentationAdvice(String type) {
300                this.type = type;
301            }
302    
303            public void setCounter(Object counter) {
304                ManagedPerformanceCounter mpc = null;
305                if (counter instanceof ManagedPerformanceCounter) {
306                    mpc = (ManagedPerformanceCounter) counter;
307                }
308    
309                if (this.counter instanceof DelegatePerformanceCounter) {
310                    ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
311                } else if (mpc != null) {
312                    this.counter = mpc;
313                } else if (counter instanceof PerformanceCounter) {
314                    this.counter = (PerformanceCounter) counter;
315                }
316            }
317    
318            protected void recordTime(Exchange exchange, long duration) {
319                if (LOG.isTraceEnabled()) {
320                    LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
321                }
322    
323                if (!exchange.isFailed() && exchange.getException() == null) {
324                    counter.completedExchange(exchange, duration);
325                } else {
326                    counter.failedExchange(exchange);
327                }
328            }
329    
330            public String getType() {
331                return type;
332            }
333    
334            public void setType(String type) {
335                this.type = type;
336            }
337    
338            @Override
339            public StopWatch before(Exchange exchange) throws Exception {
340                // only record time if stats is enabled
341                return (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null;
342            }
343    
344            @Override
345            public void after(Exchange exchange, StopWatch watch) throws Exception {
346                // record end time
347                if (watch != null) {
348                    recordTime(exchange, watch.stop());
349                }
350            }
351        }
352    
353        /**
354         * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange}
355         */
356        public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
357    
358            private final RouteContext routeContext;
359    
360            public RouteContextAdvice(RouteContext routeContext) {
361                this.routeContext = routeContext;
362            }
363    
364            @Override
365            public UnitOfWork before(Exchange exchange) throws Exception {
366                // push the current route context
367                final UnitOfWork unitOfWork = exchange.getUnitOfWork();
368                if (unitOfWork != null) {
369                    unitOfWork.pushRouteContext(routeContext);
370                }
371                return unitOfWork;
372            }
373    
374            @Override
375            public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
376                if (unitOfWork != null) {
377                    unitOfWork.popRouteContext();
378                }
379            }
380        }
381    
382        /**
383         * Advice to keep the {@link InflightRepository} up to date.
384         */
385        public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice {
386    
387            private final InflightRepository inflightRepository;
388            private final String id;
389    
390            public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) {
391                this.inflightRepository = inflightRepository;
392                this.id = id;
393            }
394    
395            @Override
396            public Object before(Exchange exchange) throws Exception {
397                inflightRepository.add(exchange, id);
398                return null;
399            }
400    
401            @Override
402            public void after(Exchange exchange, Object state) throws Exception {
403                inflightRepository.remove(exchange, id);
404            }
405        }
406    
407        /**
408         * Advice to execute any {@link RoutePolicy} a route may have been configured with.
409         */
410        public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice {
411    
412            private final List<RoutePolicy> routePolicies;
413            private Route route;
414    
415            public RoutePolicyAdvice(List<RoutePolicy> routePolicies) {
416                this.routePolicies = routePolicies;
417            }
418    
419            public void setRoute(Route route) {
420                this.route = route;
421            }
422    
423            /**
424             * Strategy to determine if this policy is allowed to run
425             *
426             * @param policy the policy
427             * @return <tt>true</tt> to run
428             */
429            protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
430                if (policy instanceof StatefulService) {
431                    StatefulService ss = (StatefulService) policy;
432                    return ss.isRunAllowed();
433                }
434                return true;
435            }
436    
437            @Override
438            public Object before(Exchange exchange) throws Exception {
439                // invoke begin
440                for (RoutePolicy policy : routePolicies) {
441                    try {
442                        if (isRoutePolicyRunAllowed(policy)) {
443                            policy.onExchangeBegin(route, exchange);
444                        }
445                    } catch (Exception e) {
446                        LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
447                                + ". This exception will be ignored", e);
448                    }
449                }
450                return null;
451            }
452    
453            @Override
454            public void after(Exchange exchange, Object data) throws Exception {
455                // do not invoke it if Camel is stopping as we don't want
456                // the policy to start a consumer during Camel is stopping
457                if (isCamelStopping(exchange.getContext())) {
458                    return;
459                }
460    
461                for (RoutePolicy policy : routePolicies) {
462                    try {
463                        if (isRoutePolicyRunAllowed(policy)) {
464                            policy.onExchangeDone(route, exchange);
465                        }
466                    } catch (Exception e) {
467                        LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
468                                + ". This exception will be ignored", e);
469                    }
470                }
471            }
472    
473            private static boolean isCamelStopping(CamelContext context) {
474                if (context instanceof StatefulService) {
475                    StatefulService ss = (StatefulService) context;
476                    return ss.isStopping() || ss.isStopped();
477                }
478                return false;
479            }
480        }
481    
482        /**
483         * Advice to execute the {@link BacklogTracer} if enabled.
484         */
485        public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice {
486    
487            private final Queue<DefaultBacklogTracerEventMessage> queue;
488            private final BacklogTracer backlogTracer;
489            private final ProcessorDefinition<?> processorDefinition;
490            private final ProcessorDefinition<?> routeDefinition;
491            private final boolean first;
492    
493            public BacklogTracerAdvice(Queue<DefaultBacklogTracerEventMessage> queue, BacklogTracer backlogTracer,
494                                       ProcessorDefinition<?> processorDefinition, ProcessorDefinition<?> routeDefinition, boolean first) {
495                this.queue = queue;
496                this.backlogTracer = backlogTracer;
497                this.processorDefinition = processorDefinition;
498                this.routeDefinition = routeDefinition;
499                this.first = first;
500            }
501    
502            @Override
503            public Object before(Exchange exchange) throws Exception {
504                if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
505                    // ensure there is space on the queue
506                    int drain = queue.size() - backlogTracer.getBacklogSize();
507                    // and we need room for ourselves and possible also a first pseudo message as well
508                    drain += first ? 2 : 1;
509                    if (drain > 0) {
510                        for (int i = 0; i < drain; i++) {
511                            queue.poll();
512                        }
513                    }
514    
515                    Date timestamp = new Date();
516                    String toNode = processorDefinition.getId();
517                    String exchangeId = exchange.getExchangeId();
518                    String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
519                            backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());
520    
521                    // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route)
522                    String routeId = routeDefinition.getId();
523                    if (first) {
524                        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
525                        DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml);
526                        queue.add(pseudo);
527                    }
528                    DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml);
529                    queue.add(event);
530                }
531    
532                return null;
533            }
534    
535            @Override
536            public void after(Exchange exchange, Object data) throws Exception {
537                // noop
538            }
539        }
540    
541        /**
542         * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
543         */
544        public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch> {
545    
546            private final BacklogDebugger backlogDebugger;
547            private final Processor target;
548            private final ProcessorDefinition<?> definition;
549            private final String nodeId;
550    
551            public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) {
552                this.backlogDebugger = backlogDebugger;
553                this.target = target;
554                this.definition = definition;
555                this.nodeId = definition.getId();
556            }
557    
558            @Override
559            public StopWatch before(Exchange exchange) throws Exception {
560                if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
561                    StopWatch watch = new StopWatch();
562                    backlogDebugger.beforeProcess(exchange, target, definition);
563                    return watch;
564                } else {
565                    return null;
566                }
567            }
568    
569            @Override
570            public void after(Exchange exchange, StopWatch stopWatch) throws Exception {
571                if (stopWatch != null) {
572                    backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
573                }
574            }
575        }
576    
577        /**
578         * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure
579         * the {@link UnitOfWork} is done and stopped.
580         */
581        public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
582    
583            private final String routeId;
584    
585            public UnitOfWorkProcessorAdvice(String routeId) {
586                this.routeId = routeId;
587            }
588    
589            @Override
590            public UnitOfWork before(Exchange exchange) throws Exception {
591                // if the exchange doesn't have from route id set, then set it if it originated
592                // from this unit of work
593                if (routeId != null && exchange.getFromRouteId() == null) {
594                    exchange.setFromRouteId(routeId);
595                }
596    
597                if (exchange.getUnitOfWork() == null) {
598                    // If there is no existing UoW, then we should start one and
599                    // terminate it once processing is completed for the exchange.
600                    UnitOfWork uow = createUnitOfWork(exchange);
601                    exchange.setUnitOfWork(uow);
602                    uow.start();
603                    return uow;
604                }
605    
606                return null;
607            }
608    
609            @Override
610            public void after(Exchange exchange, UnitOfWork uow) throws Exception {
611                // execute done on uow if we created it, and the consumer is not doing it
612                if (uow != null) {
613                    UnitOfWorkHelper.doneUow(uow, exchange);
614                }
615            }
616    
617            protected UnitOfWork createUnitOfWork(Exchange exchange) {
618                return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
619            }
620    
621        }
622    
623        /**
624         * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
625         */
626        public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice {
627    
628            private final UnitOfWork parent;
629    
630            public ChildUnitOfWorkProcessorAdvice(String routeId, UnitOfWork parent) {
631                super(routeId);
632                this.parent = parent;
633            }
634    
635            @Override
636            protected UnitOfWork createUnitOfWork(Exchange exchange) {
637                // let the parent create a child unit of work to be used
638                return parent.createChildUnitOfWork(exchange);
639            }
640    
641        }
642    
643        /**
644         * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
645         */
646        public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
647    
648            @Override
649            public UnitOfWork before(Exchange exchange) throws Exception {
650                // begin savepoint
651                exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
652                return exchange.getUnitOfWork();
653            }
654    
655            @Override
656            public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
657                // end sub unit of work
658                unitOfWork.endSubUnitOfWork(exchange);
659            }
660        }
661    
662        /**
663         * Advice when Message History has been enabled.
664         */
665        @SuppressWarnings("unchecked")
666        public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> {
667    
668            private final ProcessorDefinition<?> definition;
669            private final String routeId;
670    
671            public MessageHistoryAdvice(ProcessorDefinition<?> definition) {
672                this.definition = definition;
673                this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
674            }
675    
676            @Override
677            public MessageHistory before(Exchange exchange) throws Exception {
678                List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
679                if (list == null) {
680                    list = new ArrayList<MessageHistory>();
681                    exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
682                }
683                MessageHistory history = new DefaultMessageHistory(routeId, definition, new Date());
684                list.add(history);
685                return history;
686            }
687    
688            @Override
689            public void after(Exchange exchange, MessageHistory history) throws Exception {
690                if (history != null) {
691                    history.nodeProcessingDone();
692                }
693            }
694        }
695    
696        /**
697         * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
698         */
699        public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache> {
700    
701            private final StreamCachingStrategy strategy;
702    
703            public StreamCachingAdvice(StreamCachingStrategy strategy) {
704                this.strategy = strategy;
705            }
706    
707            @Override
708            public StreamCache before(Exchange exchange) throws Exception {
709                // check if body is already cached
710                Object body = exchange.getIn().getBody();
711                if (body == null) {
712                    return null;
713                } else if (body instanceof StreamCache) {
714                    StreamCache sc = (StreamCache) body;
715                    // reset so the cache is ready to be used before processing
716                    sc.reset();
717                    return sc;
718                }
719                // cache the body and if we could do that replace it as the new body
720                StreamCache sc = strategy.cache(exchange);
721                if (sc != null) {
722                    exchange.getIn().setBody(sc);
723                }
724                return sc;
725            }
726    
727            @Override
728            public void after(Exchange exchange, StreamCache sc) throws Exception {
729                Object body = exchange.getIn().getBody();
730                if (body != null && body instanceof StreamCache) {
731                    // reset so the cache is ready to be reused after processing
732                    ((StreamCache) body).reset();
733                }
734            }
735        }
736    
737        /**
738         * Advice for delaying
739         */
740        public static class DelayerAdvice implements CamelInternalProcessorAdvice {
741    
742            private final long delay;
743    
744            public DelayerAdvice(long delay) {
745                this.delay = delay;
746            }
747    
748            @Override
749            public Object before(Exchange exchange) throws Exception {
750                try {
751                    LOG.trace("Sleeping for: {} millis", delay);
752                    Thread.sleep(delay);
753                } catch (InterruptedException e) {
754                    LOG.debug("Sleep interrupted");
755                    Thread.currentThread().interrupt();
756                    throw e;
757                }
758                return null;
759            }
760    
761            @Override
762            public void after(Exchange exchange, Object data) throws Exception {
763                // noop
764            }
765        }
766    
767    }