001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.Processor;
022    import org.apache.camel.spi.RouteContext;
023    import org.apache.camel.spi.UnitOfWork;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    import org.slf4j.MDC;
027    
028    /**
029     * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
030     *
031     * @version 
032     */
033    public class MDCUnitOfWork extends DefaultUnitOfWork {
034    
035        public static final String MDC_BREADCRUMB_ID = "camel.breadcrumbId";
036        public static final String MDC_EXCHANGE_ID = "camel.exchangeId";
037        public static final String MDC_MESSAGE_ID = "camel.messageId";
038        public static final String MDC_CORRELATION_ID = "camel.correlationId";
039        public static final String MDC_ROUTE_ID = "camel.routeId";
040        public static final String MDC_CAMEL_CONTEXT_ID = "camel.contextId";
041        public static final String MDC_TRANSACTION_KEY = "camel.transactionKey";
042    
043        private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class);
044    
045        private final String originalBreadcrumbId;
046        private final String originalExchangeId;
047        private final String originalMessageId;
048        private final String originalCorrelationId;
049        private final String originalRouteId;
050        private final String originalCamelContextId;
051        private final String originalTransactionKey;
052    
053        public MDCUnitOfWork(Exchange exchange) {
054            super(exchange, LOG);
055    
056            // remember existing values
057            this.originalExchangeId = MDC.get(MDC_EXCHANGE_ID);
058            this.originalMessageId = MDC.get(MDC_MESSAGE_ID);
059            this.originalBreadcrumbId = MDC.get(MDC_BREADCRUMB_ID);
060            this.originalCorrelationId = MDC.get(MDC_CORRELATION_ID);
061            this.originalRouteId = MDC.get(MDC_ROUTE_ID);
062            this.originalCamelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID);
063            this.originalTransactionKey = MDC.get(MDC_TRANSACTION_KEY);
064    
065            // must add exchange and message id in constructor
066            MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId());
067            String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
068            MDC.put(MDC_MESSAGE_ID, msgId);
069            // the camel context id is from exchange
070            MDC.put(MDC_CAMEL_CONTEXT_ID, exchange.getContext().getName());
071            // and add optional correlation id
072            String corrId = exchange.getProperty(Exchange.CORRELATION_ID, String.class);
073            if (corrId != null) {
074                MDC.put(MDC_CORRELATION_ID, corrId);
075            }
076            // and add optional breadcrumb id
077            String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
078            if (breadcrumbId != null) {
079                MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
080            }
081        }
082    
083        @Override
084        public UnitOfWork newInstance(Exchange exchange) {
085            return new MDCUnitOfWork(exchange);
086        }
087    
088        @Override
089        public void stop() throws Exception {
090            super.stop();
091            // and remove when stopping
092            clear();
093        }
094    
095        @Override
096        public void pushRouteContext(RouteContext routeContext) {
097            MDC.put(MDC_ROUTE_ID, routeContext.getRoute().getId());
098            super.pushRouteContext(routeContext);
099        }
100    
101        @Override
102        public RouteContext popRouteContext() {
103            MDC.remove(MDC_ROUTE_ID);
104            return super.popRouteContext();
105        }
106    
107        @Override
108        public void beginTransactedBy(Object key) {
109            MDC.put(MDC_TRANSACTION_KEY, key.toString());
110            super.beginTransactedBy(key);
111        }
112    
113        @Override
114        public void endTransactedBy(Object key) {
115            MDC.remove(MDC_TRANSACTION_KEY);
116            super.endTransactedBy(key);
117        }
118    
119        @Override
120        public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
121            String routeId = MDC.get(MDC_ROUTE_ID);
122            if (routeId != null) {
123                // only need MDC callback if we have a route id
124                return new MDCCallback(callback, routeId);
125            } else {
126                return callback;
127            }
128        }
129    
130        @Override
131        public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
132            if (!doneSync) {
133                // must clear MDC on current thread as the exchange is being processed asynchronously
134                // by another thread
135                clear();
136            }
137            super.afterProcess(processor, exchange, callback, doneSync);
138        }
139    
140        /**
141         * Clears information put on the MDC by this {@link MDCUnitOfWork}
142         */
143        public void clear() {
144            if (this.originalBreadcrumbId != null) {
145                MDC.put(MDC_BREADCRUMB_ID, originalBreadcrumbId);
146            } else {
147                MDC.remove(MDC_BREADCRUMB_ID);
148            }
149            if (this.originalExchangeId != null) {
150                MDC.put(MDC_EXCHANGE_ID, originalExchangeId);
151            } else {
152                MDC.remove(MDC_EXCHANGE_ID);
153            }
154            if (this.originalMessageId != null) {
155                MDC.put(MDC_MESSAGE_ID, originalMessageId);
156            } else {
157                MDC.remove(MDC_MESSAGE_ID);
158            }
159            if (this.originalCorrelationId != null) {
160                MDC.put(MDC_CORRELATION_ID, originalCorrelationId);
161            } else {
162                MDC.remove(MDC_CORRELATION_ID);
163            }
164            if (this.originalRouteId != null) {
165                MDC.put(MDC_ROUTE_ID, originalRouteId);
166            } else {
167                MDC.remove(MDC_ROUTE_ID);
168            }
169            if (this.originalCamelContextId != null) {
170                MDC.put(MDC_CAMEL_CONTEXT_ID, originalCamelContextId);
171            } else {
172                MDC.remove(MDC_CAMEL_CONTEXT_ID);
173            }
174            if (this.originalTransactionKey != null) {
175                MDC.put(MDC_TRANSACTION_KEY, originalTransactionKey);
176            } else {
177                MDC.remove(MDC_TRANSACTION_KEY);
178            }
179        }
180    
181        @Override
182        public String toString() {
183            return "MDCUnitOfWork";
184        }
185    
186        /**
187         * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when
188         * the asynchronous routing engine is being used.
189         */
190        private static final class MDCCallback implements AsyncCallback {
191    
192            private final AsyncCallback delegate;
193            private final String routeId;
194    
195            private MDCCallback(AsyncCallback delegate, String routeId) {
196                this.delegate = delegate;
197                this.routeId = routeId;
198            }
199    
200            public void done(boolean doneSync) {
201                try {
202                    MDC.put(MDC_ROUTE_ID, routeId);
203                } finally {
204                    // muse ensure delegate is invoked
205                    delegate.done(doneSync);
206                }
207            }
208    
209            @Override
210            public String toString() {
211                return delegate.toString();
212            }
213        }
214    
215    }