001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.ExecutorService;
026    import javax.xml.bind.annotation.XmlAccessType;
027    import javax.xml.bind.annotation.XmlAccessorType;
028    import javax.xml.bind.annotation.XmlAttribute;
029    import javax.xml.bind.annotation.XmlElement;
030    import javax.xml.bind.annotation.XmlElementRef;
031    import javax.xml.bind.annotation.XmlRootElement;
032    import javax.xml.bind.annotation.XmlTransient;
033    
034    import org.apache.camel.Predicate;
035    import org.apache.camel.Processor;
036    import org.apache.camel.processor.CamelInternalProcessor;
037    import org.apache.camel.processor.OnCompletionProcessor;
038    import org.apache.camel.spi.RouteContext;
039    
040    /**
041     * Represents an XML <onCompletion/> element
042     *
043     * @version 
044     */
045    @XmlRootElement(name = "onCompletion")
046    @XmlAccessorType(XmlAccessType.FIELD)
047    public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
048        @XmlAttribute
049        private Boolean onCompleteOnly;
050        @XmlAttribute
051        private Boolean onFailureOnly;
052        @XmlElement(name = "onWhen")
053        private WhenDefinition onWhen;
054        @XmlAttribute
055        private String executorServiceRef;
056        @XmlAttribute(name = "useOriginalMessage")
057        private Boolean useOriginalMessagePolicy;
058        @XmlElementRef
059        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
060        @XmlTransient
061        private ExecutorService executorService;
062        @XmlTransient
063        private Boolean routeScoped;
064        // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
065        @XmlTransient
066        private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
067    
068        public OnCompletionDefinition() {
069        }
070    
071        public boolean isRouteScoped() {
072            // is context scoped by default
073            return routeScoped != null ? routeScoped : false;
074        }
075    
076        public Processor getOnCompletion(String routeId) {
077            return onCompletions.get(routeId);
078        }
079    
080        public Collection<Processor> getOnCompletions() {
081            return onCompletions.values();
082        }
083    
084        @Override
085        public String toString() {
086            return "onCompletion[" + getOutputs() + "]";
087        }
088    
089        @Override
090        public String getShortName() {
091            return "onCompletion";
092        }
093    
094        @Override
095        public String getLabel() {
096            return "onCompletion";
097        }
098    
099        @Override
100        public boolean isAbstract() {
101            return true;
102        }
103    
104        @Override
105        public boolean isTopLevelOnly() {
106            return true;
107        }
108    
109        @Override
110        public Processor createProcessor(RouteContext routeContext) throws Exception {
111            // assign whether this was a route scoped onCompletion or not
112            // we need to know this later when setting the parent, as only route scoped should have parent
113            // Note: this logic can possible be removed when the Camel routing engine decides at runtime
114            // to apply onCompletion in a more dynamic fashion than current code base
115            // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
116            if (routeScoped == null) {
117                routeScoped = super.getParent() != null;
118            }
119    
120            if (isOnCompleteOnly() && isOnFailureOnly()) {
121                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
122            }
123    
124            String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
125    
126            Processor childProcessor = this.createChildProcessor(routeContext, true);
127    
128            // wrap the on completion route in a unit of work processor
129            CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
130            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
131            internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
132    
133            onCompletions.put(routeId, internal);
134    
135            Predicate when = null;
136            if (onWhen != null) {
137                when = onWhen.getExpression().createPredicate(routeContext);
138            }
139    
140            // executor service is mandatory for on completion
141            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
142            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
143    
144            // should be false by default
145            boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
146            OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
147                    threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
148            return answer;
149        }
150    
151        /**
152         * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition.
153         * <p/>
154         * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>.
155         * Hence we remove all existing as they are global.
156         *
157         * @param definition the parent definition that is the route
158         */
159        public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
160            for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
161                ProcessorDefinition<?> out = it.next();
162                if (out instanceof OnCompletionDefinition) {
163                    it.remove();
164                }
165            }
166        }
167    
168        @Override
169        public ProcessorDefinition<?> end() {
170            // pop parent block, as we added our self as block to parent when synchronized was defined in the route
171            getParent().popBlock();
172            return super.end();
173        }
174    
175        /**
176         * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
177         *
178         * @return the builder
179         */
180        public OnCompletionDefinition onCompleteOnly() {
181            if (isOnFailureOnly()) {
182                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
183            }
184            // must define return type as OutputDefinition and not this type to avoid end user being able
185            // to invoke onFailureOnly/onCompleteOnly more than once
186            setOnCompleteOnly(Boolean.TRUE);
187            setOnFailureOnly(Boolean.FALSE);
188            return this;
189        }
190    
191        /**
192         * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message).
193         *
194         * @return the builder
195         */
196        public OnCompletionDefinition onFailureOnly() {
197            if (isOnCompleteOnly()) {
198                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
199            }
200            // must define return type as OutputDefinition and not this type to avoid end user being able
201            // to invoke onFailureOnly/onCompleteOnly more than once
202            setOnCompleteOnly(Boolean.FALSE);
203            setOnFailureOnly(Boolean.TRUE);
204            return this;
205        }
206    
207        /**
208         * Sets an additional predicate that should be true before the onCompletion is triggered.
209         * <p/>
210         * To be used for fine grained controlling whether a completion callback should be invoked or not
211         *
212         * @param predicate predicate that determines true or false
213         * @return the builder
214         */
215        public OnCompletionDefinition onWhen(Predicate predicate) {
216            setOnWhen(new WhenDefinition(predicate));
217            return this;
218        }
219    
220        /**
221         * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
222         * <p/>
223         * By default this feature is off.
224         *
225         * @return the builder
226         */
227        public OnCompletionDefinition useOriginalBody() {
228            setUseOriginalMessagePolicy(Boolean.TRUE);
229            return this;
230        }
231    
232        public OnCompletionDefinition executorService(ExecutorService executorService) {
233            setExecutorService(executorService);
234            return this;
235        }
236    
237        public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
238            setExecutorServiceRef(executorServiceRef);
239            return this;
240        }
241    
242        public List<ProcessorDefinition<?>> getOutputs() {
243            return outputs;
244        }
245    
246        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
247            this.outputs = outputs;
248        }
249    
250        public boolean isOutputSupported() {
251            return true;
252        }
253    
254        public Boolean getOnCompleteOnly() {
255            return onCompleteOnly;
256        }
257    
258        public void setOnCompleteOnly(Boolean onCompleteOnly) {
259            this.onCompleteOnly = onCompleteOnly;
260        }
261    
262        public boolean isOnCompleteOnly() {
263            return onCompleteOnly != null && onCompleteOnly;
264        }
265    
266        public Boolean getOnFailureOnly() {
267            return onFailureOnly;
268        }
269    
270        public void setOnFailureOnly(Boolean onFailureOnly) {
271            this.onFailureOnly = onFailureOnly;
272        }
273    
274        public boolean isOnFailureOnly() {
275            return onFailureOnly != null && onFailureOnly;
276        }
277    
278        public WhenDefinition getOnWhen() {
279            return onWhen;
280        }
281    
282        public void setOnWhen(WhenDefinition onWhen) {
283            this.onWhen = onWhen;
284        }
285    
286        public ExecutorService getExecutorService() {
287            return executorService;
288        }
289    
290        public void setExecutorService(ExecutorService executorService) {
291            this.executorService = executorService;
292        }
293    
294        public String getExecutorServiceRef() {
295            return executorServiceRef;
296        }
297    
298        public void setExecutorServiceRef(String executorServiceRef) {
299            this.executorServiceRef = executorServiceRef;
300        }
301    
302        public Boolean getUseOriginalMessagePolicy() {
303            return useOriginalMessagePolicy != null;
304        }
305    
306        public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) {
307            this.useOriginalMessagePolicy = useOriginalMessagePolicy;
308        }
309    
310    }