/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.interceptor;

import java.util.Date;
import java.util.EventObject;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
import org.apache.camel.impl.BreakpointSupport;
import org.apache.camel.impl.DefaultDebugger;
import org.apache.camel.management.event.ExchangeCompletedEvent;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.spi.Condition;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link org.apache.camel.spi.Debugger} that has easy debugging functionality which
 * can be used from JMX with {@link org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean}.
 * <p/>
 * This implementation allows to set breakpoints (with or without a condition) and inspect the {@link Exchange}
 * dumped in XML in {@link BacklogTracerEventMessage} format. There are operations to resume suspended breakpoints
 * to continue routing the {@link Exchange}. There is also step functionality so you can single step a given
 * {@link Exchange}.
 * <p/>
 * This implementation will only break the first {@link Exchange} that arrives to a breakpoint. If Camel routes using
 * concurrency then subsequent {@link Exchange}s will continue to be routed, if the breakpoint already holds a
 * suspended {@link Exchange}.
 */
public class BacklogDebugger extends ServiceSupport implements InterceptStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(BacklogDebugger.class);

    // maximum time, in seconds, a thread is parked at a breakpoint before it continues on its own
    private long fallbackTimeout = 300;
    private final CamelContext camelContext;
    private LoggingLevel loggingLevel = LoggingLevel.INFO;
    // must be declared after loggingLevel as it is initialized from it
    private final CamelLogger logger = new CamelLogger(LOG, loggingLevel);
    private final AtomicBoolean enabled = new AtomicBoolean();
    private final AtomicLong debugCounter = new AtomicLong(0);
    private final Debugger debugger;
    // registered node breakpoints, keyed by node id
    private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<String, NodeBreakpoint>();
    // exchanges currently parked at a node (by a node breakpoint or a single step), keyed by node id
    private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<String, SuspendedExchange>();
    // XML dump of the message taken when the exchange was parked, keyed by node id
    private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<String, BacklogTracerEventMessage>();
    // id of the exchange being single stepped, or null when not in single step mode
    private volatile String singleStepExchangeId;
    private int bodyMaxChars = 128 * 1024;
    private boolean bodyIncludeStreams;
    private boolean bodyIncludeFiles = true;

    /**
     * A suspended {@link Exchange} at a breakpoint.
     */
    private static final class SuspendedExchange {
        private final Exchange exchange;
        private final CountDownLatch latch;

        /**
         * @param exchange the suspended exchange
         * @param latch    the latch to use to continue routing the exchange
         */
        private SuspendedExchange(Exchange exchange, CountDownLatch latch) {
            this.exchange = exchange;
            this.latch = latch;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }

    /**
     * Creates the backlog debugger, backed by a {@link DefaultDebugger} that has its tracer turned off.
     *
     * @param camelContext the camel context
     */
    public BacklogDebugger(CamelContext camelContext) {
        this.camelContext = camelContext;
        DefaultDebugger debugger = new DefaultDebugger(camelContext);
        debugger.setUseTracer(false);
        this.debugger = debugger;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception {
        throw new UnsupportedOperationException("Deprecated");
    }

    /**
     * A helper method to return the BacklogDebugger instance if one is enabled
     *
     * @param context the camel context whose intercept strategies are searched
     * @return the backlog debugger or null if none can be found
     */
    public static BacklogDebugger getBacklogDebugger(CamelContext context) {
        List<InterceptStrategy> list = context.getInterceptStrategies();
        for (InterceptStrategy interceptStrategy : list) {
            if (interceptStrategy instanceof BacklogDebugger) {
                return (BacklogDebugger) interceptStrategy;
            }
        }
        return null;
    }

    /**
     * @return the underlying debugger this backlog debugger delegates to
     */
    public Debugger getDebugger() {
        return debugger;
    }

    /**
     * @return the name of the logging level used by this debugger
     */
    public String getLoggingLevel() {
        return loggingLevel.name();
    }

    /**
     * Sets the logging level used by this debugger.
     *
     * @param level the name of a {@link LoggingLevel} constant
     * @throws IllegalArgumentException if the name does not match a {@link LoggingLevel} constant
     */
    public void setLoggingLevel(String level) {
        loggingLevel = LoggingLevel.valueOf(level);
        logger.setLevel(loggingLevel);
    }

    /**
     * Starts the underlying debugger and marks this debugger as enabled.
     * A failure to start is rethrown wrapped as a runtime exception.
     */
    public void enableDebugger() {
        logger.log("Enabling debugger");
        try {
            ServiceHelper.startService(debugger);
            enabled.set(true);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Marks this debugger as disabled, stops the underlying debugger (best effort) and
     * clears all breakpoints, releasing any threads parked at them.
     */
    public void disableDebugger() {
        logger.log("Disabling debugger");
        try {
            enabled.set(false);
            ServiceHelper.stopService(debugger);
        } catch (Exception e) {
            // best effort: still clear the breakpoints below so no thread is left hanging
            LOG.debug("Error stopping debugger. This exception is ignored.", e);
        }
        clearBreakpoints();
    }

    /**
     * @return whether the debugger has been enabled
     */
    public boolean isEnabled() {
        return enabled.get();
    }

    /**
     * @param nodeId the node id
     * @return whether a breakpoint is registered for the given node
     */
    public boolean hasBreakpoint(String nodeId) {
        return breakpoints.containsKey(nodeId);
    }

    /**
     * @return whether an exchange is currently being single stepped
     */
    public boolean isSingleStepMode() {
        return singleStepExchangeId != null;
    }

    /**
     * Adds an unconditional breakpoint at the given node. If a breakpoint already exists
     * at the node, its condition (if any) is cleared.
     *
     * @param nodeId the node id
     */
    public void addBreakpoint(String nodeId) {
        NodeBreakpoint breakpoint = breakpoints.get(nodeId);
        if (breakpoint == null) {
            logger.log("Adding breakpoint " + nodeId);
            breakpoint = new NodeBreakpoint(nodeId, null);
            breakpoints.put(nodeId, breakpoint);
            debugger.addBreakpoint(breakpoint, breakpoint);
        } else {
            breakpoint.setCondition(null);
        }
    }

    /**
     * Adds a conditional breakpoint at the given node, or updates the existing breakpoint
     * at that node to use the given condition.
     *
     * @param nodeId    the node id
     * @param language  the name of the language used to create the predicate
     * @param predicate the predicate expression that must match for the breakpoint to hit
     */
    public void addConditionalBreakpoint(String nodeId, String language, String predicate) {
        Predicate condition = camelContext.resolveLanguage(language).createPredicate(predicate);
        NodeBreakpoint breakpoint = breakpoints.get(nodeId);
        if (breakpoint == null) {
            logger.log("Adding conditional breakpoint " + nodeId + " [" + predicate + "]");
            breakpoint = new NodeBreakpoint(nodeId, condition);
            breakpoints.put(nodeId, breakpoint);
            debugger.addBreakpoint(breakpoint, breakpoint);
        } else {
            if (breakpoint.getCondition() == null) {
                logger.log("Updating to conditional breakpoint " + nodeId + " [" + predicate + "]");
            } else {
                logger.log("Updating conditional breakpoint " + nodeId + " [" + predicate + "]");
            }
            // the breakpoint is registered as its own condition, so changing the predicate on the
            // existing instance is enough; previously an unconditional breakpoint was only
            // re-registered and the new condition was never applied
            breakpoint.setCondition(condition);
        }
    }

    /**
     * Removes the breakpoint at the given node and lets any exchange parked there continue.
     *
     * @param nodeId the node id
     */
    public void removeBreakpoint(String nodeId) {
        logger.log("Removing breakpoint " + nodeId);
        // when removing a break point then ensure latches is cleared and counted down so we wont have hanging threads
        suspendedBreakpointMessages.remove(nodeId);
        SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
        NodeBreakpoint breakpoint = breakpoints.remove(nodeId);
        if (breakpoint != null) {
            debugger.removeBreakpoint(breakpoint);
        }
        if (se != null) {
            se.getLatch().countDown();
        }
    }

    /**
     * Leaves single step mode and removes every breakpoint, letting all parked exchanges continue.
     */
    public void removeAllBreakpoints() {
        // stop single stepping
        stopSingleStepping();

        // cover the registered breakpoints as well as nodes that only hold an exchange parked by a
        // single step; iterating only the suspended nodes would leave idle breakpoints registered
        Set<String> nodeIds = getBreakpoints();
        nodeIds.addAll(getSuspendedBreakpointNodeIds());
        for (String nodeId : nodeIds) {
            removeBreakpoint(nodeId);
        }
    }

    /**
     * @return a snapshot copy of the node ids that have a breakpoint registered
     */
    public Set<String> getBreakpoints() {
        return new LinkedHashSet<String>(breakpoints.keySet());
    }

    /**
     * Leaves single step mode and lets the exchange parked at the given node continue.
     *
     * @param nodeId the node id
     */
    public void resumeBreakpoint(String nodeId) {
        resumeBreakpoint(nodeId, false);
    }

    /**
     * Lets the exchange parked at the given node continue.
     *
     * @param nodeId   the node id
     * @param stepMode <tt>true</tt> to stay in single step mode, <tt>false</tt> to leave it
     */
    private void resumeBreakpoint(String nodeId, boolean stepMode) {
        logger.log("Resume breakpoint " + nodeId);

        if (!stepMode) {
            stopSingleStepping();
        }

        releaseSuspended(nodeId);
    }

    /**
     * Replaces the body of the message parked at the given node, converting to the type of the
     * current body when there is one. A <tt>null</tt> body removes the body instead.
     * Does nothing when no exchange is parked at the node.
     *
     * @param nodeId the node id
     * @param body   the new body, or <tt>null</tt> to remove the body
     */
    public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se == null) {
            return;
        }
        if (body == null) {
            removeMessageBodyOnBreakpoint(nodeId);
            return;
        }
        // preserve the type of the existing body
        Object oldBody = currentMessage(se.getExchange()).getBody();
        Class<?> oldType = oldBody != null ? oldBody.getClass() : null;
        setMessageBodyOnBreakpoint(nodeId, body, oldType);
    }

    /**
     * Replaces the body of the message parked at the given node. A <tt>null</tt> body removes
     * the body instead. Does nothing when no exchange is parked at the node.
     *
     * @param nodeId the node id
     * @param body   the new body, or <tt>null</tt> to remove the body
     * @param type   the type to set the body as, or <tt>null</tt> to set the body as-is
     */
    public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class<?> type) {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se == null) {
            return;
        }
        if (body == null) {
            removeMessageBodyOnBreakpoint(nodeId);
            return;
        }
        logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
        Message message = currentMessage(se.getExchange());
        if (type != null) {
            // preserve type
            message.setBody(body, type);
        } else {
            message.setBody(body);
        }
    }

    /**
     * Sets the body of the message parked at the given node to <tt>null</tt>.
     * Does nothing when no exchange is parked at the node.
     *
     * @param nodeId the node id
     */
    public void removeMessageBodyOnBreakpoint(String nodeId) {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se != null) {
            logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
            currentMessage(se.getExchange()).setBody(null);
        }
    }

    /**
     * Sets a header on the message parked at the given node, converting the value to the type of
     * the current header value when there is one. Does nothing when no exchange is parked at the node.
     *
     * @param nodeId     the node id
     * @param headerName the header name
     * @param value      the new header value
     * @throws NoTypeConversionAvailableException if the value cannot be converted to the type of the current value
     */
    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se != null) {
            Object oldValue = currentMessage(se.getExchange()).getHeader(headerName);
            Class<?> oldType = oldValue != null ? oldValue.getClass() : null;
            setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
        }
    }

    /**
     * Sets a header on the message parked at the given node.
     * Does nothing when no exchange is parked at the node.
     *
     * @param nodeId     the node id
     * @param headerName the header name
     * @param value      the new header value
     * @param type       the type to convert the value to, or <tt>null</tt> to set the value as-is
     * @throws NoTypeConversionAvailableException if the value cannot be converted to the given type
     */
    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class<?> type) throws NoTypeConversionAvailableException {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se != null) {
            Exchange exchange = se.getExchange();
            logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + exchange.getExchangeId() + " with header: " + headerName + " and value: " + value);
            Object newValue = value;
            if (type != null) {
                newValue = exchange.getContext().getTypeConverter().mandatoryConvertTo(type, exchange, value);
            }
            currentMessage(exchange).setHeader(headerName, newValue);
        }
    }

    /**
     * Removes a header from the message parked at the given node.
     * Does nothing when no exchange is parked at the node.
     *
     * @param nodeId     the node id
     * @param headerName the header name
     */
    public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
        if (se != null) {
            logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
            currentMessage(se.getExchange()).removeHeader(headerName);
        }
    }

    /**
     * Leaves single step mode and lets every parked exchange continue.
     */
    public void resumeAll() {
        logger.log("Resume all");
        // stop single stepping, also in the underlying debugger (as resumeBreakpoint does),
        // otherwise the stepped exchange would be parked again at its next node
        stopSingleStepping();

        for (String node : getSuspendedBreakpointNodeIds()) {
            releaseSuspended(node);
        }
    }

    /**
     * Starts single stepping the exchange parked at the breakpoint of the given node.
     * If an exchange is already being single stepped, this performs a {@link #step()} instead.
     *
     * @param nodeId the node id
     */
    public void stepBreakpoint(String nodeId) {
        // if we are already in single step mode, then infer stepping
        if (isSingleStepMode()) {
            logger.log("stepBreakpoint " + nodeId + " is already in single step mode, so stepping instead.");
            step();
            // stepping replaces the request; do not also try to start stepping another exchange
            return;
        }

        logger.log("Step breakpoint " + nodeId);
        // we want to step current exchange to next
        BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId);
        NodeBreakpoint breakpoint = breakpoints.get(nodeId);
        if (msg != null && breakpoint != null) {
            String exchangeId = msg.getExchangeId();
            // publish the id before the exchange is released so its completion event can be matched
            singleStepExchangeId = exchangeId;
            if (debugger.startSingleStepExchange(exchangeId, new StepBreakpoint())) {
                // now resume
                resumeBreakpoint(nodeId, true);
            } else {
                // the debugger refused to single step, so we are not in single step mode
                singleStepExchangeId = null;
            }
        }
    }

    /**
     * Lets every parked exchange continue, without changing single step mode.
     */
    public void step() {
        for (String node : getSuspendedBreakpointNodeIds()) {
            releaseSuspended(node);
        }
    }

    /**
     * @return a snapshot copy of the node ids that currently hold a parked exchange
     */
    public Set<String> getSuspendedBreakpointNodeIds() {
        return new LinkedHashSet<String>(suspendedBreakpoints.keySet());
    }

    /**
     * Suspends the breakpoint at the given node, if one is registered.
     *
     * @param nodeId the node id
     */
    public void disableBreakpoint(String nodeId) {
        logger.log("Disable breakpoint " + nodeId);
        NodeBreakpoint breakpoint = breakpoints.get(nodeId);
        if (breakpoint != null) {
            breakpoint.suspend();
        }
    }

    /**
     * Activates the breakpoint at the given node, if one is registered.
     *
     * @param nodeId the node id
     */
    public void enableBreakpoint(String nodeId) {
        logger.log("Enable breakpoint " + nodeId);
        NodeBreakpoint breakpoint = breakpoints.get(nodeId);
        if (breakpoint != null) {
            breakpoint.activate();
        }
    }

    /**
     * @return the body max chars setting passed on when dumping a message as XML
     */
    public int getBodyMaxChars() {
        return bodyMaxChars;
    }

    public void setBodyMaxChars(int bodyMaxChars) {
        this.bodyMaxChars = bodyMaxChars;
    }

    /**
     * @return the include-streams setting passed on when dumping a message as XML
     */
    public boolean isBodyIncludeStreams() {
        return bodyIncludeStreams;
    }

    public void setBodyIncludeStreams(boolean bodyIncludeStreams) {
        this.bodyIncludeStreams = bodyIncludeStreams;
    }

    /**
     * @return the include-files setting passed on when dumping a message as XML
     */
    public boolean isBodyIncludeFiles() {
        return bodyIncludeFiles;
    }

    public void setBodyIncludeFiles(boolean bodyIncludeFiles) {
        this.bodyIncludeFiles = bodyIncludeFiles;
    }

    /**
     * Returns the message dumped when an exchange was parked at the given node.
     *
     * @param nodeId the node id
     * @return the message as XML, or <tt>null</tt> if no message is held for the node
     */
    public String dumpTracedMessagesAsXml(String nodeId) {
        logger.log("Dump trace message from breakpoint " + nodeId);
        BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId);
        return msg != null ? msg.toXml(0) : null;
    }

    /**
     * @return the number of times a breakpoint (node or step) has been entered since the last reset
     */
    public long getDebugCounter() {
        return debugCounter.get();
    }

    public void resetDebugCounter() {
        logger.log("Reset debug counter");
        debugCounter.set(0);
    }

    /**
     * Delegates to the underlying debugger.
     *
     * @return the result of the underlying debugger
     */
    public boolean beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
        return debugger.beforeProcess(exchange, processor, definition);
    }

    /**
     * Does nothing.
     *
     * @return always <tt>false</tt>
     */
    public boolean afterProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition, long timeTaken) {
        // noop
        return false;
    }

    @Override
    protected void doStart() throws Exception {
        // noop
    }

    @Override
    protected void doStop() throws Exception {
        if (enabled.get()) {
            disableDebugger();
        }
        clearBreakpoints();
    }

    private void clearBreakpoints() {
        // a stale single step id would otherwise keep us in single step mode after the state is cleared
        stopSingleStepping();
        // make sure to clear state and latches is counted down so we wont have hanging threads
        breakpoints.clear();
        for (SuspendedExchange se : suspendedBreakpoints.values()) {
            se.getLatch().countDown();
        }
        suspendedBreakpoints.clear();
        suspendedBreakpointMessages.clear();
    }

    /**
     * Leaves single step mode (if active), both here and in the underlying debugger.
     */
    private void stopSingleStepping() {
        String exchangeId = singleStepExchangeId;
        if (exchangeId != null) {
            debugger.stopSingleStepExchange(exchangeId);
            singleStepExchangeId = null;
        }
    }

    /**
     * Forgets the dumped message for the node and lets the exchange parked there (if any) continue.
     */
    private void releaseSuspended(String nodeId) {
        // remember to remove the dumped message as its no longer in need
        suspendedBreakpointMessages.remove(nodeId);
        SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
        if (se != null) {
            se.getLatch().countDown();
        }
    }

    /**
     * Returns the OUT message if the exchange has one, otherwise the IN message.
     */
    private static Message currentMessage(Exchange exchange) {
        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
    }

    /**
     * Dumps the IN message of the exchange as XML into a new event message and bumps the debug counter.
     */
    private BacklogTracerEventMessage createSuspendedMessage(Exchange exchange, ProcessorDefinition<?> definition) {
        Date timestamp = new Date();
        String toNode = definition.getId();
        String routeId = ProcessorDefinitionHelper.getRouteId(definition);
        String exchangeId = exchange.getExchangeId();
        String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
        long uid = debugCounter.incrementAndGet();
        return new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml);
    }

    /**
     * Parks the calling thread until the latch of the suspended exchange is counted down, the
     * fallback timeout elapses, or the thread is interrupted.
     *
     * @param kind   the breakpoint kind, used as prefix in the log messages
     * @param key    the key under which <tt>se</tt> and <tt>msg</tt> are stored in the suspended maps
     * @param toNode the id of the node the exchange is parked at
     */
    private void awaitResume(String kind, String key, String toNode, Exchange exchange, SuspendedExchange se, BacklogTracerEventMessage msg) {
        String exchangeId = exchange.getExchangeId();
        logger.log(kind + " at node " + toNode + " is waiting to continue for exchangeId: " + exchangeId);
        try {
            boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS);
            if (!hit) {
                logger.log(kind + " at node " + toNode + " timed out and is continued exchangeId: " + exchangeId, LoggingLevel.WARN);
            } else {
                logger.log(kind + " at node " + toNode + " is continued exchangeId: " + exchangeId);
            }
        } catch (InterruptedException e) {
            // continue routing, but keep the interrupt status for the code further up the stack
            Thread.currentThread().interrupt();
        } finally {
            // after a timeout or interrupt our entries are still registered; drop them so the node is
            // not reported as suspended and the next exchange can break here again. The two-arg
            // remove only removes our own entries, so it is a no-op after a regular resume.
            suspendedBreakpoints.remove(key, se);
            suspendedBreakpointMessages.remove(key, msg);
        }
    }

    /**
     * Represents a {@link org.apache.camel.spi.Breakpoint} that has a {@link Condition} on a specific node id.
     */
    private final class NodeBreakpoint extends BreakpointSupport implements Condition {

        private final String nodeId;
        // written by management operations while routing threads evaluate it in matchProcess
        private volatile Predicate condition;

        private NodeBreakpoint(String nodeId, Predicate condition) {
            this.nodeId = nodeId;
            this.condition = condition;
        }

        public String getNodeId() {
            return nodeId;
        }

        public Predicate getCondition() {
            return condition;
        }

        public void setCondition(Predicate predicate) {
            this.condition = predicate;
        }

        @Override
        public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            // store a copy of the message so we can see that from the debugger
            BacklogTracerEventMessage msg = createSuspendedMessage(exchange, definition);
            suspendedBreakpointMessages.put(nodeId, msg);

            // suspend at this breakpoint (the entry was registered by matchProcess)
            final SuspendedExchange se = suspendedBreakpoints.get(nodeId);
            if (se != null) {
                // now wait until we should continue
                awaitResume("NodeBreakpoint", nodeId, definition.getId(), exchange, se, msg);
            } else {
                // nothing is parked here anymore, so do not leave the dumped message behind
                suspendedBreakpointMessages.remove(nodeId, msg);
            }
        }

        @Override
        public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            // must match node
            if (!nodeId.equals(definition.getId())) {
                return false;
            }

            // if condition then must match (read once, as it may be changed concurrently)
            Predicate predicate = condition;
            if (predicate != null && !predicate.matches(exchange)) {
                return false;
            }

            // we only want to break one exchange at a time, so if there is already a suspended breakpoint then do not match
            SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1));
            boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, se) != null;
            return !existing;
        }

        @Override
        public boolean matchEvent(Exchange exchange, EventObject event) {
            return false;
        }
    }

    /**
     * Represents a {@link org.apache.camel.spi.Breakpoint} that is used during single step mode.
     */
    private final class StepBreakpoint extends BreakpointSupport implements Condition {

        @Override
        public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            // store a copy of the message so we can see that from the debugger
            String toNode = definition.getId();
            BacklogTracerEventMessage msg = createSuspendedMessage(exchange, definition);
            suspendedBreakpointMessages.put(toNode, msg);

            // suspend at this breakpoint
            SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1));
            suspendedBreakpoints.put(toNode, se);

            // now wait until we should continue
            awaitResume("StepBreakpoint", toNode, toNode, exchange, se, msg);
        }

        @Override
        public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            return true;
        }

        @Override
        public boolean matchEvent(Exchange exchange, EventObject event) {
            return event instanceof ExchangeCompletedEvent;
        }

        @Override
        public void onEvent(Exchange exchange, EventObject event, ProcessorDefinition<?> definition) {
            // when the exchange is complete, we need to turn off single step mode if we were debug stepping the exchange
            if (event instanceof ExchangeCompletedEvent) {
                String completedId = ((ExchangeCompletedEvent) event).getExchange().getExchangeId();

                if (singleStepExchangeId != null && singleStepExchangeId.equals(completedId)) {
                    logger.log("ExchangeId: " + completedId + " is completed, so exiting single step mode.");
                    singleStepExchangeId = null;
                }
            }
        }
    }

}