001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Date;
021    import java.util.Iterator;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Set;
025    import java.util.Stack;
026    
027    import org.apache.camel.AsyncCallback;
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.CamelUnitOfWorkException;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Message;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Service;
034    import org.apache.camel.spi.RouteContext;
035    import org.apache.camel.spi.SubUnitOfWork;
036    import org.apache.camel.spi.SubUnitOfWorkCallback;
037    import org.apache.camel.spi.Synchronization;
038    import org.apache.camel.spi.SynchronizationVetoable;
039    import org.apache.camel.spi.TracedRouteNodes;
040    import org.apache.camel.spi.UnitOfWork;
041    import org.apache.camel.util.EventHelper;
042    import org.apache.camel.util.UnitOfWorkHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
048     */
049    public class DefaultUnitOfWork implements UnitOfWork, Service {
050        private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
051    
052        // TODO: This implementation seems to have transformed itself into a to broad concern
053        // where unit of work is doing a bit more work than the transactional aspect that ties
054        // to its name. Maybe this implementation should be named ExchangeContext and we can
055        // introduce a simpler UnitOfWork concept. This would also allow us to refactor the
056        // SubUnitOfWork into a general parent/child unit of work concept. However this
057        // requires API changes and thus is best kept for Camel 3.0
058    
059        private UnitOfWork parent;
060        private String id;
061        private CamelContext context;
062        private List<Synchronization> synchronizations;
063        private Message originalInMessage;
064        private final TracedRouteNodes tracedRouteNodes;
065        private Set<Object> transactedBy;
066        private final Stack<RouteContext> routeContextStack = new Stack<RouteContext>();
067        private Stack<DefaultSubUnitOfWork> subUnitOfWorks;
068        private final transient Logger log;
069        
070        public DefaultUnitOfWork(Exchange exchange) {
071            this(exchange, LOG);
072        }
073    
074        protected DefaultUnitOfWork(Exchange exchange, Logger logger) {
075            log = logger;
076            if (log.isTraceEnabled()) {
077                log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
078            }
079            tracedRouteNodes = new DefaultTracedRouteNodes();
080            context = exchange.getContext();
081    
082            if (context.isAllowUseOriginalMessage()) {
083                // TODO: Camel 3.0: the copy on facade strategy will help us here in the future
084                // TODO: optimize to only copy original message if enabled to do so in the route
085                // special for JmsMessage as it can cause it to loose headers later.
086                // This will be resolved when we get the message facade with copy on write implemented
087                if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
088                    this.originalInMessage = new DefaultMessage();
089                    this.originalInMessage.setBody(exchange.getIn().getBody());
090                    this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
091                } else {
092                    this.originalInMessage = exchange.getIn().copy();
093                }
094            }
095    
096            // TODO: Optimize to only copy if useOriginalMessage has been enabled
097    
098            // mark the creation time when this Exchange was created
099            if (exchange.getProperty(Exchange.CREATED_TIMESTAMP) == null) {
100                exchange.setProperty(Exchange.CREATED_TIMESTAMP, new Date());
101            }
102    
103            // inject breadcrumb header if enabled
104            if (exchange.getContext().isUseBreadcrumb()) {
105                // create or use existing breadcrumb
106                String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
107                if (breadcrumbId == null) {
108                    // no existing breadcrumb, so create a new one based on the message id
109                    breadcrumbId = exchange.getIn().getMessageId();
110                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
111                }
112            }
113            
114            // setup whether the exchange is externally redelivered or not (if not initialized before)
115            // store as property so we know that the origin exchange was redelivered
116            if (exchange.getProperty(Exchange.EXTERNAL_REDELIVERED) == null) {
117                exchange.setProperty(Exchange.EXTERNAL_REDELIVERED, exchange.isExternalRedelivered());
118            }
119    
120            // fire event
121            try {
122                EventHelper.notifyExchangeCreated(exchange.getContext(), exchange);
123            } catch (Throwable e) {
124                // must catch exceptions to ensure the exchange is not failing due to notification event failed
125                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
126            }
127    
128            // register to inflight registry
129            if (exchange.getContext() != null) {
130                exchange.getContext().getInflightRepository().add(exchange);
131            }
132        }
133    
134        UnitOfWork newInstance(Exchange exchange) {
135            return new DefaultUnitOfWork(exchange);
136        }
137    
138        @Override
139        public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
140            this.parent = parentUnitOfWork;
141        }
142    
143        public UnitOfWork createChildUnitOfWork(Exchange childExchange) {
144            // create a new child unit of work, and mark me as its parent
145            UnitOfWork answer = newInstance(childExchange);
146            answer.setParentUnitOfWork(this);
147            return answer;
148        }
149    
150        public void start() throws Exception {
151            id = null;
152        }
153    
154        public void stop() throws Exception {
155            // need to clean up when we are stopping to not leak memory
156            if (synchronizations != null) {
157                synchronizations.clear();
158            }
159            if (tracedRouteNodes != null) {
160                tracedRouteNodes.clear();
161            }
162            if (transactedBy != null) {
163                transactedBy.clear();
164            }
165            synchronized (routeContextStack) {
166                if (!routeContextStack.isEmpty()) {
167                    routeContextStack.clear();
168                }
169            }
170            if (subUnitOfWorks != null) {
171                subUnitOfWorks.clear();
172            }
173            originalInMessage = null;
174            parent = null;
175            id = null;
176        }
177    
178        public synchronized void addSynchronization(Synchronization synchronization) {
179            if (synchronizations == null) {
180                synchronizations = new ArrayList<Synchronization>();
181            }
182            log.trace("Adding synchronization {}", synchronization);
183            synchronizations.add(synchronization);
184        }
185    
186        public synchronized void removeSynchronization(Synchronization synchronization) {
187            if (synchronizations != null) {
188                synchronizations.remove(synchronization);
189            }
190        }
191    
192        public synchronized boolean containsSynchronization(Synchronization synchronization) {
193            return synchronizations != null && synchronizations.contains(synchronization);
194        }
195    
196        public void handoverSynchronization(Exchange target) {
197            if (synchronizations == null || synchronizations.isEmpty()) {
198                return;
199            }
200    
201            Iterator<Synchronization> it = synchronizations.iterator();
202            while (it.hasNext()) {
203                Synchronization synchronization = it.next();
204    
205                boolean handover = true;
206                if (synchronization instanceof SynchronizationVetoable) {
207                    SynchronizationVetoable veto = (SynchronizationVetoable) synchronization;
208                    handover = veto.allowHandover();
209                }
210    
211                if (handover) {
212                    log.trace("Handover synchronization {} to: {}", synchronization, target);
213                    target.addOnCompletion(synchronization);
214                    // remove it if its handed over
215                    it.remove();
216                } else {
217                    log.trace("Handover not allow for synchronization {}", synchronization);
218                }
219            }
220        }
221    
222        public void done(Exchange exchange) {
223            log.trace("UnitOfWork done for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
224    
225            boolean failed = exchange.isFailed();
226    
227            // at first done the synchronizations
228            UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
229    
230            // notify uow callback if in use
231            try {
232                SubUnitOfWorkCallback uowCallback = getSubUnitOfWorkCallback();
233                if (uowCallback != null) {
234                    uowCallback.onDone(exchange);
235                }
236            } catch (Throwable e) {
237                // must catch exceptions to ensure synchronizations is also invoked
238                log.warn("Exception occurred during savepoint onDone. This exception will be ignored.", e);
239            }
240    
241            // unregister from inflight registry, before signalling we are done
242            if (exchange.getContext() != null) {
243                exchange.getContext().getInflightRepository().remove(exchange);
244            }
245    
246            // then fire event to signal the exchange is done
247            try {
248                if (failed) {
249                    EventHelper.notifyExchangeFailed(exchange.getContext(), exchange);
250                } else {
251                    EventHelper.notifyExchangeDone(exchange.getContext(), exchange);
252                }
253            } catch (Throwable e) {
254                // must catch exceptions to ensure synchronizations is also invoked
255                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
256            }
257        }
258    
259        public String getId() {
260            if (id == null) {
261                id = context.getUuidGenerator().generateUuid();
262            }
263            return id;
264        }
265    
266        public Message getOriginalInMessage() {
267            return originalInMessage;
268        }
269    
270        public TracedRouteNodes getTracedRouteNodes() {
271            return tracedRouteNodes;
272        }
273    
274        public boolean isTransacted() {
275            return transactedBy != null && !transactedBy.isEmpty();
276        }
277    
278        public boolean isTransactedBy(Object key) {
279            return getTransactedBy().contains(key);
280        }
281    
282        public void beginTransactedBy(Object key) {
283            getTransactedBy().add(key);
284        }
285    
286        public void endTransactedBy(Object key) {
287            getTransactedBy().remove(key);
288        }
289    
290        public RouteContext getRouteContext() {
291            synchronized (routeContextStack) {
292                if (routeContextStack.isEmpty()) {
293                    return null;
294                }
295                return routeContextStack.peek();
296            }
297        }
298    
299        public void pushRouteContext(RouteContext routeContext) {
300            synchronized (routeContextStack) {
301                routeContextStack.add(routeContext);
302            }
303        }
304    
305        public RouteContext popRouteContext() {
306            synchronized (routeContextStack) {
307                if (routeContextStack.isEmpty()) {
308                    return null;
309                }
310                return routeContextStack.pop();
311            }
312        }
313    
314        public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
315            // no wrapping needed
316            return callback;
317        }
318    
319        public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
320        }
321    
322        @Override
323        public void beginSubUnitOfWork(Exchange exchange) {
324            if (log.isTraceEnabled()) {
325                log.trace("beginSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
326            }
327    
328            if (subUnitOfWorks == null) {
329                subUnitOfWorks = new Stack<DefaultSubUnitOfWork>();
330            }
331            subUnitOfWorks.push(new DefaultSubUnitOfWork());
332        }
333    
334        @Override
335        public void endSubUnitOfWork(Exchange exchange) {
336            if (log.isTraceEnabled()) {
337                log.trace("endSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
338            }
339    
340            if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
341                return;
342            }
343    
344            // pop last sub unit of work as its now ended
345            SubUnitOfWork subUoW = subUnitOfWorks.pop();
346            if (subUoW.isFailed()) {
347                // the sub unit of work failed so set an exception containing all the caused exceptions
348                // and mark the exchange for rollback only
349    
350                // if there are multiple exceptions then wrap those into another exception with them all
351                Exception cause;
352                List<Exception> list = subUoW.getExceptions();
353                if (list != null) {
354                    if (list.size() == 1) {
355                        cause = list.get(0);
356                    } else {
357                        cause = new CamelUnitOfWorkException(exchange, list);
358                    }
359                    exchange.setException(cause);
360                }
361                // mark it as rollback and that the unit of work is exhausted. This ensures that we do not try
362                // to redeliver this exception (again)
363                exchange.setProperty(Exchange.ROLLBACK_ONLY, true);
364                exchange.setProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, true);
365                // and remove any indications of error handled which will prevent this exception to be noticed
366                // by the error handler which we want to react with the result of the sub unit of work
367                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
368                exchange.setProperty(Exchange.FAILURE_HANDLED, null);
369                if (log.isTraceEnabled()) {
370                    log.trace("endSubUnitOfWork exchangeId: {} with {} caused exceptions.", exchange.getExchangeId(), list != null ? list.size() : 0);
371                }
372            }
373        }
374    
375        @Override
376        public SubUnitOfWorkCallback getSubUnitOfWorkCallback() {
377            // if there is a parent-child relationship between unit of works
378            // then we should use the callback strategies from the parent
379            if (parent != null) {
380                return parent.getSubUnitOfWorkCallback();
381            }
382    
383            if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
384                return null;
385            }
386            return subUnitOfWorks.peek();
387        }
388    
389        private Set<Object> getTransactedBy() {
390            if (transactedBy == null) {
391                transactedBy = new LinkedHashSet<Object>();
392            }
393            return transactedBy;
394        }
395    
396        @Override
397        public String toString() {
398            return "DefaultUnitOfWork";
399        }
400    }