001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.Date;
020    import java.util.EventObject;
021    import java.util.LinkedHashSet;
022    import java.util.List;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.ConcurrentMap;
026    import java.util.concurrent.CountDownLatch;
027    import java.util.concurrent.TimeUnit;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    import java.util.concurrent.atomic.AtomicLong;
030    
031    import org.apache.camel.CamelContext;
032    import org.apache.camel.Exchange;
033    import org.apache.camel.LoggingLevel;
034    import org.apache.camel.NoTypeConversionAvailableException;
035    import org.apache.camel.Predicate;
036    import org.apache.camel.Processor;
037    import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
038    import org.apache.camel.impl.BreakpointSupport;
039    import org.apache.camel.impl.DefaultDebugger;
040    import org.apache.camel.management.event.ExchangeCompletedEvent;
041    import org.apache.camel.model.ProcessorDefinition;
042    import org.apache.camel.model.ProcessorDefinitionHelper;
043    import org.apache.camel.spi.Condition;
044    import org.apache.camel.spi.Debugger;
045    import org.apache.camel.spi.InterceptStrategy;
046    import org.apache.camel.support.ServiceSupport;
047    import org.apache.camel.util.CamelLogger;
048    import org.apache.camel.util.MessageHelper;
049    import org.apache.camel.util.ObjectHelper;
050    import org.apache.camel.util.ServiceHelper;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * A {@link org.apache.camel.spi.Debugger} that has easy debugging functionality which
056     * can be used from JMX with {@link org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean}.
057     * <p/>
058     * This implementation allows to set breakpoints (with or without a condition) and inspect the {@link Exchange}
059     * dumped in XML in {@link BacklogTracerEventMessage} format. There is operations to resume suspended breakpoints
060     * to continue routing the {@link Exchange}. There is also step functionality so you can single step a given
061     * {@link Exchange}.
062     * <p/>
063     * This implementation will only break the first {@link Exchange} that arrives to a breakpoint. If Camel routes using
064     * concurrency then sub-sequent {@link Exchange} will continue to be routed, if there breakpoint already holds a
065     * suspended {@link Exchange}.
066     */
067    public class BacklogDebugger extends ServiceSupport implements InterceptStrategy {
068    
069        private static final Logger LOG = LoggerFactory.getLogger(BacklogDebugger.class);
070    
071        private long fallbackTimeout = 300;
072        private final CamelContext camelContext;
073        private LoggingLevel loggingLevel = LoggingLevel.INFO;
074        private final CamelLogger logger = new CamelLogger(LOG, loggingLevel);
075        private final AtomicBoolean enabled = new AtomicBoolean();
076        private final AtomicLong debugCounter = new AtomicLong(0);
077        private final Debugger debugger;
078        private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<String, NodeBreakpoint>();
079        private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<String, SuspendedExchange>();
080        private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<String, BacklogTracerEventMessage>();
081        private volatile String singleStepExchangeId;
082        private int bodyMaxChars = 128 * 1024;
083        private boolean bodyIncludeStreams;
084        private boolean bodyIncludeFiles = true;
085    
086        /**
087         * A suspend {@link Exchange} at a breakpoint.
088         */
089        private static final class SuspendedExchange {
090            private final Exchange exchange;
091            private final CountDownLatch latch;
092    
093            /**
094             * @param exchange the suspend exchange
095             * @param latch    the latch to use to continue routing the exchange
096             */
097            private SuspendedExchange(Exchange exchange, CountDownLatch latch) {
098                this.exchange = exchange;
099                this.latch = latch;
100            }
101    
102            public Exchange getExchange() {
103                return exchange;
104            }
105    
106            public CountDownLatch getLatch() {
107                return latch;
108            }
109        }
110    
111        public BacklogDebugger(CamelContext camelContext) {
112            this.camelContext = camelContext;
113            DefaultDebugger debugger = new DefaultDebugger(camelContext);
114            debugger.setUseTracer(false);
115            this.debugger = debugger;
116        }
117    
118        @Override
119        @Deprecated
120        public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception {
121            throw new UnsupportedOperationException("Deprecated");
122        }
123    
124        /**
125         * A helper method to return the BacklogDebugger instance if one is enabled
126         *
127         * @return the backlog debugger or null if none can be found
128         */
129        public static BacklogDebugger getBacklogDebugger(CamelContext context) {
130            List<InterceptStrategy> list = context.getInterceptStrategies();
131            for (InterceptStrategy interceptStrategy : list) {
132                if (interceptStrategy instanceof BacklogDebugger) {
133                    return (BacklogDebugger) interceptStrategy;
134                }
135            }
136            return null;
137        }
138    
139        public Debugger getDebugger() {
140            return debugger;
141        }
142    
143        public String getLoggingLevel() {
144            return loggingLevel.name();
145        }
146    
147        public void setLoggingLevel(String level) {
148            loggingLevel = LoggingLevel.valueOf(level);
149            logger.setLevel(loggingLevel);
150        }
151    
152        public void enableDebugger() {
153            logger.log("Enabling debugger");
154            try {
155                ServiceHelper.startService(debugger);
156                enabled.set(true);
157            } catch (Exception e) {
158                throw ObjectHelper.wrapRuntimeCamelException(e);
159            }
160        }
161    
162        public void disableDebugger() {
163            logger.log("Disabling debugger");
164            try {
165                enabled.set(false);
166                ServiceHelper.stopService(debugger);
167            } catch (Exception e) {
168                // ignore
169            }
170            clearBreakpoints();
171        }
172    
173        public boolean isEnabled() {
174            return enabled.get();
175        }
176    
177        public boolean hasBreakpoint(String nodeId) {
178            return breakpoints.containsKey(nodeId);
179        }
180    
181        public boolean isSingleStepMode() {
182            return singleStepExchangeId != null;
183        }
184    
185        public void addBreakpoint(String nodeId) {
186            NodeBreakpoint breakpoint = breakpoints.get(nodeId);
187            if (breakpoint == null) {
188                logger.log("Adding breakpoint " + nodeId);
189                breakpoint = new NodeBreakpoint(nodeId, null);
190                breakpoints.put(nodeId, breakpoint);
191                debugger.addBreakpoint(breakpoint, breakpoint);
192            } else {
193                breakpoint.setCondition(null);
194            }
195        }
196    
197        public void addConditionalBreakpoint(String nodeId, String language, String predicate) {
198            Predicate condition = camelContext.resolveLanguage(language).createPredicate(predicate);
199            NodeBreakpoint breakpoint = breakpoints.get(nodeId);
200            if (breakpoint == null) {
201                logger.log("Adding conditional breakpoint " + nodeId + " [" + predicate + "]");
202                breakpoint = new NodeBreakpoint(nodeId, condition);
203                breakpoints.put(nodeId, breakpoint);
204                debugger.addBreakpoint(breakpoint, breakpoint);
205            } else if (breakpoint.getCondition() == null) {
206                logger.log("Updating to conditional breakpoint " + nodeId + " [" + predicate + "]");
207                debugger.removeBreakpoint(breakpoint);
208                breakpoints.put(nodeId, breakpoint);
209                debugger.addBreakpoint(breakpoint, breakpoint);
210            } else if (breakpoint.getCondition() != null) {
211                logger.log("Updating conditional breakpoint " + nodeId + " [" + predicate + "]");
212                breakpoint.setCondition(condition);
213            }
214        }
215    
216        public void removeBreakpoint(String nodeId) {
217            logger.log("Removing breakpoint " + nodeId);
218            // when removing a break point then ensure latches is cleared and counted down so we wont have hanging threads
219            suspendedBreakpointMessages.remove(nodeId);
220            SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
221            NodeBreakpoint breakpoint = breakpoints.remove(nodeId);
222            if (breakpoint != null) {
223                debugger.removeBreakpoint(breakpoint);
224            }
225            if (se != null) {
226                se.getLatch().countDown();
227            }
228        }
229    
230        public void removeAllBreakpoints() {
231            // stop single stepping
232            singleStepExchangeId = null;
233    
234            for (String nodeId : getSuspendedBreakpointNodeIds()) {
235                removeBreakpoint(nodeId);
236            }
237        }
238    
239        public Set<String> getBreakpoints() {
240            return new LinkedHashSet<String>(breakpoints.keySet());
241        }
242    
243        public void resumeBreakpoint(String nodeId) {
244            resumeBreakpoint(nodeId, false);
245        }
246    
247        private void resumeBreakpoint(String nodeId, boolean stepMode) {
248            logger.log("Resume breakpoint " + nodeId);
249    
250            if (!stepMode) {
251                if (singleStepExchangeId != null) {
252                    debugger.stopSingleStepExchange(singleStepExchangeId);
253                    singleStepExchangeId = null;
254                }
255            }
256    
257            // remember to remove the dumped message as its no longer in need
258            suspendedBreakpointMessages.remove(nodeId);
259            SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
260            if (se != null) {
261                se.getLatch().countDown();
262            }
263        }
264    
265        public void setMessageBodyOnBreakpoint(String nodeId, Object body) {
266            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
267            if (se != null) {
268                boolean remove = body == null;
269                if (remove) {
270                    removeMessageBodyOnBreakpoint(nodeId);
271                } else {
272                    Class oldType;
273                    if (se.getExchange().hasOut()) {
274                        oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null;
275                    } else {
276                        oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null;
277                    }
278                    setMessageBodyOnBreakpoint(nodeId, body, oldType);
279                }
280            }
281        }
282    
283        public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) {
284            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
285            if (se != null) {
286                boolean remove = body == null;
287                if (remove) {
288                    removeMessageBodyOnBreakpoint(nodeId);
289                } else {
290                    logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body);
291                    if (se.getExchange().hasOut()) {
292                        // preserve type
293                        if (type != null) {
294                            se.getExchange().getOut().setBody(body, type);
295                        } else {
296                            se.getExchange().getOut().setBody(body);
297                        }
298                    } else {
299                        if (type != null) {
300                            se.getExchange().getIn().setBody(body, type);
301                        } else {
302                            se.getExchange().getIn().setBody(body);
303                        }
304                    }
305                }
306            }
307        }
308    
309        public void removeMessageBodyOnBreakpoint(String nodeId) {
310            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
311            if (se != null) {
312                logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId());
313                if (se.getExchange().hasOut()) {
314                    se.getExchange().getOut().setBody(null);
315                } else {
316                    se.getExchange().getIn().setBody(null);
317                }
318            }
319        }
320    
321        public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException {
322            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
323            if (se != null) {
324                Class oldType;
325                if (se.getExchange().hasOut()) {
326                    oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null;
327                } else {
328                    oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null;
329                }
330                setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
331            }
332        }
333    
334        public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException {
335            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
336            if (se != null) {
337                logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value);
338                if (se.getExchange().hasOut()) {
339                    if (type != null) {
340                        Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
341                        se.getExchange().getOut().setHeader(headerName, convertedValue);
342                    } else {
343                        se.getExchange().getOut().setHeader(headerName, value);
344                    }
345                } else {
346                    if (type != null) {
347                        Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value);
348                        se.getExchange().getIn().setHeader(headerName, convertedValue);
349                    } else {
350                        se.getExchange().getIn().setHeader(headerName, value);
351                    }
352                }
353            }
354        }
355    
356        public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
357            SuspendedExchange se = suspendedBreakpoints.get(nodeId);
358            if (se != null) {
359                logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName);
360                if (se.getExchange().hasOut()) {
361                    se.getExchange().getOut().removeHeader(headerName);
362                } else {
363                    se.getExchange().getIn().removeHeader(headerName);
364                }
365            }
366        }
367    
368        public void resumeAll() {
369            logger.log("Resume all");
370            // stop single stepping
371            singleStepExchangeId = null;
372    
373            for (String node : getSuspendedBreakpointNodeIds()) {
374                // remember to remove the dumped message as its no longer in need
375                suspendedBreakpointMessages.remove(node);
376                SuspendedExchange se = suspendedBreakpoints.remove(node);
377                if (se != null) {
378                    se.getLatch().countDown();
379                }
380            }
381        }
382    
383        public void stepBreakpoint(String nodeId) {
384            // if we are already in single step mode, then infer stepping
385            if (isSingleStepMode()) {
386                logger.log("stepBreakpoint " + nodeId + " is already in single step mode, so stepping instead.");
387                step();
388            }
389    
390            logger.log("Step breakpoint " + nodeId);
391            // we want to step current exchange to next
392            BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId);
393            NodeBreakpoint breakpoint = breakpoints.get(nodeId);
394            if (msg != null && breakpoint != null) {
395                singleStepExchangeId = msg.getExchangeId();
396                if (debugger.startSingleStepExchange(singleStepExchangeId, new StepBreakpoint())) {
397                    // now resume
398                    resumeBreakpoint(nodeId, true);
399                }
400            }
401        }
402    
403        public void step() {
404            for (String node : getSuspendedBreakpointNodeIds()) {
405                // remember to remove the dumped message as its no longer in need
406                suspendedBreakpointMessages.remove(node);
407                SuspendedExchange se = suspendedBreakpoints.remove(node);
408                if (se != null) {
409                    se.getLatch().countDown();
410                }
411            }
412        }
413    
414        public Set<String> getSuspendedBreakpointNodeIds() {
415            return new LinkedHashSet<String>(suspendedBreakpoints.keySet());
416        }
417    
418        public void disableBreakpoint(String nodeId) {
419            logger.log("Disable breakpoint " + nodeId);
420            NodeBreakpoint breakpoint = breakpoints.get(nodeId);
421            if (breakpoint != null) {
422                breakpoint.suspend();
423            }
424        }
425    
426        public void enableBreakpoint(String nodeId) {
427            logger.log("Enable breakpoint " + nodeId);
428            NodeBreakpoint breakpoint = breakpoints.get(nodeId);
429            if (breakpoint != null) {
430                breakpoint.activate();
431            }
432        }
433    
434        public int getBodyMaxChars() {
435            return bodyMaxChars;
436        }
437    
438        public void setBodyMaxChars(int bodyMaxChars) {
439            this.bodyMaxChars = bodyMaxChars;
440        }
441    
442        public boolean isBodyIncludeStreams() {
443            return bodyIncludeStreams;
444        }
445    
446        public void setBodyIncludeStreams(boolean bodyIncludeStreams) {
447            this.bodyIncludeStreams = bodyIncludeStreams;
448        }
449    
450        public boolean isBodyIncludeFiles() {
451            return bodyIncludeFiles;
452        }
453    
454        public void setBodyIncludeFiles(boolean bodyIncludeFiles) {
455            this.bodyIncludeFiles = bodyIncludeFiles;
456        }
457    
458        public String dumpTracedMessagesAsXml(String nodeId) {
459            logger.log("Dump trace message from breakpoint " + nodeId);
460            BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId);
461            if (msg != null) {
462                return msg.toXml(0);
463            } else {
464                return null;
465            }
466        }
467    
468        public long getDebugCounter() {
469            return debugCounter.get();
470        }
471    
472        public void resetDebugCounter() {
473            logger.log("Reset debug counter");
474            debugCounter.set(0);
475        }
476    
477        public boolean beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
478            return debugger.beforeProcess(exchange, processor, definition);
479        }
480    
481        public boolean afterProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition, long timeTaken) {
482            // noop
483            return false;
484        }
485    
486        protected void doStart() throws Exception {
487            // noop
488        }
489    
490        protected void doStop() throws Exception {
491            if (enabled.get()) {
492                disableDebugger();
493            }
494            clearBreakpoints();
495        }
496    
497        private void clearBreakpoints() {
498            // make sure to clear state and latches is counted down so we wont have hanging threads
499            breakpoints.clear();
500            for (SuspendedExchange se : suspendedBreakpoints.values()) {
501                se.getLatch().countDown();
502            }
503            suspendedBreakpoints.clear();
504            suspendedBreakpointMessages.clear();
505        }
506    
507        /**
508         * Represents a {@link org.apache.camel.spi.Breakpoint} that has a {@link Condition} on a specific node id.
509         */
510        private final class NodeBreakpoint extends BreakpointSupport implements Condition {
511    
512            private final String nodeId;
513            private Predicate condition;
514    
515            private NodeBreakpoint(String nodeId, Predicate condition) {
516                this.nodeId = nodeId;
517                this.condition = condition;
518            }
519    
520            public String getNodeId() {
521                return nodeId;
522            }
523    
524            public Predicate getCondition() {
525                return condition;
526            }
527    
528            public void setCondition(Predicate predicate) {
529                this.condition = predicate;
530            }
531    
532            @Override
533            public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
534                // store a copy of the message so we can see that from the debugger
535                Date timestamp = new Date();
536                String toNode = definition.getId();
537                String routeId = ProcessorDefinitionHelper.getRouteId(definition);
538                String exchangeId = exchange.getExchangeId();
539                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
540                long uid = debugCounter.incrementAndGet();
541    
542                BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml);
543                suspendedBreakpointMessages.put(nodeId, msg);
544    
545                // suspend at this breakpoint
546                final SuspendedExchange se = suspendedBreakpoints.get(nodeId);
547                if (se != null) {
548                    // now wait until we should continue
549                    logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId());
550                    try {
551                        boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS);
552                        if (!hit) {
553                            logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN);
554                        } else {
555                            logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId());
556                        }
557                    } catch (InterruptedException e) {
558                        // ignore
559                    }
560                }
561            }
562    
563            @Override
564            public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
565                // must match node
566                if (!nodeId.equals(definition.getId())) {
567                    return false;
568                }
569    
570                // if condition then must match
571                if (condition != null && !condition.matches(exchange)) {
572                    return false;
573                }
574    
575                // we only want to break one exchange at a time, so if there is already a suspended breakpoint then do not match
576                SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1));
577                boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, se) != null;
578                return !existing;
579            }
580    
581            @Override
582            public boolean matchEvent(Exchange exchange, EventObject event) {
583                return false;
584            }
585        }
586    
587        /**
588         * Represents a {@link org.apache.camel.spi.Breakpoint} that is used during single step mode.
589         */
590        private final class StepBreakpoint extends BreakpointSupport implements Condition {
591    
592            @Override
593            public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
594                // store a copy of the message so we can see that from the debugger
595                Date timestamp = new Date();
596                String toNode = definition.getId();
597                String routeId = ProcessorDefinitionHelper.getRouteId(definition);
598                String exchangeId = exchange.getExchangeId();
599                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
600                long uid = debugCounter.incrementAndGet();
601    
602                BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml);
603                suspendedBreakpointMessages.put(toNode, msg);
604    
605                // suspend at this breakpoint
606                SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1));
607                suspendedBreakpoints.put(toNode, se);
608    
609                // now wait until we should continue
610                logger.log("StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId());
611                try {
612                    boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS);
613                    if (!hit) {
614                        logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN);
615                    } else {
616                        logger.log("StepBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId());
617                    }
618                } catch (InterruptedException e) {
619                    // ignore
620                }
621            }
622    
623            @Override
624            public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
625                return true;
626            }
627    
628            @Override
629            public boolean matchEvent(Exchange exchange, EventObject event) {
630                return event instanceof ExchangeCompletedEvent;
631            }
632    
633            @Override
634            public void onEvent(Exchange exchange, EventObject event, ProcessorDefinition<?> definition) {
635                // when the exchange is complete, we need to turn off single step mode if we were debug stepping the exchange
636                if (event instanceof ExchangeCompletedEvent) {
637                    String completedId = ((ExchangeCompletedEvent) event).getExchange().getExchangeId();
638    
639                    if (singleStepExchangeId != null && singleStepExchangeId.equals(completedId)) {
640                        logger.log("ExchangeId: " + completedId + " is completed, so exiting single step mode.");
641                        singleStepExchangeId = null;
642                    }
643                }
644            }
645        }
646    
647    }