001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.Queue;
022    import java.util.concurrent.ArrayBlockingQueue;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Predicate;
028    import org.apache.camel.Processor;
029    import org.apache.camel.api.management.mbean.BacklogTracerEventMessage;
030    import org.apache.camel.model.ProcessorDefinition;
031    import org.apache.camel.model.ProcessorDefinitionHelper;
032    import org.apache.camel.model.RouteDefinition;
033    import org.apache.camel.spi.InterceptStrategy;
034    import org.apache.camel.support.ServiceSupport;
035    import org.apache.camel.util.EndpointHelper;
036    import org.apache.camel.util.ObjectHelper;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * A tracer used for message tracing, storing a copy of the message details in a backlog.
042     * <p/>
043     * This tracer allows to store message tracers per node in the Camel routes. The tracers
044     * is stored in a backlog queue (FIFO based) which allows to pull the traced messages on demand.
045     */
046    public class BacklogTracer extends ServiceSupport implements InterceptStrategy {
047    
048        // lets limit the tracer to 100 thousand messages in total
049        public static final int MAX_BACKLOG_SIZE = 100 * 1000;
050        private static final Logger LOG = LoggerFactory.getLogger(BacklogTracer.class);
051        private final CamelContext camelContext;
052        private boolean enabled;
053        private final AtomicLong traceCounter = new AtomicLong(0);
054        // use a queue with a upper limit to avoid storing too many messages
055        private final Queue<DefaultBacklogTracerEventMessage> queue = new ArrayBlockingQueue<DefaultBacklogTracerEventMessage>(MAX_BACKLOG_SIZE);
056        // how many of the last messages to keep in the backlog at total
057        private int backlogSize = 1000;
058        private boolean removeOnDump = true;
059        private int bodyMaxChars = 128 * 1024;
060        private boolean bodyIncludeStreams;
061        private boolean bodyIncludeFiles = true;
062        // a pattern to filter tracing nodes
063        private String tracePattern;
064        private String[] patterns;
065        private String traceFilter;
066        private Predicate predicate;
067    
068        public BacklogTracer(CamelContext camelContext) {
069            this.camelContext = camelContext;
070        }
071    
072        public Queue<DefaultBacklogTracerEventMessage> getQueue() {
073            return queue;
074        }
075    
076        @Override
077        @Deprecated
078        public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception {
079            throw new UnsupportedOperationException("Deprecated");
080        }
081    
082        /**
083         * Creates a new backlog tracer.
084         *
085         * @param context Camel context
086         * @return a new backlog tracer
087         */
088        public static BacklogTracer createTracer(CamelContext context) {
089            BacklogTracer tracer = new BacklogTracer(context);
090            return tracer;
091        }
092    
093        /**
094         * A helper method to return the BacklogTracer instance if one is enabled
095         *
096         * @return the backlog tracer or null if none can be found
097         */
098        public static BacklogTracer getBacklogTracer(CamelContext context) {
099            List<InterceptStrategy> list = context.getInterceptStrategies();
100            for (InterceptStrategy interceptStrategy : list) {
101                if (interceptStrategy instanceof BacklogTracer) {
102                    return (BacklogTracer) interceptStrategy;
103                }
104            }
105            return null;
106        }
107    
108        /**
109         * Whether or not to trace the given processor definition.
110         *
111         * @param definition the processor definition
112         * @param exchange   the exchange
113         * @return <tt>true</tt> to trace, <tt>false</tt> to skip tracing
114         */
115        public boolean shouldTrace(ProcessorDefinition<?> definition, Exchange exchange) {
116            if (!enabled) {
117                return false;
118            }
119    
120            boolean pattern = true;
121            boolean filter = true;
122    
123            if (patterns != null) {
124                pattern = shouldTracePattern(definition);
125            }
126            if (predicate != null) {
127                filter = shouldTraceFilter(exchange);
128            }
129    
130            if (LOG.isTraceEnabled()) {
131                LOG.trace("Should trace evaluated {} -> pattern: {}, filter: {}", new Object[]{definition.getId(), pattern, filter});
132            }
133            return pattern && filter;
134        }
135    
136        private boolean shouldTracePattern(ProcessorDefinition<?> definition) {
137            for (String pattern : patterns) {
138                // match either route id, or node id
139                String id = definition.getId();
140                // use matchPattern method from endpoint helper that has a good matcher we use in Camel
141                if (EndpointHelper.matchPattern(id, pattern)) {
142                    return true;
143                }
144                RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition);
145                if (route != null) {
146                    id = route.getId();
147                    if (EndpointHelper.matchPattern(id, pattern)) {
148                        return true;
149                    }
150                }
151            }
152            // not matched the pattern
153            return false;
154        }
155    
156        private boolean shouldTraceFilter(Exchange exchange) {
157            return predicate.matches(exchange);
158        }
159    
160        public boolean isEnabled() {
161            return enabled;
162        }
163    
164        public void setEnabled(boolean enabled) {
165            this.enabled = enabled;
166        }
167    
168        public int getBacklogSize() {
169            return backlogSize;
170        }
171    
172        public void setBacklogSize(int backlogSize) {
173            if (backlogSize <= 0) {
174                throw new IllegalArgumentException("The backlog size must be a positive number, was: " + backlogSize);
175            }
176            if (backlogSize > MAX_BACKLOG_SIZE) {
177                throw new IllegalArgumentException("The backlog size cannot be greater than the max size of " + MAX_BACKLOG_SIZE + ", was: " + backlogSize);
178            }
179            this.backlogSize = backlogSize;
180        }
181    
182        public boolean isRemoveOnDump() {
183            return removeOnDump;
184        }
185    
186        public void setRemoveOnDump(boolean removeOnDump) {
187            this.removeOnDump = removeOnDump;
188        }
189    
190        public int getBodyMaxChars() {
191            return bodyMaxChars;
192        }
193    
194        public void setBodyMaxChars(int bodyMaxChars) {
195            this.bodyMaxChars = bodyMaxChars;
196        }
197    
198        public boolean isBodyIncludeStreams() {
199            return bodyIncludeStreams;
200        }
201    
202        public void setBodyIncludeStreams(boolean bodyIncludeStreams) {
203            this.bodyIncludeStreams = bodyIncludeStreams;
204        }
205    
206        public boolean isBodyIncludeFiles() {
207            return bodyIncludeFiles;
208        }
209    
210        public void setBodyIncludeFiles(boolean bodyIncludeFiles) {
211            this.bodyIncludeFiles = bodyIncludeFiles;
212        }
213    
214        public String getTracePattern() {
215            return tracePattern;
216        }
217    
218        public void setTracePattern(String tracePattern) {
219            this.tracePattern = tracePattern;
220            if (tracePattern != null) {
221                // the pattern can have multiple nodes separated by comma
222                this.patterns = tracePattern.split(",");
223            } else {
224                this.patterns = null;
225            }
226        }
227    
228        public String getTraceFilter() {
229            return traceFilter;
230        }
231    
232        public void setTraceFilter(String filter) {
233            this.traceFilter = filter;
234            if (filter != null) {
235                // assume simple language
236                String name = ObjectHelper.before(filter, ":");
237                if (name == null) {
238                    // use simple language by default
239                    name = "simple";
240                }
241                predicate = camelContext.resolveLanguage(name).createPredicate(filter);
242            }
243        }
244    
245        public long getTraceCounter() {
246            return traceCounter.get();
247        }
248    
249        public void resetTraceCounter() {
250            traceCounter.set(0);
251        }
252    
253        public List<BacklogTracerEventMessage> dumpTracedMessages(String nodeId) {
254            List<BacklogTracerEventMessage> answer = new ArrayList<BacklogTracerEventMessage>();
255            if (nodeId != null) {
256                for (DefaultBacklogTracerEventMessage message : queue) {
257                    if (nodeId.equals(message.getToNode()) || nodeId.equals(message.getRouteId())) {
258                        answer.add(message);
259                    }
260                }
261            }
262    
263            if (removeOnDump) {
264                queue.removeAll(answer);
265            }
266    
267            return answer;
268        }
269    
270        public String dumpTracedMessagesAsXml(String nodeId) {
271            List<BacklogTracerEventMessage> events = dumpTracedMessages(nodeId);
272    
273            StringBuilder sb = new StringBuilder();
274            sb.append("<").append(BacklogTracerEventMessage.ROOT_TAG).append("s>");
275            for (BacklogTracerEventMessage event : events) {
276                sb.append("\n").append(event.toXml(2));
277            }
278            sb.append("\n</").append(BacklogTracerEventMessage.ROOT_TAG).append("s>");
279            return sb.toString();
280        }
281    
282        public List<BacklogTracerEventMessage> dumpAllTracedMessages() {
283            List<BacklogTracerEventMessage> answer = new ArrayList<BacklogTracerEventMessage>();
284            answer.addAll(queue);
285            if (isRemoveOnDump()) {
286                queue.clear();
287            }
288            return answer;
289        }
290    
291        public String dumpAllTracedMessagesAsXml() {
292            List<BacklogTracerEventMessage> events = dumpAllTracedMessages();
293    
294            StringBuilder sb = new StringBuilder();
295            sb.append("<").append(BacklogTracerEventMessage.ROOT_TAG).append("s>");
296            for (BacklogTracerEventMessage event : events) {
297                sb.append("\n").append(event.toXml(2));
298            }
299            sb.append("\n</").append(BacklogTracerEventMessage.ROOT_TAG).append("s>");
300            return sb.toString();
301        }
302    
303        public void clear() {
304            queue.clear();
305        }
306    
307        public long incrementTraceCounter() {
308            return traceCounter.incrementAndGet();
309        }
310    
311        @Override
312        protected void doStart() throws Exception {
313        }
314    
315        @Override
316        protected void doStop() throws Exception {
317            queue.clear();
318        }
319    
320    }