001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.interceptor; 018 019 import java.util.Collections; 020 import java.util.List; 021 022 import org.apache.camel.AsyncCallback; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Processor; 025 import org.apache.camel.impl.AggregateRouteNode; 026 import org.apache.camel.impl.DefaultRouteNode; 027 import org.apache.camel.impl.DoCatchRouteNode; 028 import org.apache.camel.impl.DoFinallyRouteNode; 029 import org.apache.camel.impl.OnCompletionRouteNode; 030 import org.apache.camel.impl.OnExceptionRouteNode; 031 import org.apache.camel.model.AggregateDefinition; 032 import org.apache.camel.model.CatchDefinition; 033 import org.apache.camel.model.FinallyDefinition; 034 import org.apache.camel.model.InterceptDefinition; 035 import org.apache.camel.model.OnCompletionDefinition; 036 import org.apache.camel.model.OnExceptionDefinition; 037 import org.apache.camel.model.ProcessorDefinition; 038 import org.apache.camel.model.ProcessorDefinitionHelper; 039 import org.apache.camel.processor.CamelLogProcessor; 040 import org.apache.camel.processor.DelegateAsyncProcessor; 041 import org.apache.camel.spi.ExchangeFormatter; 042 import org.apache.camel.spi.InterceptStrategy; 043 import org.apache.camel.spi.RouteContext; 044 import org.apache.camel.spi.TracedRouteNodes; 045 import org.apache.camel.util.ServiceHelper; 046 import org.slf4j.Logger; 047 import org.slf4j.LoggerFactory; 048 049 /** 050 * An interceptor for debugging and tracing routes 051 * 052 * @version 053 */ 054 public class TraceInterceptor extends DelegateAsyncProcessor implements ExchangeFormatter { 055 private static final Logger LOG = LoggerFactory.getLogger(TraceInterceptor.class); 056 057 private CamelLogProcessor logger; 058 059 private final ProcessorDefinition<?> node; 060 private final Tracer tracer; 061 private TraceFormatter formatter; 062 063 private RouteContext routeContext; 064 private List<TraceEventHandler> traceHandlers; 065 066 public TraceInterceptor(ProcessorDefinition<?> node, Processor target, TraceFormatter formatter, Tracer tracer) { 067 super(target); 068 this.tracer = tracer; 069 this.node = node; 070 this.formatter = formatter; 071 this.logger = tracer.getLogger(this); 072 if (tracer.getFormatter() != null) { 073 this.formatter = tracer.getFormatter(); 074 } 075 this.traceHandlers = tracer.getTraceHandlers(); 076 } 077 078 @Override 079 public String toString() { 080 return "TraceInterceptor[" + node + "]"; 081 } 082 083 public void setRouteContext(RouteContext routeContext) { 084 this.routeContext = routeContext; 085 } 086 087 @Override 088 public boolean process(final Exchange exchange, final AsyncCallback callback) { 089 // do not trace if tracing is disabled 090 if (!tracer.isEnabled() || (routeContext != null && !routeContext.isTracing())) { 091 return processor.process(exchange, callback); 092 } 093 094 // interceptor will also trace routes supposed only for TraceEvents so we need to skip 095 // logging TraceEvents to avoid infinite looping 096 if (exchange.getProperty(Exchange.TRACE_EVENT, false, Boolean.class)) { 097 // but we must still process to allow routing of TraceEvents to eg a JPA endpoint 098 return processor.process(exchange, callback); 099 } 100 101 final boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange); 102 103 // whether we should trace it or not, some nodes should be skipped as they are abstract 104 // intermediate steps for instance related to on completion 105 boolean trace = true; 106 boolean sync = true; 107 108 // okay this is a regular exchange being routed we might need to log and trace 109 try { 110 // before 111 if (shouldLog) { 112 // traced holds the information about the current traced route path 113 if (exchange.getUnitOfWork() != null) { 114 TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); 115 116 if (node instanceof OnCompletionDefinition || node instanceof OnExceptionDefinition) { 117 // skip any of these as its just a marker definition 118 trace = false; 119 } else if (ProcessorDefinitionHelper.isFirstChildOfType(OnCompletionDefinition.class, node)) { 120 // special for on completion tracing 121 traceOnCompletion(traced, exchange); 122 } else if (ProcessorDefinitionHelper.isFirstChildOfType(OnExceptionDefinition.class, node)) { 123 // special for on exception 124 traceOnException(traced, exchange); 125 } else if (ProcessorDefinitionHelper.isFirstChildOfType(CatchDefinition.class, node)) { 126 // special for do catch 127 traceDoCatch(traced, exchange); 128 } else if (ProcessorDefinitionHelper.isFirstChildOfType(FinallyDefinition.class, node)) { 129 // special for do finally 130 traceDoFinally(traced, exchange); 131 } else if (ProcessorDefinitionHelper.isFirstChildOfType(AggregateDefinition.class, node)) { 132 // special for aggregate 133 traceAggregate(traced, exchange); 134 } else { 135 // regular so just add it 136 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 137 } 138 } else { 139 LOG.trace("Cannot trace as this Exchange does not have an UnitOfWork: {}", exchange); 140 } 141 } 142 143 // log and trace the processor 144 Object state = null; 145 if (shouldLog && trace) { 146 logExchange(exchange); 147 // either call the in or generic trace method depending on OUT has been enabled or not 148 if (tracer.isTraceOutExchanges()) { 149 state = traceExchangeIn(exchange); 150 } else { 151 traceExchange(exchange); 152 } 153 } 154 final Object traceState = state; 155 156 // special for interceptor where we need to keep booking how far we have routed in the intercepted processors 157 if (node.getParent() instanceof InterceptDefinition && exchange.getUnitOfWork() != null) { 158 TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes(); 159 traceIntercept((InterceptDefinition) node.getParent(), traced, exchange); 160 } 161 162 // process the exchange 163 sync = processor.process(exchange, new AsyncCallback() { 164 @Override 165 public void done(boolean doneSync) { 166 try { 167 // after (trace out) 168 if (shouldLog && tracer.isTraceOutExchanges()) { 169 logExchange(exchange); 170 traceExchangeOut(exchange, traceState); 171 } 172 } catch (Throwable e) { 173 // some exception occurred in trace logic 174 if (shouldLogException(exchange)) { 175 logException(exchange, e); 176 } 177 exchange.setException(e); 178 } finally { 179 // ensure callback is always invoked 180 callback.done(doneSync); 181 } 182 } 183 }); 184 185 } catch (Throwable e) { 186 // some exception occurred in trace logic 187 if (shouldLogException(exchange)) { 188 logException(exchange, e); 189 } 190 exchange.setException(e); 191 } 192 193 return sync; 194 } 195 196 private void traceOnCompletion(TracedRouteNodes traced, Exchange exchange) { 197 traced.addTraced(new OnCompletionRouteNode()); 198 // do not log and trace as onCompletion should be a new event on its own 199 // add the next step as well so we have onCompletion -> new step 200 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 201 } 202 203 private void traceOnException(TracedRouteNodes traced, Exchange exchange) throws Exception { 204 if (traced.getLastNode() != null) { 205 traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor())); 206 } 207 traced.addTraced(new OnExceptionRouteNode()); 208 // log and trace so we have the from -> onException event as well 209 logExchange(exchange); 210 traceExchange(exchange); 211 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 212 } 213 214 private void traceDoCatch(TracedRouteNodes traced, Exchange exchange) throws Exception { 215 if (traced.getLastNode() != null) { 216 traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor())); 217 } 218 traced.addTraced(new DoCatchRouteNode()); 219 // log and trace so we have the from -> doCatch event as well 220 logExchange(exchange); 221 traceExchange(exchange); 222 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 223 } 224 225 private void traceDoFinally(TracedRouteNodes traced, Exchange exchange) throws Exception { 226 if (traced.getLastNode() != null) { 227 traced.addTraced(new DefaultRouteNode(traced.getLastNode().getProcessorDefinition(), traced.getLastNode().getProcessor())); 228 } 229 traced.addTraced(new DoFinallyRouteNode()); 230 // log and trace so we have the from -> doFinally event as well 231 logExchange(exchange); 232 traceExchange(exchange); 233 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 234 } 235 236 private void traceAggregate(TracedRouteNodes traced, Exchange exchange) { 237 traced.addTraced(new AggregateRouteNode((AggregateDefinition) node.getParent())); 238 traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); 239 } 240 241 protected void traceIntercept(InterceptDefinition intercept, TracedRouteNodes traced, Exchange exchange) throws Exception { 242 // use the counter to get the index of the intercepted processor to be traced 243 Processor last = intercept.getInterceptedProcessor(traced.getAndIncrementCounter(intercept)); 244 // skip doing any double tracing of interceptors, so the last must not be a TraceInterceptor instance 245 if (last != null && !(last instanceof TraceInterceptor)) { 246 traced.addTraced(new DefaultRouteNode(node, last)); 247 248 boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange); 249 if (shouldLog) { 250 // log and trace the processor that was intercepted so we can see it 251 logExchange(exchange); 252 traceExchange(exchange); 253 } 254 } 255 } 256 257 public String format(Exchange exchange) { 258 Object msg = formatter.format(this, this.getNode(), exchange); 259 if (msg != null) { 260 return msg.toString(); 261 } else { 262 return null; 263 } 264 } 265 266 // Properties 267 //------------------------------------------------------------------------- 268 public ProcessorDefinition<?> getNode() { 269 return node; 270 } 271 272 public CamelLogProcessor getLogger() { 273 return logger; 274 } 275 276 public TraceFormatter getFormatter() { 277 return formatter; 278 } 279 280 public Tracer getTracer() { 281 return tracer; 282 } 283 284 protected void logExchange(Exchange exchange) throws Exception { 285 // process the exchange that formats and logs it 286 logger.process(exchange); 287 } 288 289 protected void traceExchange(Exchange exchange) throws Exception { 290 for (TraceEventHandler traceHandler : traceHandlers) { 291 traceHandler.traceExchange(node, processor, this, exchange); 292 } 293 } 294 295 protected Object traceExchangeIn(Exchange exchange) throws Exception { 296 Object result = null; 297 for (TraceEventHandler traceHandler : traceHandlers) { 298 Object result1 = traceHandler.traceExchangeIn(node, processor, this, exchange); 299 if (result1 != null) { 300 result = result1; 301 } 302 } 303 return result; 304 } 305 306 protected void traceExchangeOut(Exchange exchange, Object traceState) throws Exception { 307 for (TraceEventHandler traceHandler : traceHandlers) { 308 traceHandler.traceExchangeOut(node, processor, this, exchange, traceState); 309 } 310 } 311 312 protected void logException(Exchange exchange, Throwable throwable) { 313 if (tracer.isTraceExceptions()) { 314 if (tracer.isLogStackTrace()) { 315 logger.process(exchange, throwable); 316 } else { 317 logger.process(exchange, ", Exception: " + throwable.toString()); 318 } 319 } 320 } 321 322 /** 323 * Returns true if the given exchange should be logged in the trace list 324 */ 325 protected boolean shouldLogExchange(Exchange exchange) { 326 return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange)); 327 } 328 329 /** 330 * Returns true if the given exchange should be logged when an exception was thrown 331 */ 332 protected boolean shouldLogException(Exchange exchange) { 333 return tracer.isTraceExceptions(); 334 } 335 336 /** 337 * Returns whether exchanges coming out of processors should be traced 338 */ 339 public boolean shouldTraceOutExchanges() { 340 return tracer.isTraceOutExchanges(); 341 } 342 343 /** 344 * Returns true if the given node should be logged in the trace list 345 */ 346 protected boolean shouldLogNode(ProcessorDefinition<?> node) { 347 if (node == null) { 348 return false; 349 } 350 if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy)) { 351 return false; 352 } 353 return true; 354 } 355 356 @Override 357 protected void doStart() throws Exception { 358 super.doStart(); 359 ServiceHelper.startService(traceHandlers); 360 } 361 362 @Override 363 protected void doStop() throws Exception { 364 super.doStop(); 365 ServiceHelper.stopService(traceHandlers); 366 } 367 368 @Deprecated 369 public void setTraceHandler(TraceEventHandler traceHandler) { 370 traceHandlers = Collections.singletonList(traceHandler); 371 } 372 }