001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management.mbean;
018    
019    import java.util.Date;
020    import java.util.HashMap;
021    import java.util.Map;
022    import java.util.concurrent.atomic.AtomicLong;
023    import javax.management.Notification;
024    
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Message;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Traceable;
029    import org.apache.camel.api.management.NotificationSender;
030    import org.apache.camel.api.management.NotificationSenderAware;
031    import org.apache.camel.model.ProcessorDefinition;
032    import org.apache.camel.processor.interceptor.TraceEventHandler;
033    import org.apache.camel.processor.interceptor.TraceInterceptor;
034    import org.apache.camel.processor.interceptor.Tracer;
035    import org.apache.camel.util.MessageHelper;
036    
037    public final class JMXNotificationTraceEventHandler implements TraceEventHandler, NotificationSenderAware {
038        private static final int MAX_MESSAGE_LENGTH = 60;
039        private final AtomicLong num = new AtomicLong();
040        private final Tracer tracer;
041        private NotificationSender notificationSender;
042    
043        public JMXNotificationTraceEventHandler(Tracer tracer) {
044            this.tracer = tracer;
045        }
046    
047        public void traceExchangeOut(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange, Object traceState) throws Exception {
048            // We do nothing here
049        }
050    
051        public Object traceExchangeIn(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
052            // Just trace the exchange as usual
053            traceExchange(node, target, traceInterceptor, exchange);
054            return null;
055        }
056    
057        public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
058            if (notificationSender != null && tracer.isJmxTraceNotifications()) {
059                String body = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize());
060                
061                if (body == null) {
062                    body = "";
063                }
064                String message = body.substring(0, Math.min(body.length(), MAX_MESSAGE_LENGTH));
065                Map<String, Object> tm = createTraceMessage(node, exchange, body);
066    
067                Notification notification = new Notification("TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), message);
068                notification.setUserData(tm);
069    
070                notificationSender.sendNotification(notification);
071            }
072    
073        }
074    
075        private Map<String, Object> createTraceMessage(ProcessorDefinition<?> node, Exchange exchange, String body) {
076            Map<String, Object> mi = new HashMap<String, Object>();
077            mi.put("ExchangeId", exchange.getExchangeId());
078            mi.put("EndpointURI", getEndpointUri(node));
079            mi.put("TimeStamp", new Date(System.currentTimeMillis()));
080            mi.put("Body", body);
081    
082            Message message = exchange.getIn();
083            Map<String, Object> sHeaders = message.getHeaders();
084            Map<String, Object> sProperties = exchange.getProperties();
085    
086            Map<String, String> headers = new HashMap<String, String>();
087            for (String key : sHeaders.keySet()) {
088                headers.put(key, message.getHeader(key, String.class));
089            }
090            mi.put("Headers", headers);
091    
092            Map<String, String> properties = new HashMap<String, String>();
093            for (String key : sProperties.keySet()) {
094                properties.put(key, exchange.getProperty(key, String.class));
095            }
096            mi.put("Properties", properties);
097            return mi;
098        }
099    
100        private String getEndpointUri(ProcessorDefinition<?> node) {
101            if (node instanceof Traceable) {
102                Traceable tr = (Traceable)node;
103                return tr.getTraceLabel();
104            } else {
105                return node.getLabel();
106            }
107        }
108    
109        @Override
110        public void setNotificationSender(NotificationSender sender) {
111            this.notificationSender = sender;
112        }
113    
114    }