001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    
023    import javax.xml.bind.annotation.XmlAccessType;
024    import javax.xml.bind.annotation.XmlAccessorType;
025    import javax.xml.bind.annotation.XmlAttribute;
026    import javax.xml.bind.annotation.XmlRootElement;
027    import javax.xml.bind.annotation.XmlTransient;
028    
029    import org.apache.camel.CamelContextAware;
030    import org.apache.camel.Expression;
031    import org.apache.camel.Processor;
032    import org.apache.camel.model.language.ExpressionDefinition;
033    import org.apache.camel.processor.EvaluateExpressionProcessor;
034    import org.apache.camel.processor.Pipeline;
035    import org.apache.camel.processor.RecipientList;
036    import org.apache.camel.processor.aggregate.AggregationStrategy;
037    import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
038    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
039    import org.apache.camel.spi.RouteContext;
040    import org.apache.camel.util.CamelContextHelper;
041    
042    /**
043     * Represents an XML <recipientList/> element
044     *
045     * @version 
046     */
047    @XmlRootElement(name = "recipientList")
048    @XmlAccessorType(XmlAccessType.FIELD)
049    public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> {
050        @XmlTransient
051        private AggregationStrategy aggregationStrategy;
052        @XmlTransient
053        private ExecutorService executorService;
054        @XmlAttribute
055        private String delimiter;
056        @XmlAttribute
057        private Boolean parallelProcessing;
058        @XmlAttribute
059        private String strategyRef;
060        @XmlAttribute
061        private String strategyMethodName;
062        @XmlAttribute
063        private Boolean strategyMethodAllowNull;
064        @XmlAttribute
065        private String executorServiceRef;
066        @XmlAttribute
067        private Boolean stopOnException;
068        @XmlAttribute
069        private Boolean ignoreInvalidEndpoints;
070        @XmlAttribute
071        private Boolean streaming;
072        @XmlAttribute
073        private Long timeout;
074        @XmlAttribute
075        private String onPrepareRef;
076        @XmlTransient
077        private Processor onPrepare;
078        @XmlAttribute
079        private Boolean shareUnitOfWork;
080    
081        public RecipientListDefinition() {
082        }
083    
084        public RecipientListDefinition(ExpressionDefinition expression) {
085            super(expression);
086        }
087    
088        public RecipientListDefinition(Expression expression) {
089            super(expression);
090        }
091    
092        @Override
093        public String toString() {
094            return "RecipientList[" + getExpression() + "]";
095        }
096    
097        @Override
098        public String getShortName() {
099            return "recipientList";
100        }
101        
102        @Override
103        public String getLabel() {
104            return "recipientList[" + getExpression() + "]";
105        }
106    
107        @Override
108        public Processor createProcessor(RouteContext routeContext) throws Exception {
109            final Expression expression = getExpression().createExpression(routeContext);
110    
111            RecipientList answer;
112            if (delimiter != null) {
113                answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
114            } else {
115                answer = new RecipientList(routeContext.getCamelContext(), expression);
116            }
117            answer.setAggregationStrategy(createAggregationStrategy(routeContext));
118            answer.setParallelProcessing(isParallelProcessing());
119            answer.setStreaming(isStreaming());   
120            answer.setShareUnitOfWork(isShareUnitOfWork());
121            if (onPrepareRef != null) {
122                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
123            }
124            if (onPrepare != null) {
125                answer.setOnPrepare(onPrepare);
126            }
127            if (stopOnException != null) {
128                answer.setStopOnException(isStopOnException());
129            }
130            if (ignoreInvalidEndpoints != null) {
131                answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
132            }
133            if (getTimeout() != null) {
134                answer.setTimeout(getTimeout());
135            }
136    
137            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
138            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
139            answer.setExecutorService(threadPool);
140            answer.setShutdownExecutorService(shutdownThreadPool);
141            long timeout = getTimeout() != null ? getTimeout() : 0;
142            if (timeout > 0 && !isParallelProcessing()) {
143                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
144            }
145    
146            // create a pipeline with two processors
147            // the first is the eval processor which evaluates the expression to use
148            // the second is the recipient list
149            List<Processor> pipe = new ArrayList<Processor>(2);
150    
151            // the eval processor must be wrapped in error handler, so in case there was an
152            // error during evaluation, the error handler can deal with it
153            // the recipient list is not in error handler, as its has its own special error handling
154            // when sending to the recipients individually
155            Processor evalProcessor = new EvaluateExpressionProcessor(expression);
156            evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
157    
158            pipe.add(evalProcessor);
159            pipe.add(answer);
160    
161            // wrap in nested pipeline so this appears as one processor
162            // (threads definition does this as well)
163            return new Pipeline(routeContext.getCamelContext(), pipe) {
164                @Override
165                public String toString() {
166                    return "RecipientList[" + expression + "]";
167                }
168            };
169        }
170    
171        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
172            AggregationStrategy strategy = getAggregationStrategy();
173            if (strategy == null && strategyRef != null) {
174                Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
175                if (aggStrategy instanceof AggregationStrategy) {
176                    strategy = (AggregationStrategy) aggStrategy;
177                } else if (aggStrategy != null) {
178                    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
179                    if (getStrategyMethodAllowNull() != null) {
180                        adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
181                        adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
182                    }
183                    strategy = adapter;
184                } else {
185                    throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
186                }
187            }
188            if (strategy == null) {
189                // fallback to use latest
190                strategy = new UseLatestAggregationStrategy();
191            }
192    
193            if (strategy instanceof CamelContextAware) {
194                ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
195            }
196    
197            return strategy;
198        }
199    
200        // Fluent API
201        // -------------------------------------------------------------------------
202    
203        @Override
204        @SuppressWarnings("unchecked")
205        public Type end() {
206            // allow end() to return to previous type so you can continue in the DSL
207            return (Type) super.end();
208        }
209    
210        /**
211         * Set the delimiter
212         *
213         * @param delimiter the delimiter
214         * @return the builder
215         */
216        public RecipientListDefinition<Type> delimiter(String delimiter) {
217            setDelimiter(delimiter);
218            return this;
219        }
220    
221        /**
222         * Set the aggregationStrategy
223         *
224         * @param aggregationStrategy the strategy
225         * @return the builder
226         */
227        public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) {
228            setAggregationStrategy(aggregationStrategy);
229            return this;
230        }
231    
232        /**
233         * Set the aggregationStrategy
234         *
235         * @param aggregationStrategyRef a reference to a strategy to lookup
236         * @return the builder
237         */
238        public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) {
239            setStrategyRef(aggregationStrategyRef);
240            return this;
241        }
242    
243        /**
244         * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
245         *
246         * @param  methodName the method name to call
247         * @return the builder
248         */
249        public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) {
250            setStrategyMethodName(methodName);
251            return this;
252        }
253    
254        /**
255         * Sets allowing null when using a POJO as {@link AggregationStrategy}.
256         *
257         * @return the builder
258         */
259        public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() {
260            setStrategyMethodAllowNull(true);
261            return this;
262        }
263    
264        /**
265         * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
266         *
267         * @return the builder
268         */
269        public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
270            setIgnoreInvalidEndpoints(true);
271            return this;
272        }
273    
274        /**
275         * Doing the recipient list work in parallel
276         *
277         * @return the builder
278         */
279        public RecipientListDefinition<Type> parallelProcessing() {
280            setParallelProcessing(true);
281            return this;
282        }
283        
284        /**
285         * Doing the recipient list work in streaming model
286         *
287         * @return the builder
288         */
289        public RecipientListDefinition<Type> streaming() {
290            setStreaming(true);
291            return this;
292        }
293    
294        /**
295         * Will now stop further processing if an exception or failure occurred during processing of an
296         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
297         * <p/>
298         * Will also stop if processing the exchange failed (has a fault message) or an exception
299         * was thrown and handled by the error handler (such as using onException). In all situations
300         * the recipient list will stop further processing. This is the same behavior as in pipeline, which
301         * is used by the routing engine.
302         * <p/>
303         * The default behavior is to <b>not</b> stop but continue processing till the end
304         *
305         * @return the builder
306         */
307        public RecipientListDefinition<Type> stopOnException() {
308            setStopOnException(true);
309            return this;
310        }
311    
312        public RecipientListDefinition<Type> executorService(ExecutorService executorService) {
313            setExecutorService(executorService);
314            return this;
315        }
316    
317        public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) {
318            setExecutorServiceRef(executorServiceRef);
319            return this;
320        }
321    
322        /**
323         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send.
324         * This can be used to deep-clone messages that should be send, or any custom logic needed before
325         * the exchange is send.
326         *
327         * @param onPrepare the processor
328         * @return the builder
329         */
330        public RecipientListDefinition<Type> onPrepare(Processor onPrepare) {
331            setOnPrepare(onPrepare);
332            return this;
333        }
334    
335        /**
336         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
337         * This can be used to deep-clone messages that should be send, or any custom logic needed before
338         * the exchange is send.
339         *
340         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
341         * @return the builder
342         */
343        public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) {
344            setOnPrepareRef(onPrepareRef);
345            return this;
346        }
347    
348        /**
349         * Sets a timeout value in millis to use when using parallelProcessing.
350         *
351         * @param timeout timeout in millis
352         * @return the builder
353         */
354        public RecipientListDefinition<Type> timeout(long timeout) {
355            setTimeout(timeout);
356            return this;
357        }
358    
359        /**
360         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
361         *
362         * @return the builder.
363         * @see org.apache.camel.spi.SubUnitOfWork
364         */
365        public RecipientListDefinition<Type> shareUnitOfWork() {
366            setShareUnitOfWork(true);
367            return this;
368        }
369    
370        // Properties
371        //-------------------------------------------------------------------------
372    
373        public String getDelimiter() {
374            return delimiter;
375        }
376    
377        public void setDelimiter(String delimiter) {
378            this.delimiter = delimiter;
379        }
380    
381        public Boolean getParallelProcessing() {
382            return parallelProcessing;
383        }
384    
385        public void setParallelProcessing(Boolean parallelProcessing) {
386            this.parallelProcessing = parallelProcessing;
387        }
388    
389        public boolean isParallelProcessing() {
390            return parallelProcessing != null && parallelProcessing;
391        }
392    
393        public String getStrategyRef() {
394            return strategyRef;
395        }
396    
397        public void setStrategyRef(String strategyRef) {
398            this.strategyRef = strategyRef;
399        }
400    
401        public String getStrategyMethodName() {
402            return strategyMethodName;
403        }
404    
405        public void setStrategyMethodName(String strategyMethodName) {
406            this.strategyMethodName = strategyMethodName;
407        }
408    
409        public Boolean getStrategyMethodAllowNull() {
410            return strategyMethodAllowNull;
411        }
412    
413        public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
414            this.strategyMethodAllowNull = strategyMethodAllowNull;
415        }
416    
417        public String getExecutorServiceRef() {
418            return executorServiceRef;
419        }
420    
421        public void setExecutorServiceRef(String executorServiceRef) {
422            this.executorServiceRef = executorServiceRef;
423        }
424    
425        public Boolean getIgnoreInvalidEndpoints() {
426            return ignoreInvalidEndpoints;
427        }
428    
429        public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
430            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
431        }
432    
433        public boolean isIgnoreInvalidEndpoints() {
434            return ignoreInvalidEndpoints != null && ignoreInvalidEndpoints;
435        }
436    
437        public Boolean getStopOnException() {
438            return stopOnException;
439        }
440    
441        public void setStopOnException(Boolean stopOnException) {
442            this.stopOnException = stopOnException;
443        }
444    
445        public boolean isStopOnException() {
446            return stopOnException != null && stopOnException;
447        }
448    
449        public AggregationStrategy getAggregationStrategy() {
450            return aggregationStrategy;
451        }
452    
453        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
454            this.aggregationStrategy = aggregationStrategy;
455        }
456    
457        public ExecutorService getExecutorService() {
458            return executorService;
459        }
460    
461        public void setExecutorService(ExecutorService executorService) {
462            this.executorService = executorService;
463        }
464    
465        public Boolean getStreaming() {
466            return streaming;
467        }
468    
469        public void setStreaming(Boolean streaming) {
470            this.streaming = streaming;
471        }
472    
473        public boolean isStreaming() {
474            return streaming != null && streaming;
475        }
476    
477        public Long getTimeout() {
478            return timeout;
479        }
480    
481        public void setTimeout(Long timeout) {
482            this.timeout = timeout;
483        }
484    
485        public String getOnPrepareRef() {
486            return onPrepareRef;
487        }
488    
489        public void setOnPrepareRef(String onPrepareRef) {
490            this.onPrepareRef = onPrepareRef;
491        }
492    
493        public Processor getOnPrepare() {
494            return onPrepare;
495        }
496    
497        public void setOnPrepare(Processor onPrepare) {
498            this.onPrepare = onPrepare;
499        }
500    
501        public Boolean getShareUnitOfWork() {
502            return shareUnitOfWork;
503        }
504    
505        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
506            this.shareUnitOfWork = shareUnitOfWork;
507        }
508    
509        public boolean isShareUnitOfWork() {
510            return shareUnitOfWork != null && shareUnitOfWork;
511        }
512    
513    }