001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Date; 021 import java.util.Iterator; 022 import java.util.LinkedHashSet; 023 import java.util.List; 024 import java.util.Set; 025 import java.util.Stack; 026 027 import org.apache.camel.AsyncCallback; 028 import org.apache.camel.CamelContext; 029 import org.apache.camel.CamelUnitOfWorkException; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Message; 032 import org.apache.camel.Processor; 033 import org.apache.camel.Service; 034 import org.apache.camel.spi.RouteContext; 035 import org.apache.camel.spi.SubUnitOfWork; 036 import org.apache.camel.spi.SubUnitOfWorkCallback; 037 import org.apache.camel.spi.Synchronization; 038 import org.apache.camel.spi.SynchronizationVetoable; 039 import org.apache.camel.spi.TracedRouteNodes; 040 import org.apache.camel.spi.UnitOfWork; 041 import org.apache.camel.util.EventHelper; 042 import org.apache.camel.util.UnitOfWorkHelper; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 /** 047 * The default implementation of {@link org.apache.camel.spi.UnitOfWork} 048 */ 049 public class DefaultUnitOfWork implements UnitOfWork, Service { 050 private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class); 051 052 // TODO: This implementation seems to have transformed itself into a to broad concern 053 // where unit of work is doing a bit more work than the transactional aspect that ties 054 // to its name. Maybe this implementation should be named ExchangeContext and we can 055 // introduce a simpler UnitOfWork concept. This would also allow us to refactor the 056 // SubUnitOfWork into a general parent/child unit of work concept. However this 057 // requires API changes and thus is best kept for Camel 3.0 058 059 private UnitOfWork parent; 060 private String id; 061 private CamelContext context; 062 private List<Synchronization> synchronizations; 063 private Message originalInMessage; 064 private final TracedRouteNodes tracedRouteNodes; 065 private Set<Object> transactedBy; 066 private final Stack<RouteContext> routeContextStack = new Stack<RouteContext>(); 067 private Stack<DefaultSubUnitOfWork> subUnitOfWorks; 068 private final transient Logger log; 069 070 public DefaultUnitOfWork(Exchange exchange) { 071 this(exchange, LOG); 072 } 073 074 protected DefaultUnitOfWork(Exchange exchange, Logger logger) { 075 log = logger; 076 if (log.isTraceEnabled()) { 077 log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange); 078 } 079 tracedRouteNodes = new DefaultTracedRouteNodes(); 080 context = exchange.getContext(); 081 082 if (context.isAllowUseOriginalMessage()) { 083 // TODO: Camel 3.0: the copy on facade strategy will help us here in the future 084 // TODO: optimize to only copy original message if enabled to do so in the route 085 // special for JmsMessage as it can cause it to loose headers later. 086 // This will be resolved when we get the message facade with copy on write implemented 087 if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) { 088 this.originalInMessage = new DefaultMessage(); 089 this.originalInMessage.setBody(exchange.getIn().getBody()); 090 this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders()); 091 } else { 092 this.originalInMessage = exchange.getIn().copy(); 093 } 094 } 095 096 // TODO: Optimize to only copy if useOriginalMessage has been enabled 097 098 // mark the creation time when this Exchange was created 099 if (exchange.getProperty(Exchange.CREATED_TIMESTAMP) == null) { 100 exchange.setProperty(Exchange.CREATED_TIMESTAMP, new Date()); 101 } 102 103 // inject breadcrumb header if enabled 104 if (exchange.getContext().isUseBreadcrumb()) { 105 // create or use existing breadcrumb 106 String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); 107 if (breadcrumbId == null) { 108 // no existing breadcrumb, so create a new one based on the message id 109 breadcrumbId = exchange.getIn().getMessageId(); 110 exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); 111 } 112 } 113 114 // setup whether the exchange is externally redelivered or not (if not initialized before) 115 // store as property so we know that the origin exchange was redelivered 116 if (exchange.getProperty(Exchange.EXTERNAL_REDELIVERED) == null) { 117 exchange.setProperty(Exchange.EXTERNAL_REDELIVERED, exchange.isExternalRedelivered()); 118 } 119 120 // fire event 121 try { 122 EventHelper.notifyExchangeCreated(exchange.getContext(), exchange); 123 } catch (Throwable e) { 124 // must catch exceptions to ensure the exchange is not failing due to notification event failed 125 log.warn("Exception occurred during event notification. This exception will be ignored.", e); 126 } 127 128 // register to inflight registry 129 if (exchange.getContext() != null) { 130 exchange.getContext().getInflightRepository().add(exchange); 131 } 132 } 133 134 UnitOfWork newInstance(Exchange exchange) { 135 return new DefaultUnitOfWork(exchange); 136 } 137 138 @Override 139 public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) { 140 this.parent = parentUnitOfWork; 141 } 142 143 public UnitOfWork createChildUnitOfWork(Exchange childExchange) { 144 // create a new child unit of work, and mark me as its parent 145 UnitOfWork answer = newInstance(childExchange); 146 answer.setParentUnitOfWork(this); 147 return answer; 148 } 149 150 public void start() throws Exception { 151 id = null; 152 } 153 154 public void stop() throws Exception { 155 // need to clean up when we are stopping to not leak memory 156 if (synchronizations != null) { 157 synchronizations.clear(); 158 } 159 if (tracedRouteNodes != null) { 160 tracedRouteNodes.clear(); 161 } 162 if (transactedBy != null) { 163 transactedBy.clear(); 164 } 165 synchronized (routeContextStack) { 166 if (!routeContextStack.isEmpty()) { 167 routeContextStack.clear(); 168 } 169 } 170 if (subUnitOfWorks != null) { 171 subUnitOfWorks.clear(); 172 } 173 originalInMessage = null; 174 parent = null; 175 id = null; 176 } 177 178 public synchronized void addSynchronization(Synchronization synchronization) { 179 if (synchronizations == null) { 180 synchronizations = new ArrayList<Synchronization>(); 181 } 182 log.trace("Adding synchronization {}", synchronization); 183 synchronizations.add(synchronization); 184 } 185 186 public synchronized void removeSynchronization(Synchronization synchronization) { 187 if (synchronizations != null) { 188 synchronizations.remove(synchronization); 189 } 190 } 191 192 public synchronized boolean containsSynchronization(Synchronization synchronization) { 193 return synchronizations != null && synchronizations.contains(synchronization); 194 } 195 196 public void handoverSynchronization(Exchange target) { 197 if (synchronizations == null || synchronizations.isEmpty()) { 198 return; 199 } 200 201 Iterator<Synchronization> it = synchronizations.iterator(); 202 while (it.hasNext()) { 203 Synchronization synchronization = it.next(); 204 205 boolean handover = true; 206 if (synchronization instanceof SynchronizationVetoable) { 207 SynchronizationVetoable veto = (SynchronizationVetoable) synchronization; 208 handover = veto.allowHandover(); 209 } 210 211 if (handover) { 212 log.trace("Handover synchronization {} to: {}", synchronization, target); 213 target.addOnCompletion(synchronization); 214 // remove it if its handed over 215 it.remove(); 216 } else { 217 log.trace("Handover not allow for synchronization {}", synchronization); 218 } 219 } 220 } 221 222 public void done(Exchange exchange) { 223 log.trace("UnitOfWork done for ExchangeId: {} with {}", exchange.getExchangeId(), exchange); 224 225 boolean failed = exchange.isFailed(); 226 227 // at first done the synchronizations 228 UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log); 229 230 // notify uow callback if in use 231 try { 232 SubUnitOfWorkCallback uowCallback = getSubUnitOfWorkCallback(); 233 if (uowCallback != null) { 234 uowCallback.onDone(exchange); 235 } 236 } catch (Throwable e) { 237 // must catch exceptions to ensure synchronizations is also invoked 238 log.warn("Exception occurred during savepoint onDone. This exception will be ignored.", e); 239 } 240 241 // unregister from inflight registry, before signalling we are done 242 if (exchange.getContext() != null) { 243 exchange.getContext().getInflightRepository().remove(exchange); 244 } 245 246 // then fire event to signal the exchange is done 247 try { 248 if (failed) { 249 EventHelper.notifyExchangeFailed(exchange.getContext(), exchange); 250 } else { 251 EventHelper.notifyExchangeDone(exchange.getContext(), exchange); 252 } 253 } catch (Throwable e) { 254 // must catch exceptions to ensure synchronizations is also invoked 255 log.warn("Exception occurred during event notification. This exception will be ignored.", e); 256 } 257 } 258 259 public String getId() { 260 if (id == null) { 261 id = context.getUuidGenerator().generateUuid(); 262 } 263 return id; 264 } 265 266 public Message getOriginalInMessage() { 267 return originalInMessage; 268 } 269 270 public TracedRouteNodes getTracedRouteNodes() { 271 return tracedRouteNodes; 272 } 273 274 public boolean isTransacted() { 275 return transactedBy != null && !transactedBy.isEmpty(); 276 } 277 278 public boolean isTransactedBy(Object key) { 279 return getTransactedBy().contains(key); 280 } 281 282 public void beginTransactedBy(Object key) { 283 getTransactedBy().add(key); 284 } 285 286 public void endTransactedBy(Object key) { 287 getTransactedBy().remove(key); 288 } 289 290 public RouteContext getRouteContext() { 291 synchronized (routeContextStack) { 292 if (routeContextStack.isEmpty()) { 293 return null; 294 } 295 return routeContextStack.peek(); 296 } 297 } 298 299 public void pushRouteContext(RouteContext routeContext) { 300 synchronized (routeContextStack) { 301 routeContextStack.add(routeContext); 302 } 303 } 304 305 public RouteContext popRouteContext() { 306 synchronized (routeContextStack) { 307 if (routeContextStack.isEmpty()) { 308 return null; 309 } 310 return routeContextStack.pop(); 311 } 312 } 313 314 public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { 315 // no wrapping needed 316 return callback; 317 } 318 319 public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) { 320 } 321 322 @Override 323 public void beginSubUnitOfWork(Exchange exchange) { 324 if (log.isTraceEnabled()) { 325 log.trace("beginSubUnitOfWork exchangeId: {}", exchange.getExchangeId()); 326 } 327 328 if (subUnitOfWorks == null) { 329 subUnitOfWorks = new Stack<DefaultSubUnitOfWork>(); 330 } 331 subUnitOfWorks.push(new DefaultSubUnitOfWork()); 332 } 333 334 @Override 335 public void endSubUnitOfWork(Exchange exchange) { 336 if (log.isTraceEnabled()) { 337 log.trace("endSubUnitOfWork exchangeId: {}", exchange.getExchangeId()); 338 } 339 340 if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) { 341 return; 342 } 343 344 // pop last sub unit of work as its now ended 345 SubUnitOfWork subUoW = subUnitOfWorks.pop(); 346 if (subUoW.isFailed()) { 347 // the sub unit of work failed so set an exception containing all the caused exceptions 348 // and mark the exchange for rollback only 349 350 // if there are multiple exceptions then wrap those into another exception with them all 351 Exception cause; 352 List<Exception> list = subUoW.getExceptions(); 353 if (list != null) { 354 if (list.size() == 1) { 355 cause = list.get(0); 356 } else { 357 cause = new CamelUnitOfWorkException(exchange, list); 358 } 359 exchange.setException(cause); 360 } 361 // mark it as rollback and that the unit of work is exhausted. This ensures that we do not try 362 // to redeliver this exception (again) 363 exchange.setProperty(Exchange.ROLLBACK_ONLY, true); 364 exchange.setProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, true); 365 // and remove any indications of error handled which will prevent this exception to be noticed 366 // by the error handler which we want to react with the result of the sub unit of work 367 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null); 368 exchange.setProperty(Exchange.FAILURE_HANDLED, null); 369 if (log.isTraceEnabled()) { 370 log.trace("endSubUnitOfWork exchangeId: {} with {} caused exceptions.", exchange.getExchangeId(), list != null ? list.size() : 0); 371 } 372 } 373 } 374 375 @Override 376 public SubUnitOfWorkCallback getSubUnitOfWorkCallback() { 377 // if there is a parent-child relationship between unit of works 378 // then we should use the callback strategies from the parent 379 if (parent != null) { 380 return parent.getSubUnitOfWorkCallback(); 381 } 382 383 if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) { 384 return null; 385 } 386 return subUnitOfWorks.peek(); 387 } 388 389 private Set<Object> getTransactedBy() { 390 if (transactedBy == null) { 391 transactedBy = new LinkedHashSet<Object>(); 392 } 393 return transactedBy; 394 } 395 396 @Override 397 public String toString() { 398 return "DefaultUnitOfWork"; 399 } 400 }