001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.Collections;
020    import java.util.List;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.AggregateRouteNode;
026    import org.apache.camel.impl.DefaultRouteNode;
027    import org.apache.camel.impl.DoCatchRouteNode;
028    import org.apache.camel.impl.DoFinallyRouteNode;
029    import org.apache.camel.impl.OnCompletionRouteNode;
030    import org.apache.camel.impl.OnExceptionRouteNode;
031    import org.apache.camel.model.AggregateDefinition;
032    import org.apache.camel.model.CatchDefinition;
033    import org.apache.camel.model.FinallyDefinition;
034    import org.apache.camel.model.InterceptDefinition;
035    import org.apache.camel.model.OnCompletionDefinition;
036    import org.apache.camel.model.OnExceptionDefinition;
037    import org.apache.camel.model.ProcessorDefinition;
038    import org.apache.camel.model.ProcessorDefinitionHelper;
039    import org.apache.camel.processor.CamelLogProcessor;
040    import org.apache.camel.processor.DelegateAsyncProcessor;
041    import org.apache.camel.spi.ExchangeFormatter;
042    import org.apache.camel.spi.InterceptStrategy;
043    import org.apache.camel.spi.RouteContext;
044    import org.apache.camel.spi.TracedRouteNodes;
045    import org.apache.camel.util.ServiceHelper;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * An interceptor for debugging and tracing routes
051     *
052     * @version 
053     */
054    public class TraceInterceptor extends DelegateAsyncProcessor implements ExchangeFormatter {
055        private static final Logger LOG = LoggerFactory.getLogger(TraceInterceptor.class);
056    
057        private CamelLogProcessor logger;
058    
059        private final ProcessorDefinition<?> node;
060        private final Tracer tracer;
061        private TraceFormatter formatter;
062    
063        private RouteContext routeContext;
064        private List<TraceEventHandler> traceHandlers;
065    
066        public TraceInterceptor(ProcessorDefinition<?> node, Processor target, TraceFormatter formatter, Tracer tracer) {
067            super(target);
068            this.tracer = tracer;
069            this.node = node;
070            this.formatter = formatter;
071            this.logger = tracer.getLogger(this);
072            if (tracer.getFormatter() != null) {
073                this.formatter = tracer.getFormatter();
074            }
075            this.traceHandlers = tracer.getTraceHandlers();
076        }
077    
078        @Override
079        public String toString() {
080            return "TraceInterceptor[" + node + "]";
081        }
082    
083        public void setRouteContext(RouteContext routeContext) {
084            this.routeContext = routeContext;
085        }
086    
087        @Override
088        public boolean process(final Exchange exchange, final AsyncCallback callback) {
089            // do not trace if tracing is disabled
090            if (!tracer.isEnabled() || (routeContext != null && !routeContext.isTracing())) {
091                return processor.process(exchange, callback);
092            }
093    
094            // interceptor will also trace routes supposed only for TraceEvents so we need to skip
095            // logging TraceEvents to avoid infinite looping
096            if (exchange.getProperty(Exchange.TRACE_EVENT, false, Boolean.class)) {
097                // but we must still process to allow routing of TraceEvents to eg a JPA endpoint
098                return processor.process(exchange, callback);
099            }
100    
101            final boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
102    
103            // whether we should trace it or not, some nodes should be skipped as they are abstract
104            // intermediate steps for instance related to on completion
105            boolean trace = true;
106            boolean sync = true;
107    
108            // okay this is a regular exchange being routed we might need to log and trace
109            try {
110                // before
111                if (shouldLog) {
112                    // traced holds the information about the current traced route path
113                    if (exchange.getUnitOfWork() != null) {
114                        TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
115    
116                        if (node instanceof OnCompletionDefinition || node instanceof OnExceptionDefinition) {
117                            // skip any of these as its just a marker definition
118                            trace = false;
119                        } else if (ProcessorDefinitionHelper.isFirstChildOfType(OnCompletionDefinition.class, node)) {
120                            // special for on completion tracing
121                            traceOnCompletion(traced, exchange);
122                        } else if (ProcessorDefinitionHelper.isFirstChildOfType(OnExceptionDefinition.class, node)) {
123                            // special for on exception
124                            traceOnException(traced, exchange);
125                        } else if (ProcessorDefinitionHelper.isFirstChildOfType(CatchDefinition.class, node)) {
126                            // special for do catch
127                            traceDoCatch(traced, exchange);
128                        } else if (ProcessorDefinitionHelper.isFirstChildOfType(FinallyDefinition.class, node)) {
129                            // special for do finally
130                            traceDoFinally(traced, exchange);
131                        } else if (ProcessorDefinitionHelper.isFirstChildOfType(AggregateDefinition.class, node)) {
132                            // special for aggregate
133                            traceAggregate(traced, exchange);
134                        } else {
135                            // regular so just add it
136                            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
137                        }
138                    } else {
139                        LOG.trace("Cannot trace as this Exchange does not have an UnitOfWork: {}", exchange);
140                    }
141                }
142    
143                // log and trace the processor
144                Object state = null;
145                if (shouldLog && trace) {
146                    logExchange(exchange);
147                    // either call the in or generic trace method depending on OUT has been enabled or not
148                    if (tracer.isTraceOutExchanges()) {
149                        state = traceExchangeIn(exchange);
150                    } else {
151                        traceExchange(exchange);
152                    }
153                }
154                final Object traceState = state;
155    
156                // special for interceptor where we need to keep booking how far we have routed in the intercepted processors
157                if (node.getParent() instanceof InterceptDefinition && exchange.getUnitOfWork() != null) {
158                    TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
159                    traceIntercept((InterceptDefinition) node.getParent(), traced, exchange);
160                }
161    
162                // process the exchange
163                sync = processor.process(exchange, new AsyncCallback() {
164                    @Override
165                    public void done(boolean doneSync) {
166                        try {
167                            // after (trace out)
168                            if (shouldLog && tracer.isTraceOutExchanges()) {
169                                logExchange(exchange);
170                                traceExchangeOut(exchange, traceState);
171                            }
172                        } catch (Throwable e) {
173                            // some exception occurred in trace logic
174                            if (shouldLogException(exchange)) {
175                                logException(exchange, e);
176                            }
177                            exchange.setException(e);
178                        } finally {
179                            // ensure callback is always invoked
180                            callback.done(doneSync);
181                        }
182                    }
183                });
184    
185            } catch (Throwable e) {
186                // some exception occurred in trace logic
187                if (shouldLogException(exchange)) {
188                    logException(exchange, e);
189                }
190                exchange.setException(e);
191            }
192    
193            return sync;
194        }
195    
196        private void traceOnCompletion(TracedRouteNodes traced, Exchange exchange) {
197            traced.addTraced(new OnCompletionRouteNode());
198            // do not log and trace as onCompletion should be a new event on its own
199            // add the next step as well so we have onCompletion -> new step
200            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
201        }
202    
203        private void traceOnException(TracedRouteNodes traced, Exchange exchange) throws Exception {
204            if (traced.getLastNode() != null) {
205                traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor()));
206            }
207            traced.addTraced(new OnExceptionRouteNode());
208            // log and trace so we have the from -> onException event as well
209            logExchange(exchange);
210            traceExchange(exchange);
211            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
212        }
213    
214        private void traceDoCatch(TracedRouteNodes traced, Exchange exchange) throws Exception {
215            if (traced.getLastNode() != null) {
216                traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor()));
217            }
218            traced.addTraced(new DoCatchRouteNode());
219            // log and trace so we have the from -> doCatch event as well
220            logExchange(exchange);
221            traceExchange(exchange);
222            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
223        }
224    
225        private void traceDoFinally(TracedRouteNodes traced, Exchange exchange) throws Exception {
226            if (traced.getLastNode() != null) {
227                traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor()));
228            }
229            traced.addTraced(new DoFinallyRouteNode());
230            // log and trace so we have the from -> doFinally event as well
231            logExchange(exchange);
232            traceExchange(exchange);
233            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
234        }
235    
236        private void traceAggregate(TracedRouteNodes traced, Exchange exchange) {
237            traced.addTraced(new AggregateRouteNode((AggregateDefinition) node.getParent()));
238            traced.addTraced(new DefaultRouteNode(node, super.getProcessor()));
239        }
240    
241        protected void traceIntercept(InterceptDefinition intercept, TracedRouteNodes traced, Exchange exchange) throws Exception {
242            // use the counter to get the index of the intercepted processor to be traced
243            Processor last = intercept.getInterceptedProcessor(traced.getAndIncrementCounter(intercept));
244            // skip doing any double tracing of interceptors, so the last must not be a TraceInterceptor instance
245            if (last != null && !(last instanceof TraceInterceptor)) {
246                traced.addTraced(new DefaultRouteNode(node, last));
247    
248                boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
249                if (shouldLog) {
250                    // log and trace the processor that was intercepted so we can see it
251                    logExchange(exchange);
252                    traceExchange(exchange);
253                }
254            }
255        }
256    
257        public String format(Exchange exchange) {
258            Object msg = formatter.format(this, this.getNode(), exchange);
259            if (msg != null) {
260                return msg.toString();
261            } else {
262                return null;
263            }
264        }
265    
266        // Properties
267        //-------------------------------------------------------------------------
268        public ProcessorDefinition<?> getNode() {
269            return node;
270        }
271    
272        public CamelLogProcessor getLogger() {
273            return logger;
274        }
275    
276        public TraceFormatter getFormatter() {
277            return formatter;
278        }
279    
280        public Tracer getTracer() {
281            return tracer;
282        }
283    
284        protected void logExchange(Exchange exchange) throws Exception {
285            // process the exchange that formats and logs it
286            logger.process(exchange);
287        }
288    
289        protected void traceExchange(Exchange exchange) throws Exception {
290            for (TraceEventHandler traceHandler : traceHandlers) {
291                traceHandler.traceExchange(node, processor, this, exchange);
292            }
293        }
294    
295        protected Object traceExchangeIn(Exchange exchange) throws Exception {
296            Object result = null;
297            for (TraceEventHandler traceHandler : traceHandlers) {
298                Object result1 = traceHandler.traceExchangeIn(node, processor, this, exchange);
299                if (result1 != null) {
300                    result = result1;
301                }
302            }
303            return result;
304        }
305    
306        protected void traceExchangeOut(Exchange exchange, Object traceState) throws Exception {
307            for (TraceEventHandler traceHandler : traceHandlers) {
308                traceHandler.traceExchangeOut(node, processor, this, exchange, traceState);
309            }
310        }
311    
312        protected void logException(Exchange exchange, Throwable throwable) {
313            if (tracer.isTraceExceptions()) {
314                if (tracer.isLogStackTrace()) {
315                    logger.process(exchange, throwable);
316                } else {
317                    logger.process(exchange, ", Exception: " + throwable.toString());
318                }
319            }
320        }
321    
322        /**
323         * Returns true if the given exchange should be logged in the trace list
324         */
325        protected boolean shouldLogExchange(Exchange exchange) {
326            return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange));
327        }
328    
329        /**
330         * Returns true if the given exchange should be logged when an exception was thrown
331         */
332        protected boolean shouldLogException(Exchange exchange) {
333            return tracer.isTraceExceptions();
334        }
335    
336        /**
337         * Returns whether exchanges coming out of processors should be traced
338         */
339        public boolean shouldTraceOutExchanges() {
340            return tracer.isTraceOutExchanges();
341        }
342    
343        /**
344         * Returns true if the given node should be logged in the trace list
345         */
346        protected boolean shouldLogNode(ProcessorDefinition<?> node) {
347            if (node == null) {
348                return false;
349            }
350            if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy)) {
351                return false;
352            }
353            return true;
354        }
355    
356        @Override
357        protected void doStart() throws Exception {
358            super.doStart();
359            ServiceHelper.startService(traceHandlers);
360        }
361    
362        @Override
363        protected void doStop() throws Exception {
364            super.doStop();
365            ServiceHelper.stopService(traceHandlers);
366        }
367    
368        @Deprecated
369        public void setTraceHandler(TraceEventHandler traceHandler) {
370            traceHandlers = Collections.singletonList(traceHandler);
371        }
372    }