001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    import javax.xml.bind.annotation.XmlAccessType;
023    import javax.xml.bind.annotation.XmlAccessorType;
024    import javax.xml.bind.annotation.XmlAttribute;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.CamelContextAware;
029    import org.apache.camel.Processor;
030    import org.apache.camel.processor.CamelInternalProcessor;
031    import org.apache.camel.processor.MulticastProcessor;
032    import org.apache.camel.processor.aggregate.AggregationStrategy;
033    import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
034    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
035    import org.apache.camel.spi.RouteContext;
036    import org.apache.camel.util.CamelContextHelper;
037    
038    /**
039     * Represents an XML <multicast/> element
040     *
041     * @version 
042     */
043    @XmlRootElement(name = "multicast")
044    @XmlAccessorType(XmlAccessType.FIELD)
045    public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
046        @XmlAttribute
047        private Boolean parallelProcessing;
048        @XmlAttribute
049        private String strategyRef;
050        @XmlAttribute
051        private String strategyMethodName;
052        @XmlAttribute
053        private Boolean strategyMethodAllowNull;
054        @XmlTransient
055        private ExecutorService executorService;
056        @XmlAttribute
057        private String executorServiceRef;
058        @XmlAttribute
059        private Boolean streaming;
060        @XmlAttribute
061        private Boolean stopOnException;
062        @XmlAttribute
063        private Long timeout;
064        @XmlTransient
065        private AggregationStrategy aggregationStrategy;
066        @XmlAttribute
067        private String onPrepareRef;
068        @XmlTransient
069        private Processor onPrepare;
070        @XmlAttribute
071        private Boolean shareUnitOfWork;
072    
073        public MulticastDefinition() {
074        }
075    
076        @Override
077        public String toString() {
078            return "Multicast[" + getOutputs() + "]";
079        }
080    
081        @Override
082        public String getLabel() {
083            return "multicast";
084        }
085        
086        @Override
087        public String getShortName() {
088            return "multicast";
089        }
090    
091        @Override
092        public Processor createProcessor(RouteContext routeContext) throws Exception {
093            Processor answer = this.createChildProcessor(routeContext, true);
094    
095            // force the answer as a multicast processor even if there is only one child processor in the multicast
096            if (!(answer instanceof MulticastProcessor)) {
097                List<Processor> list = new ArrayList<Processor>(1);
098                list.add(answer);
099                answer = createCompositeProcessor(routeContext, list);
100            }
101            return answer;
102        }
103    
104        // Fluent API
105        // -------------------------------------------------------------------------
106    
107        /**
108         * Set the multicasting aggregationStrategy
109         *
110         * @return the builder
111         */
112        public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
113            setAggregationStrategy(aggregationStrategy);
114            return this;
115        }
116        
117        /**
118         * Set the aggregationStrategy
119         *
120         * @param aggregationStrategyRef a reference to a strategy to lookup
121         * @return the builder
122         */
123        public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
124            setStrategyRef(aggregationStrategyRef);
125            return this;
126        }
127    
128        /**
129         * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
130         *
131         * @param  methodName the method name to call
132         * @return the builder
133         */
134        public MulticastDefinition aggregationStrategyMethodName(String methodName) {
135            setStrategyMethodName(methodName);
136            return this;
137        }
138    
139        /**
140         * Sets allowing null when using a POJO as {@link AggregationStrategy}.
141         *
142         * @return the builder
143         */
144        public MulticastDefinition aggregationStrategyMethodAllowNull() {
145            setStrategyMethodAllowNull(true);
146            return this;
147        }
148    
149        /**
150         * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work
151         *     
152         * @return the builder
153         */
154        public MulticastDefinition parallelProcessing() {
155            setParallelProcessing(true);
156            return this;
157        }
158        
159        /**
160         * Aggregates the responses as the are done (e.g. out of order sequence)
161         *
162         * @return the builder
163         */
164        public MulticastDefinition streaming() {
165            setStreaming(true);
166            return this;
167        }
168    
169        /**
170         * Will now stop further processing if an exception or failure occurred during processing of an
171         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
172         * <p/>
173         * Will also stop if processing the exchange failed (has a fault message) or an exception
174         * was thrown and handled by the error handler (such as using onException). In all situations
175         * the multicast will stop further processing. This is the same behavior as in pipeline, which
176         * is used by the routing engine.
177         * <p/>
178         * The default behavior is to <b>not</b> stop but continue processing till the end
179         *
180         * @return the builder
181         */
182        public MulticastDefinition stopOnException() {
183            setStopOnException(true);
184            return this;
185        }
186        
187        public MulticastDefinition executorService(ExecutorService executorService) {
188            setExecutorService(executorService);
189            return this;
190        }
191        
192        public MulticastDefinition executorServiceRef(String executorServiceRef) {
193            setExecutorServiceRef(executorServiceRef);
194            return this;
195        }
196    
197        /**
198         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
199         * This can be used to deep-clone messages that should be send, or any custom logic needed before
200         * the exchange is send.
201         *
202         * @param onPrepare the processor
203         * @return the builder
204         */
205        public MulticastDefinition onPrepare(Processor onPrepare) {
206            setOnPrepare(onPrepare);
207            return this;
208        }
209    
210        /**
211         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
212         * This can be used to deep-clone messages that should be send, or any custom logic needed before
213         * the exchange is send.
214         *
215         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
216         * @return the builder
217         */
218        public MulticastDefinition onPrepareRef(String onPrepareRef) {
219            setOnPrepareRef(onPrepareRef);
220            return this;
221        }
222    
223        /**
224         * Sets a timeout value in millis to use when using parallelProcessing.
225         *
226         * @param timeout timeout in millis
227         * @return the builder
228         */
229        public MulticastDefinition timeout(long timeout) {
230            setTimeout(timeout);
231            return this;
232        }
233    
234        /**
235         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
236         *
237         * @return the builder.
238         * @see org.apache.camel.spi.SubUnitOfWork
239         */
240        public MulticastDefinition shareUnitOfWork() {
241            setShareUnitOfWork(true);
242            return this;
243        }
244    
245        protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
246            AggregationStrategy strategy = createAggregationStrategy(routeContext);
247            if (strategy == null) {
248                // default to use latest aggregation strategy
249                strategy = new UseLatestAggregationStrategy();
250            }
251    
252            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
253            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
254    
255            long timeout = getTimeout() != null ? getTimeout() : 0;
256            if (timeout > 0 && !isParallelProcessing()) {
257                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
258            }
259            if (onPrepareRef != null) {
260                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
261            }
262    
263            MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing(),
264                                          threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
265            if (isShareUnitOfWork()) {
266                // wrap answer in a sub unit of work, since we share the unit of work
267                CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
268                internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
269                return internalProcessor;
270            }
271            return answer;
272        }
273    
274        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
275            AggregationStrategy strategy = getAggregationStrategy();
276            if (strategy == null && strategyRef != null) {
277                Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
278                if (aggStrategy instanceof AggregationStrategy) {
279                    strategy = (AggregationStrategy) aggStrategy;
280                } else if (aggStrategy != null) {
281                    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
282                    if (getStrategyMethodAllowNull() != null) {
283                        adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
284                        adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
285                    }
286                    strategy = adapter;
287                } else {
288                    throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
289                }
290            }
291    
292            if (strategy != null && strategy instanceof CamelContextAware) {
293                ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
294            }
295    
296            return strategy;
297        }
298    
299    
300        public AggregationStrategy getAggregationStrategy() {
301            return aggregationStrategy;
302        }
303    
304        public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
305            this.aggregationStrategy = aggregationStrategy;
306            return this;
307        }
308    
309        public Boolean getParallelProcessing() {
310            return parallelProcessing;
311        }
312    
313        public void setParallelProcessing(Boolean parallelProcessing) {
314            this.parallelProcessing = parallelProcessing;
315        }
316    
317        public boolean isParallelProcessing() {
318            return parallelProcessing != null && parallelProcessing;
319        }
320    
321        public Boolean getStreaming() {
322            return streaming;
323        }
324    
325        public void setStreaming(Boolean streaming) {
326            this.streaming = streaming;
327        }
328    
329        public boolean isStreaming() {
330            return streaming != null && streaming;
331        }
332    
333        public Boolean getStopOnException() {
334            return stopOnException;
335        }
336    
337        public void setStopOnException(Boolean stopOnException) {
338            this.stopOnException = stopOnException;
339        }
340    
341        public Boolean isStopOnException() {
342            return stopOnException != null && stopOnException;
343        }
344    
345        public ExecutorService getExecutorService() {
346            return executorService;
347        }
348    
349        public void setExecutorService(ExecutorService executorService) {
350            this.executorService = executorService;
351        }
352    
353        public String getStrategyRef() {
354            return strategyRef;
355        }
356    
357        public void setStrategyRef(String strategyRef) {
358            this.strategyRef = strategyRef;
359        }
360    
361        public String getStrategyMethodName() {
362            return strategyMethodName;
363        }
364    
365        public void setStrategyMethodName(String strategyMethodName) {
366            this.strategyMethodName = strategyMethodName;
367        }
368    
369        public Boolean getStrategyMethodAllowNull() {
370            return strategyMethodAllowNull;
371        }
372    
373        public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
374            this.strategyMethodAllowNull = strategyMethodAllowNull;
375        }
376    
377        public String getExecutorServiceRef() {
378            return executorServiceRef;
379        }
380    
381        public void setExecutorServiceRef(String executorServiceRef) {
382            this.executorServiceRef = executorServiceRef;
383        }
384    
385        public Long getTimeout() {
386            return timeout;
387        }
388    
389        public void setTimeout(Long timeout) {
390            this.timeout = timeout;
391        }
392    
393        public String getOnPrepareRef() {
394            return onPrepareRef;
395        }
396    
397        public void setOnPrepareRef(String onPrepareRef) {
398            this.onPrepareRef = onPrepareRef;
399        }
400    
401        public Processor getOnPrepare() {
402            return onPrepare;
403        }
404    
405        public void setOnPrepare(Processor onPrepare) {
406            this.onPrepare = onPrepare;
407        }
408    
409        public Boolean getShareUnitOfWork() {
410            return shareUnitOfWork;
411        }
412    
413        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
414            this.shareUnitOfWork = shareUnitOfWork;
415        }
416    
417        public boolean isShareUnitOfWork() {
418            return shareUnitOfWork != null && shareUnitOfWork;
419        }
420    
421    }