001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.ScheduledExecutorService;
023    import javax.xml.bind.annotation.XmlAccessType;
024    import javax.xml.bind.annotation.XmlAccessorType;
025    import javax.xml.bind.annotation.XmlAttribute;
026    import javax.xml.bind.annotation.XmlElement;
027    import javax.xml.bind.annotation.XmlElementRef;
028    import javax.xml.bind.annotation.XmlRootElement;
029    import javax.xml.bind.annotation.XmlTransient;
030    
031    import org.apache.camel.CamelContextAware;
032    import org.apache.camel.Expression;
033    import org.apache.camel.Predicate;
034    import org.apache.camel.Processor;
035    import org.apache.camel.builder.ExpressionClause;
036    import org.apache.camel.model.language.ExpressionDefinition;
037    import org.apache.camel.processor.CamelInternalProcessor;
038    import org.apache.camel.processor.aggregate.AggregateProcessor;
039    import org.apache.camel.processor.aggregate.AggregationStrategy;
040    import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
041    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
042    import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
043    import org.apache.camel.spi.AggregationRepository;
044    import org.apache.camel.spi.RouteContext;
045    import org.apache.camel.util.concurrent.SynchronousExecutorService;
046    
047    /**
048     * Represents an XML <aggregate/> element
049     *
050     * @version 
051     */
052    @XmlRootElement(name = "aggregate")
053    @XmlAccessorType(XmlAccessType.FIELD)
054    public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
055        @XmlElement(name = "correlationExpression", required = true)
056        private ExpressionSubElementDefinition correlationExpression;
057        @XmlElement(name = "completionPredicate")
058        private ExpressionSubElementDefinition completionPredicate;
059        @XmlElement(name = "completionTimeout")
060        private ExpressionSubElementDefinition completionTimeoutExpression;
061        @XmlElement(name = "completionSize")
062        private ExpressionSubElementDefinition completionSizeExpression;
063        @XmlElement(name = "optimisticLockRetryPolicy")
064        private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition;
065        @XmlTransient
066        private ExpressionDefinition expression;
067        @XmlElementRef
068        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
069        @XmlTransient
070        private AggregationStrategy aggregationStrategy;
071        @XmlTransient
072        private ExecutorService executorService;
073        @XmlTransient
074        private ScheduledExecutorService timeoutCheckerExecutorService;
075        @XmlTransient
076        private AggregationRepository aggregationRepository;
077        @XmlTransient
078        private OptimisticLockRetryPolicy optimisticLockRetryPolicy;
079        @XmlAttribute
080        private Boolean parallelProcessing;
081        @XmlAttribute
082        private Boolean optimisticLocking;
083        @XmlAttribute
084        private String executorServiceRef;
085        @XmlAttribute
086        private String timeoutCheckerExecutorServiceRef;
087        @XmlAttribute
088        private String aggregationRepositoryRef;
089        @XmlAttribute
090        private String strategyRef;
091        @XmlAttribute
092        private String strategyMethodName;
093        @XmlAttribute
094        private Boolean strategyMethodAllowNull;
095        @XmlAttribute
096        private Integer completionSize;
097        @XmlAttribute
098        private Long completionInterval;
099        @XmlAttribute
100        private Long completionTimeout;
101        @XmlAttribute
102        private Boolean completionFromBatchConsumer;
103        @XmlAttribute
104        private Boolean groupExchanges;
105        @XmlAttribute
106        private Boolean eagerCheckCompletion;
107        @XmlAttribute
108        private Boolean ignoreInvalidCorrelationKeys;
109        @XmlAttribute
110        private Integer closeCorrelationKeyOnCompletion;
111        @XmlAttribute
112        private Boolean discardOnCompletionTimeout;
113        @XmlAttribute
114        private Boolean forceCompletionOnStop;
115    
116        public AggregateDefinition() {
117        }
118    
119        public AggregateDefinition(Predicate predicate) {
120            if (predicate != null) {
121                setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate));
122            }
123        }    
124        
125        public AggregateDefinition(Expression correlationExpression) {
126            if (correlationExpression != null) {
127                setExpression(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
128            }
129        }
130    
131        public AggregateDefinition(ExpressionDefinition correlationExpression) {
132            this.expression = correlationExpression;
133        }
134    
135        public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
136            this(correlationExpression);
137            this.aggregationStrategy = aggregationStrategy;
138        }
139    
140        @Override
141        public String toString() {
142            return "Aggregate[" + description() + " -> " + getOutputs() + "]";
143        }
144        
145        protected String description() {
146            return getExpression() != null ? getExpression().getLabel() : "";
147        }
148    
149        @Override
150        public String getShortName() {
151            return "aggregate";
152        }
153    
154        @Override
155        public String getLabel() {
156            return "aggregate[" + description() + "]";
157        }
158    
159        @Override
160        public Processor createProcessor(RouteContext routeContext) throws Exception {
161            return createAggregator(routeContext);
162        }
163    
164        protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
165            Processor childProcessor = this.createChildProcessor(routeContext, true);
166    
167            String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
168    
169            // wrap the aggregate route in a unit of work processor
170            CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
171            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
172            internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
173    
174            Expression correlation = getExpression().createExpression(routeContext);
175            AggregationStrategy strategy = createAggregationStrategy(routeContext);
176    
177            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
178            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
179            if (threadPool == null && !isParallelProcessing()) {
180                // executor service is mandatory for the Aggregator
181                // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
182                threadPool = new SynchronousExecutorService();
183                shutdownThreadPool = true;
184            }
185    
186            AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
187                    correlation, strategy, threadPool, shutdownThreadPool);
188    
189            AggregationRepository repository = createAggregationRepository(routeContext);
190            if (repository != null) {
191                answer.setAggregationRepository(repository);
192            }
193    
194            // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
195            boolean shutdownTimeoutThreadPool = false;
196            ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
197            if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
198                // lookup existing thread pool
199                timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
200                if (timeoutThreadPool == null) {
201                    // then create a thread pool assuming the ref is a thread pool profile id
202                    timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
203                            AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
204                    if (timeoutThreadPool == null) {
205                        throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile.");
206                    }
207                    shutdownTimeoutThreadPool = true;
208                }
209            }
210            answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
211            answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
212    
213            // set other options
214            answer.setParallelProcessing(isParallelProcessing());
215            answer.setOptimisticLocking(isOptimisticLocking());
216            if (getCompletionPredicate() != null) {
217                Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
218                answer.setCompletionPredicate(predicate);
219            }
220            if (getCompletionTimeoutExpression() != null) {
221                Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
222                answer.setCompletionTimeoutExpression(expression);
223            }
224            if (getCompletionTimeout() != null) {
225                answer.setCompletionTimeout(getCompletionTimeout());
226            }
227            if (getCompletionInterval() != null) {
228                answer.setCompletionInterval(getCompletionInterval());
229            }
230            if (getCompletionSizeExpression() != null) {
231                Expression expression = getCompletionSizeExpression().createExpression(routeContext);
232                answer.setCompletionSizeExpression(expression);
233            }
234            if (getCompletionSize() != null) {
235                answer.setCompletionSize(getCompletionSize());
236            }
237            if (getCompletionFromBatchConsumer() != null) {
238                answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
239            }
240            if (getEagerCheckCompletion() != null) {
241                answer.setEagerCheckCompletion(isEagerCheckCompletion());
242            }
243            if (getIgnoreInvalidCorrelationKeys() != null) {
244                answer.setIgnoreInvalidCorrelationKeys(isIgnoreInvalidCorrelationKeys());
245            }
246            if (getCloseCorrelationKeyOnCompletion() != null) {
247                answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
248            }
249            if (getDiscardOnCompletionTimeout() != null) {
250                answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
251            }
252            if (getForceCompletionOnStop() != null) {
253                answer.setForceCompletionOnStop(getForceCompletionOnStop());
254            }
255            if (optimisticLockRetryPolicy == null) {
256                if (getOptimisticLockRetryPolicyDefinition() != null) {
257                    answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
258                }
259            } else {
260                answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
261            }
262            return answer;
263        }
264    
265        @Override
266        protected void configureChild(ProcessorDefinition<?> output) {
267            if (expression != null && expression instanceof ExpressionClause) {
268                ExpressionClause<?> clause = (ExpressionClause<?>) expression;
269                if (clause.getExpressionType() != null) {
270                    // if using the Java DSL then the expression may have been set using the
271                    // ExpressionClause which is a fancy builder to define expressions and predicates
272                    // using fluent builders in the DSL. However we need afterwards a callback to
273                    // reset the expression to the expression type the ExpressionClause did build for us
274                    expression = clause.getExpressionType();
275                    // set the correlation expression from the expression type, as the model definition
276                    // would then be accurate
277                    correlationExpression = new ExpressionSubElementDefinition();
278                    correlationExpression.setExpressionType(clause.getExpressionType());
279                }
280            }
281        }
282    
283        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
284            AggregationStrategy strategy = getAggregationStrategy();
285            if (strategy == null && strategyRef != null) {
286                Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
287                if (aggStrategy instanceof AggregationStrategy) {
288                    strategy = (AggregationStrategy) aggStrategy;
289                } else if (aggStrategy != null) {
290                    AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
291                    if (getStrategyMethodAllowNull() != null) {
292                        adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
293                        adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
294                    }
295                    strategy = adapter;
296                } else {
297                    throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
298                }
299            }
300    
301            if (groupExchanges != null && groupExchanges) {
302                if (strategy != null || strategyRef != null) {
303                    throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
304                }
305                if (eagerCheckCompletion != null && !eagerCheckCompletion) {
306                    throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
307                }
308                // set eager check to enabled by default when using grouped exchanges
309                setEagerCheckCompletion(true);
310                // if grouped exchange is enabled then use special strategy for that
311                strategy = new GroupedExchangeAggregationStrategy();
312            }
313    
314            if (strategy == null) {
315                throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
316            }
317    
318            if (strategy instanceof CamelContextAware) {
319                ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
320            }
321    
322            return strategy;
323        }
324    
325        private AggregationRepository createAggregationRepository(RouteContext routeContext) {
326            AggregationRepository repository = getAggregationRepository();
327            if (repository == null && aggregationRepositoryRef != null) {
328                repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class);
329            }
330            return repository;
331        }
332    
333        public AggregationStrategy getAggregationStrategy() {
334            return aggregationStrategy;
335        }
336    
337        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
338            this.aggregationStrategy = aggregationStrategy;
339        }
340    
341        public String getAggregationStrategyRef() {
342            return strategyRef;
343        }
344    
345        public void setAggregationStrategyRef(String aggregationStrategyRef) {
346            this.strategyRef = aggregationStrategyRef;
347        }
348    
349        public String getAggregationStrategyMethodName() {
350            return strategyMethodName;
351        }
352    
353        public void setAggregationStrategyMethodName(String strategyMethodName) {
354            this.strategyMethodName = strategyMethodName;
355        }
356    
357        public Boolean getStrategyMethodAllowNull() {
358            return strategyMethodAllowNull;
359        }
360    
361        public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
362            this.strategyMethodAllowNull = strategyMethodAllowNull;
363        }
364    
365        public Integer getCompletionSize() {
366            return completionSize;
367        }
368    
369        public void setCompletionSize(Integer completionSize) {
370            this.completionSize = completionSize;
371        }
372    
373        public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() {
374            return optimisticLockRetryPolicyDefinition;
375        }
376    
377        public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) {
378            this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition;
379        }
380    
381        public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
382            return optimisticLockRetryPolicy;
383        }
384    
385        public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
386            this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
387        }
388    
389        public Long getCompletionInterval() {
390            return completionInterval;
391        }
392    
393        public void setCompletionInterval(Long completionInterval) {
394            this.completionInterval = completionInterval;
395        }
396    
397        public Long getCompletionTimeout() {
398            return completionTimeout;
399        }
400    
401        public void setCompletionTimeout(Long completionTimeout) {
402            this.completionTimeout = completionTimeout;
403        }
404    
405        public ExpressionSubElementDefinition getCompletionPredicate() {
406            return completionPredicate;
407        }
408    
409        public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
410            this.completionPredicate = completionPredicate;
411        }
412    
413        public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
414            return completionTimeoutExpression;
415        }
416    
417        public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
418            this.completionTimeoutExpression = completionTimeoutExpression;
419        }
420    
421        public ExpressionSubElementDefinition getCompletionSizeExpression() {
422            return completionSizeExpression;
423        }
424    
425        public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
426            this.completionSizeExpression = completionSizeExpression;
427        }
428    
429        public Boolean getGroupExchanges() {
430            return groupExchanges;
431        }
432    
433        public boolean isGroupExchanges() {
434            return groupExchanges != null && groupExchanges;
435        }
436    
437        public void setGroupExchanges(Boolean groupExchanges) {
438            this.groupExchanges = groupExchanges;
439        }
440    
441        public Boolean getCompletionFromBatchConsumer() {
442            return completionFromBatchConsumer;
443        }
444    
445        public boolean isCompletionFromBatchConsumer() {
446            return completionFromBatchConsumer != null && completionFromBatchConsumer;
447        }
448    
449        public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
450            this.completionFromBatchConsumer = completionFromBatchConsumer;
451        }
452    
453        public ExecutorService getExecutorService() {
454            return executorService;
455        }
456    
457        public void setExecutorService(ExecutorService executorService) {
458            this.executorService = executorService;
459        }
460    
461        public Boolean getOptimisticLocking() {
462            return optimisticLocking;
463        }
464    
465        public void setOptimisticLocking(boolean optimisticLocking) {
466            this.optimisticLocking = optimisticLocking;
467        }
468    
469        public boolean isOptimisticLocking() {
470            return optimisticLocking != null && optimisticLocking;
471        }
472    
473        public Boolean getParallelProcessing() {
474            return parallelProcessing;
475        }
476    
477        public boolean isParallelProcessing() {
478            return parallelProcessing != null && parallelProcessing;
479        }
480    
481        public void setParallelProcessing(boolean parallelProcessing) {
482            this.parallelProcessing = parallelProcessing;
483        }
484    
485        public String getExecutorServiceRef() {
486            return executorServiceRef;
487        }
488    
489        public void setExecutorServiceRef(String executorServiceRef) {
490            this.executorServiceRef = executorServiceRef;
491        }
492    
493        public String getStrategyRef() {
494            return strategyRef;
495        }
496    
497        public void setStrategyRef(String strategyRef) {
498            this.strategyRef = strategyRef;
499        }
500    
501        public String getStrategyMethodName() {
502            return strategyMethodName;
503        }
504    
505        public void setStrategyMethodName(String strategyMethodName) {
506            this.strategyMethodName = strategyMethodName;
507        }
508    
509        public Boolean getEagerCheckCompletion() {
510            return eagerCheckCompletion;
511        }
512    
513        public boolean isEagerCheckCompletion() {
514            return eagerCheckCompletion != null && eagerCheckCompletion;
515        }
516    
517        public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
518            this.eagerCheckCompletion = eagerCheckCompletion;
519        }
520    
521        public Boolean getIgnoreInvalidCorrelationKeys() {
522            return ignoreInvalidCorrelationKeys;
523        }
524    
525        public boolean isIgnoreInvalidCorrelationKeys() {
526            return ignoreInvalidCorrelationKeys != null && ignoreInvalidCorrelationKeys;
527        }
528    
529        public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
530            this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
531        }
532    
533        public Integer getCloseCorrelationKeyOnCompletion() {
534            return closeCorrelationKeyOnCompletion;
535        }
536    
537        public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
538            this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
539        }
540    
541        public AggregationRepository getAggregationRepository() {
542            return aggregationRepository;
543        }
544    
545        public void setAggregationRepository(AggregationRepository aggregationRepository) {
546            this.aggregationRepository = aggregationRepository;
547        }
548    
549        public String getAggregationRepositoryRef() {
550            return aggregationRepositoryRef;
551        }
552    
553        public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
554            this.aggregationRepositoryRef = aggregationRepositoryRef;
555        }
556    
557        public Boolean getDiscardOnCompletionTimeout() {
558            return discardOnCompletionTimeout;
559        }
560    
561        public boolean isDiscardOnCompletionTimeout() {
562            return discardOnCompletionTimeout != null && discardOnCompletionTimeout;
563        }
564    
565        public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
566            this.discardOnCompletionTimeout = discardOnCompletionTimeout;
567        }
568        
569        public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
570            this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
571        }
572    
573        public ScheduledExecutorService getTimeoutCheckerExecutorService() {
574            return timeoutCheckerExecutorService;
575        }
576    
577        public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) {
578            this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
579        }
580    
581        public String getTimeoutCheckerExecutorServiceRef() {
582            return timeoutCheckerExecutorServiceRef;
583        }
584    
585        // Fluent API
586        //-------------------------------------------------------------------------
587    
588        /**
589         * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
590         * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
591         *
592         * @return builder
593         */
594        public AggregateDefinition eagerCheckCompletion() {
595            setEagerCheckCompletion(true);
596            return this;
597        }
598    
599        /**
600         * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
601         * ignore the incoming Exchange.
602         *
603         * @return builder
604         */
605        public AggregateDefinition ignoreInvalidCorrelationKeys() {
606            setIgnoreInvalidCorrelationKeys(true);
607            return this;
608        }
609    
610        /**
611         * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
612         * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
613         * is thrown.
614         *
615         * @param capacity the maximum capacity of the closed correlation key cache.
616         *                 Use <tt>0</tt> or negative value for unbounded capacity.
617         * @return builder
618         */
619        public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
620            setCloseCorrelationKeyOnCompletion(capacity);
621            return this;
622        }
623    
624        /**
625         * Discards the aggregated message on completion timeout.
626         * <p/>
627         * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
628         *
629         * @return builder
630         */
631        public AggregateDefinition discardOnCompletionTimeout() {
632            setDiscardOnCompletionTimeout(true);
633            return this;
634        }
635    
636        /**
637         * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
638         * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
639         * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
640         *
641         * @return builder
642         */
643        public AggregateDefinition completionFromBatchConsumer() {
644            setCompletionFromBatchConsumer(true);
645            return this;
646        }
647    
648        /**
649         * Sets the completion size, which is the number of aggregated exchanges which would
650         * cause the aggregate to consider the group as complete and send out the aggregated exchange.
651         *
652         * @param completionSize  the completion size
653         * @return builder
654         */
655        public AggregateDefinition completionSize(int completionSize) {
656            setCompletionSize(completionSize);
657            return this;
658        }
659    
660        /**
661         * Sets the completion size, which is the number of aggregated exchanges which would
662         * cause the aggregate to consider the group as complete and send out the aggregated exchange.
663         *
664         * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
665         * @return builder
666         */
667        public AggregateDefinition completionSize(Expression completionSize) {
668            setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
669            return this;
670        }
671    
672        /**
673         * Sets the completion interval, which would cause the aggregate to consider the group as complete
674         * and send out the aggregated exchange.
675         *
676         * @param completionInterval  the interval in millis
677         * @return the builder
678         */
679        public AggregateDefinition completionInterval(long completionInterval) {
680            setCompletionInterval(completionInterval);
681            return this;
682        }
683    
684        /**
685         * Sets the completion timeout, which would cause the aggregate to consider the group as complete
686         * and send out the aggregated exchange.
687         *
688         * @param completionTimeout  the timeout in millis
689         * @return the builder
690         */
691        public AggregateDefinition completionTimeout(long completionTimeout) {
692            setCompletionTimeout(completionTimeout);
693            return this;
694        }
695    
696        /**
697         * Sets the completion timeout, which would cause the aggregate to consider the group as complete
698         * and send out the aggregated exchange.
699         *
700         * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
701         * @return the builder
702         */
703        public AggregateDefinition completionTimeout(Expression completionTimeout) {
704            setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
705            return this;
706        }
707    
708        /**
709         * Sets the aggregate strategy to use
710         *
711         * @param aggregationStrategy  the aggregate strategy to use
712         * @return the builder
713         */
714        public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
715            setAggregationStrategy(aggregationStrategy);
716            return this;
717        }
718    
719        /**
720         * Sets the aggregate strategy to use
721         *
722         * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
723         * @return the builder
724         */
725        public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
726            setAggregationStrategyRef(aggregationStrategyRef);
727            return this;
728        }
729    
730        /**
731         * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
732         *
733         * @param  methodName the method name to call
734         * @return the builder
735         */
736        public AggregateDefinition aggregationStrategyMethodName(String methodName) {
737            setAggregationStrategyMethodName(methodName);
738            return this;
739        }
740    
741        /**
742         * Sets allowing null when using a POJO as {@link AggregationStrategy}.
743         *
744         * @return the builder
745         */
746        public AggregateDefinition aggregationStrategyMethodAllowNull() {
747            setStrategyMethodAllowNull(true);
748            return this;
749        }
750    
751        /**
752         * Sets the custom aggregate repository to use.
753         * <p/>
754         * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
755         *
756         * @param aggregationRepository  the aggregate repository to use
757         * @return the builder
758         */
759        public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
760            setAggregationRepository(aggregationRepository);
761            return this;
762        }
763    
764        /**
765         * Sets the custom aggregate repository to use
766         * <p/>
767         * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
768         *
769         * @param aggregationRepositoryRef  reference to the repository to lookup in the registry
770         * @return the builder
771         */
772        public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
773            setAggregationRepositoryRef(aggregationRepositoryRef);
774            return this;
775        }
776    
777        /**
778         * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
779         * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}.
780         *
781         * @return the builder
782         */
783        public AggregateDefinition groupExchanges() {
784            setGroupExchanges(true);
785            // must use eager check when using grouped exchanges
786            setEagerCheckCompletion(true);
787            return this;
788        }
789    
790        /**
791         * Sets the predicate used to determine if the aggregation is completed
792         *
793         * @param predicate  the predicate
794         * @return the builder
795         */
796        public AggregateDefinition completionPredicate(Predicate predicate) {
797            checkNoCompletedPredicate();
798            setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
799            return this;
800        }
801    
802         /**
803         * Sets the force completion on stop flag, which considers the current group as complete
804         * and sends out the aggregated exchange when the stop event is executed
805         *
806         * @return builder
807         */
808        public AggregateDefinition forceCompletionOnStop() {
809            setForceCompletionOnStop(true);
810            return this;
811        }
812    
813        public Boolean getForceCompletionOnStop() {
814            return forceCompletionOnStop;
815        }
816    
817        public boolean isForceCompletionOnStop() {
818            return forceCompletionOnStop != null && forceCompletionOnStop;
819        }
820    
821        public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
822            this.forceCompletionOnStop = forceCompletionOnStop;
823        }
824    
825        /**
826         * Sending the aggregated output in parallel
827         *
828         * @return the builder
829         */
830        public AggregateDefinition parallelProcessing() {
831            setParallelProcessing(true);
832            return this;
833        }
834    
835        public AggregateDefinition optimisticLocking() {
836            setOptimisticLocking(true);
837            return this;
838        }
839    
840        public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) {
841            setOptimisticLockRetryPolicy(policy);
842            return this;
843        }
844        
845        public AggregateDefinition executorService(ExecutorService executorService) {
846            setExecutorService(executorService);
847            return this;
848        }
849    
850        public AggregateDefinition executorServiceRef(String executorServiceRef) {
851            setExecutorServiceRef(executorServiceRef);
852            return this;
853        }
854    
855        public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) {
856            setTimeoutCheckerExecutorService(executorService);
857            return this;
858        }
859    
860        public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
861            setTimeoutCheckerExecutorServiceRef(executorServiceRef);
862            return this;
863        }
864        
865        protected void checkNoCompletedPredicate() {
866            if (getCompletionPredicate() != null) {
867                throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
868            }
869        }
870    
871        public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
872            this.correlationExpression = correlationExpression;
873        }
874    
875        public ExpressionSubElementDefinition getCorrelationExpression() {
876            return correlationExpression;
877        }
878    
879        // Section - Methods from ExpressionNode
880        // Needed to copy methods from ExpressionNode here so that I could specify the
881        // correlation expression as optional in JAXB
882    
883        public ExpressionDefinition getExpression() {
884            if (expression == null && correlationExpression != null) {
885                expression = correlationExpression.getExpressionType();            
886            }
887            return expression;
888        }
889    
890        public void setExpression(ExpressionDefinition expression) {
891            this.expression = expression;
892        }
893    
894        @Override
895        public List<ProcessorDefinition<?>> getOutputs() {
896            return outputs;
897        }
898    
899        public boolean isOutputSupported() {
900            return true;
901        }
902    
903        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
904            this.outputs = outputs;
905        }
906    
907    }