001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.ExecutorService; 026 import javax.xml.bind.annotation.XmlAccessType; 027 import javax.xml.bind.annotation.XmlAccessorType; 028 import javax.xml.bind.annotation.XmlAttribute; 029 import javax.xml.bind.annotation.XmlElement; 030 import javax.xml.bind.annotation.XmlElementRef; 031 import javax.xml.bind.annotation.XmlRootElement; 032 import javax.xml.bind.annotation.XmlTransient; 033 034 import org.apache.camel.Predicate; 035 import org.apache.camel.Processor; 036 import org.apache.camel.processor.CamelInternalProcessor; 037 import org.apache.camel.processor.OnCompletionProcessor; 038 import org.apache.camel.spi.RouteContext; 039 040 /** 041 * Represents an XML <onCompletion/> element 042 * 043 * @version 044 */ 045 @XmlRootElement(name = "onCompletion") 046 @XmlAccessorType(XmlAccessType.FIELD) 047 public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> { 048 @XmlAttribute 049 private Boolean onCompleteOnly; 050 @XmlAttribute 051 private Boolean onFailureOnly; 052 @XmlElement(name = "onWhen") 053 private WhenDefinition onWhen; 054 @XmlAttribute 055 private String executorServiceRef; 056 @XmlAttribute(name = "useOriginalMessage") 057 private Boolean useOriginalMessagePolicy; 058 @XmlElementRef 059 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 060 @XmlTransient 061 private ExecutorService executorService; 062 @XmlTransient 063 private Boolean routeScoped; 064 // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors 065 @XmlTransient 066 private final Map<String, Processor> onCompletions = new HashMap<String, Processor>(); 067 068 public OnCompletionDefinition() { 069 } 070 071 public boolean isRouteScoped() { 072 // is context scoped by default 073 return routeScoped != null ? routeScoped : false; 074 } 075 076 public Processor getOnCompletion(String routeId) { 077 return onCompletions.get(routeId); 078 } 079 080 public Collection<Processor> getOnCompletions() { 081 return onCompletions.values(); 082 } 083 084 @Override 085 public String toString() { 086 return "onCompletion[" + getOutputs() + "]"; 087 } 088 089 @Override 090 public String getShortName() { 091 return "onCompletion"; 092 } 093 094 @Override 095 public String getLabel() { 096 return "onCompletion"; 097 } 098 099 @Override 100 public boolean isAbstract() { 101 return true; 102 } 103 104 @Override 105 public boolean isTopLevelOnly() { 106 return true; 107 } 108 109 @Override 110 public Processor createProcessor(RouteContext routeContext) throws Exception { 111 // assign whether this was a route scoped onCompletion or not 112 // we need to know this later when setting the parent, as only route scoped should have parent 113 // Note: this logic can possible be removed when the Camel routing engine decides at runtime 114 // to apply onCompletion in a more dynamic fashion than current code base 115 // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime 116 if (routeScoped == null) { 117 routeScoped = super.getParent() != null; 118 } 119 120 if (isOnCompleteOnly() && isOnFailureOnly()) { 121 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 122 } 123 124 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 125 126 Processor childProcessor = this.createChildProcessor(routeContext, true); 127 128 // wrap the on completion route in a unit of work processor 129 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 130 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 131 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); 132 133 onCompletions.put(routeId, internal); 134 135 Predicate when = null; 136 if (onWhen != null) { 137 when = onWhen.getExpression().createPredicate(routeContext); 138 } 139 140 // executor service is mandatory for on completion 141 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 142 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true); 143 144 // should be false by default 145 boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false; 146 OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal, 147 threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original); 148 return answer; 149 } 150 151 /** 152 * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition. 153 * <p/> 154 * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>. 155 * Hence we remove all existing as they are global. 156 * 157 * @param definition the parent definition that is the route 158 */ 159 public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) { 160 for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) { 161 ProcessorDefinition<?> out = it.next(); 162 if (out instanceof OnCompletionDefinition) { 163 it.remove(); 164 } 165 } 166 } 167 168 @Override 169 public ProcessorDefinition<?> end() { 170 // pop parent block, as we added our self as block to parent when synchronized was defined in the route 171 getParent().popBlock(); 172 return super.end(); 173 } 174 175 /** 176 * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors). 177 * 178 * @return the builder 179 */ 180 public OnCompletionDefinition onCompleteOnly() { 181 if (isOnFailureOnly()) { 182 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 183 } 184 // must define return type as OutputDefinition and not this type to avoid end user being able 185 // to invoke onFailureOnly/onCompleteOnly more than once 186 setOnCompleteOnly(Boolean.TRUE); 187 setOnFailureOnly(Boolean.FALSE); 188 return this; 189 } 190 191 /** 192 * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message). 193 * 194 * @return the builder 195 */ 196 public OnCompletionDefinition onFailureOnly() { 197 if (isOnCompleteOnly()) { 198 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 199 } 200 // must define return type as OutputDefinition and not this type to avoid end user being able 201 // to invoke onFailureOnly/onCompleteOnly more than once 202 setOnCompleteOnly(Boolean.FALSE); 203 setOnFailureOnly(Boolean.TRUE); 204 return this; 205 } 206 207 /** 208 * Sets an additional predicate that should be true before the onCompletion is triggered. 209 * <p/> 210 * To be used for fine grained controlling whether a completion callback should be invoked or not 211 * 212 * @param predicate predicate that determines true or false 213 * @return the builder 214 */ 215 public OnCompletionDefinition onWhen(Predicate predicate) { 216 setOnWhen(new WhenDefinition(predicate)); 217 return this; 218 } 219 220 /** 221 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 222 * <p/> 223 * By default this feature is off. 224 * 225 * @return the builder 226 */ 227 public OnCompletionDefinition useOriginalBody() { 228 setUseOriginalMessagePolicy(Boolean.TRUE); 229 return this; 230 } 231 232 public OnCompletionDefinition executorService(ExecutorService executorService) { 233 setExecutorService(executorService); 234 return this; 235 } 236 237 public OnCompletionDefinition executorServiceRef(String executorServiceRef) { 238 setExecutorServiceRef(executorServiceRef); 239 return this; 240 } 241 242 public List<ProcessorDefinition<?>> getOutputs() { 243 return outputs; 244 } 245 246 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 247 this.outputs = outputs; 248 } 249 250 public boolean isOutputSupported() { 251 return true; 252 } 253 254 public Boolean getOnCompleteOnly() { 255 return onCompleteOnly; 256 } 257 258 public void setOnCompleteOnly(Boolean onCompleteOnly) { 259 this.onCompleteOnly = onCompleteOnly; 260 } 261 262 public boolean isOnCompleteOnly() { 263 return onCompleteOnly != null && onCompleteOnly; 264 } 265 266 public Boolean getOnFailureOnly() { 267 return onFailureOnly; 268 } 269 270 public void setOnFailureOnly(Boolean onFailureOnly) { 271 this.onFailureOnly = onFailureOnly; 272 } 273 274 public boolean isOnFailureOnly() { 275 return onFailureOnly != null && onFailureOnly; 276 } 277 278 public WhenDefinition getOnWhen() { 279 return onWhen; 280 } 281 282 public void setOnWhen(WhenDefinition onWhen) { 283 this.onWhen = onWhen; 284 } 285 286 public ExecutorService getExecutorService() { 287 return executorService; 288 } 289 290 public void setExecutorService(ExecutorService executorService) { 291 this.executorService = executorService; 292 } 293 294 public String getExecutorServiceRef() { 295 return executorServiceRef; 296 } 297 298 public void setExecutorServiceRef(String executorServiceRef) { 299 this.executorServiceRef = executorServiceRef; 300 } 301 302 public Boolean getUseOriginalMessagePolicy() { 303 return useOriginalMessagePolicy != null; 304 } 305 306 public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) { 307 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 308 } 309 310 }