001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.concurrent.ExecutorService;
020    
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.CamelContextAware;
028    import org.apache.camel.Expression;
029    import org.apache.camel.Processor;
030    import org.apache.camel.model.language.ExpressionDefinition;
031    import org.apache.camel.processor.CamelInternalProcessor;
032    import org.apache.camel.processor.Splitter;
033    import org.apache.camel.processor.aggregate.AggregationStrategy;
034    import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
035    import org.apache.camel.spi.RouteContext;
036    import org.apache.camel.util.CamelContextHelper;
037    
038    /**
039     * Represents an XML <split/> element
040     *
041     * @version 
042     */
043    @XmlRootElement(name = "split")
044    @XmlAccessorType(XmlAccessType.FIELD)
045    public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
046        @XmlTransient
047        private AggregationStrategy aggregationStrategy;
048        @XmlTransient
049        private ExecutorService executorService;
050        @XmlAttribute
051        private Boolean parallelProcessing;
052        @XmlAttribute
053        private String strategyRef;
054        @XmlAttribute
055        private String strategyMethodName;
056        @XmlAttribute
057        private Boolean strategyMethodAllowNull;
058        @XmlAttribute
059        private String executorServiceRef;
060        @XmlAttribute
061        private Boolean streaming;
062        @XmlAttribute
063        private Boolean stopOnException;
064        @XmlAttribute
065        private Long timeout;
066        @XmlAttribute
067        private String onPrepareRef;
068        @XmlTransient
069        private Processor onPrepare;
070        @XmlAttribute
071        private Boolean shareUnitOfWork;
072    
073        public SplitDefinition() {
074        }
075    
076        public SplitDefinition(Expression expression) {
077            super(expression);
078        }
079    
080        public SplitDefinition(ExpressionDefinition expression) {
081            super(expression);
082        }
083    
084        @Override
085        public String toString() {
086            return "Split[" + getExpression() + " -> " + getOutputs() + "]";
087        }
088    
089        @Override
090        public String getShortName() {
091            return "split";
092        }
093    
094        @Override
095        public String getLabel() {
096            return "split[" + getExpression() + "]";
097        }
098    
099        @Override
100        public Processor createProcessor(RouteContext routeContext) throws Exception {
101            Processor childProcessor = this.createChildProcessor(routeContext, true);
102            aggregationStrategy = createAggregationStrategy(routeContext);
103    
104            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
105            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
106    
107            long timeout = getTimeout() != null ? getTimeout() : 0;
108            if (timeout > 0 && !isParallelProcessing()) {
109                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
110            }
111            if (onPrepareRef != null) {
112                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
113            }
114    
115            Expression exp = getExpression().createExpression(routeContext);
116    
117            Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
118                                isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
119                                timeout, onPrepare, isShareUnitOfWork());
120            if (isShareUnitOfWork()) {
121                // wrap answer in a sub unit of work, since we share the unit of work
122                CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
123                internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
124                return internalProcessor;
125            }
126            return answer;
127        }
128    
129        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
130            AggregationStrategy strategy = getAggregationStrategy();
131            if (strategy == null && strategyRef != null) {
132                Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
133                if (aggStrategy instanceof AggregationStrategy) {
134                    strategy = (AggregationStrategy) aggStrategy;
135                } else if (aggStrategy != null) {
136                    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
137                    if (getStrategyMethodAllowNull() != null) {
138                        adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
139                        adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
140                    }
141                    strategy = adapter;
142                } else {
143                    throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
144                }
145            }
146    
147            if (strategy != null && strategy instanceof CamelContextAware) {
148                ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
149            }
150    
151            return strategy;
152        }
153    
154        // Fluent API
155        // -------------------------------------------------------------------------
156    
157        /**
158         * Set the aggregationStrategy
159         *
160         * @return the builder
161         */
162        public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
163            setAggregationStrategy(aggregationStrategy);
164            return this;
165        }
166        
167        /**
168         * Set the aggregationStrategy
169         *
170         * @param aggregationStrategyRef a reference to a strategy to lookup
171         * @return the builder
172         */
173        public SplitDefinition aggregationStrategyRef(String aggregationStrategyRef) {
174            setStrategyRef(aggregationStrategyRef);
175            return this;
176        }
177    
178        /**
179         * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
180         *
181         * @param  methodName the method name to call
182         * @return the builder
183         */
184        public SplitDefinition aggregationStrategyMethodName(String methodName) {
185            setStrategyMethodName(methodName);
186            return this;
187        }
188    
189        /**
190         * Sets allowing null when using a POJO as {@link AggregationStrategy}.
191         *
192         * @return the builder
193         */
194        public SplitDefinition aggregationStrategyMethodAllowNull() {
195            setStrategyMethodAllowNull(true);
196            return this;
197        }
198    
199        /**
200         * Doing the splitting work in parallel
201         *
202         * @return the builder
203         */
204        public SplitDefinition parallelProcessing() {
205            setParallelProcessing(true);
206            return this;
207        }
208        
209        /**
210         * Enables streaming. 
211         * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for more information
212         *
213         * @return the builder
214         */
215        public SplitDefinition streaming() {
216            setStreaming(true);
217            return this;
218        }
219        
220        /**
221         * Will now stop further processing if an exception or failure occurred during processing of an
222         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
223         * <p/>
224         * Will also stop if processing the exchange failed (has a fault message) or an exception
225         * was thrown and handled by the error handler (such as using onException). In all situations
226         * the splitter will stop further processing. This is the same behavior as in pipeline, which
227         * is used by the routing engine.
228         * <p/>
229         * The default behavior is to <b>not</b> stop but continue processing till the end
230         *
231         * @return the builder
232         */
233        public SplitDefinition stopOnException() {
234            setStopOnException(true);
235            return this;
236        }
237       
238        public SplitDefinition executorService(ExecutorService executorService) {
239            setExecutorService(executorService);
240            return this;
241        }
242        
243        public SplitDefinition executorServiceRef(String executorServiceRef) {
244            setExecutorServiceRef(executorServiceRef);
245            return this;
246        }
247    
248        /**
249         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
250         * This can be used to deep-clone messages that should be send, or any custom logic needed before
251         * the exchange is send.
252         *
253         * @param onPrepare the processor
254         * @return the builder
255         */
256        public SplitDefinition onPrepare(Processor onPrepare) {
257            setOnPrepare(onPrepare);
258            return this;
259        }
260    
261        /**
262         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
263         * This can be used to deep-clone messages that should be send, or any custom logic needed before
264         * the exchange is send.
265         *
266         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
267         * @return the builder
268         */
269        public SplitDefinition onPrepareRef(String onPrepareRef) {
270            setOnPrepareRef(onPrepareRef);
271            return this;
272        }
273    
274        /**
275         * Sets a timeout value in millis to use when using parallelProcessing.
276         *
277         * @param timeout timeout in millis
278         * @return the builder
279         */
280        public SplitDefinition timeout(long timeout) {
281            setTimeout(timeout);
282            return this;
283        }
284    
285        /**
286         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
287         *
288         * @return the builder.
289         * @see org.apache.camel.spi.SubUnitOfWork
290         */
291        public SplitDefinition shareUnitOfWork() {
292            setShareUnitOfWork(true);
293            return this;
294        }
295    
296        // Properties
297        //-------------------------------------------------------------------------
298    
299        public AggregationStrategy getAggregationStrategy() {
300            return aggregationStrategy;
301        }
302    
303        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
304            this.aggregationStrategy = aggregationStrategy;
305        }
306    
307        public Boolean getParallelProcessing() {
308            return parallelProcessing;
309        }
310    
311        public void setParallelProcessing(Boolean parallelProcessing) {
312            this.parallelProcessing = parallelProcessing;
313        }
314    
315        public boolean isParallelProcessing() {
316            return parallelProcessing != null && parallelProcessing;
317        }
318    
319        public Boolean getStreaming() {
320            return streaming;
321        }
322    
323        public void setStreaming(Boolean streaming) {
324            this.streaming = streaming;
325        }
326    
327        /**
328         * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
329         * This improves throughput and memory usage, but it has a drawback:
330         * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property
331         *
332         * @return whether or not streaming should be used
333         */
334        public boolean isStreaming() {
335            return streaming != null && streaming;
336        }
337    
338        public Boolean getStopOnException() {
339            return stopOnException;
340        }
341    
342        public void setStopOnException(Boolean stopOnException) {
343            this.stopOnException = stopOnException;
344        }
345    
346        public Boolean isStopOnException() {
347            return stopOnException != null && stopOnException;
348        }
349    
350        public ExecutorService getExecutorService() {
351            return executorService;
352        }
353    
354        public void setExecutorService(ExecutorService executorService) {
355            this.executorService = executorService;
356        }
357    
358        public String getStrategyRef() {
359            return strategyRef;
360        }
361    
362        public void setStrategyRef(String strategyRef) {
363            this.strategyRef = strategyRef;
364        }
365    
366        public String getStrategyMethodName() {
367            return strategyMethodName;
368        }
369    
370        public void setStrategyMethodName(String strategyMethodName) {
371            this.strategyMethodName = strategyMethodName;
372        }
373    
374        public Boolean getStrategyMethodAllowNull() {
375            return strategyMethodAllowNull;
376        }
377    
378        public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
379            this.strategyMethodAllowNull = strategyMethodAllowNull;
380        }
381    
382        public String getExecutorServiceRef() {
383            return executorServiceRef;
384        }
385    
386        public void setExecutorServiceRef(String executorServiceRef) {
387            this.executorServiceRef = executorServiceRef;
388        }
389    
390        public Long getTimeout() {
391            return timeout;
392        }
393    
394        public void setTimeout(Long timeout) {
395            this.timeout = timeout;
396        }
397    
398        public String getOnPrepareRef() {
399            return onPrepareRef;
400        }
401    
402        public void setOnPrepareRef(String onPrepareRef) {
403            this.onPrepareRef = onPrepareRef;
404        }
405    
406        public Processor getOnPrepare() {
407            return onPrepare;
408        }
409    
410        public void setOnPrepare(Processor onPrepare) {
411            this.onPrepare = onPrepare;
412        }
413    
414        public Boolean getShareUnitOfWork() {
415            return shareUnitOfWork;
416        }
417    
418        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
419            this.shareUnitOfWork = shareUnitOfWork;
420        }
421    
422        public boolean isShareUnitOfWork() {
423            return shareUnitOfWork != null && shareUnitOfWork;
424        }
425    }