001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 import java.util.concurrent.ScheduledExecutorService; 023 import javax.xml.bind.annotation.XmlAccessType; 024 import javax.xml.bind.annotation.XmlAccessorType; 025 import javax.xml.bind.annotation.XmlAttribute; 026 import javax.xml.bind.annotation.XmlElement; 027 import javax.xml.bind.annotation.XmlElementRef; 028 import javax.xml.bind.annotation.XmlRootElement; 029 import javax.xml.bind.annotation.XmlTransient; 030 031 import org.apache.camel.CamelContextAware; 032 import org.apache.camel.Expression; 033 import org.apache.camel.Predicate; 034 import org.apache.camel.Processor; 035 import org.apache.camel.builder.ExpressionClause; 036 import org.apache.camel.model.language.ExpressionDefinition; 037 import org.apache.camel.processor.CamelInternalProcessor; 038 import org.apache.camel.processor.aggregate.AggregateProcessor; 039 import org.apache.camel.processor.aggregate.AggregationStrategy; 040 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 041 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; 042 import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; 043 import org.apache.camel.spi.AggregationRepository; 044 import org.apache.camel.spi.RouteContext; 045 import org.apache.camel.util.concurrent.SynchronousExecutorService; 046 047 /** 048 * Represents an XML <aggregate/> element 049 * 050 * @version 051 */ 052 @XmlRootElement(name = "aggregate") 053 @XmlAccessorType(XmlAccessType.FIELD) 054 public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> { 055 @XmlElement(name = "correlationExpression", required = true) 056 private ExpressionSubElementDefinition correlationExpression; 057 @XmlElement(name = "completionPredicate") 058 private ExpressionSubElementDefinition completionPredicate; 059 @XmlElement(name = "completionTimeout") 060 private ExpressionSubElementDefinition completionTimeoutExpression; 061 @XmlElement(name = "completionSize") 062 private ExpressionSubElementDefinition completionSizeExpression; 063 @XmlElement(name = "optimisticLockRetryPolicy") 064 private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition; 065 @XmlTransient 066 private ExpressionDefinition expression; 067 @XmlElementRef 068 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 069 @XmlTransient 070 private AggregationStrategy aggregationStrategy; 071 @XmlTransient 072 private ExecutorService executorService; 073 @XmlTransient 074 private ScheduledExecutorService timeoutCheckerExecutorService; 075 @XmlTransient 076 private AggregationRepository aggregationRepository; 077 @XmlTransient 078 private OptimisticLockRetryPolicy optimisticLockRetryPolicy; 079 @XmlAttribute 080 private Boolean parallelProcessing; 081 @XmlAttribute 082 private Boolean optimisticLocking; 083 @XmlAttribute 084 private String executorServiceRef; 085 @XmlAttribute 086 private String timeoutCheckerExecutorServiceRef; 087 @XmlAttribute 088 private String aggregationRepositoryRef; 089 @XmlAttribute 090 private String strategyRef; 091 @XmlAttribute 092 private String strategyMethodName; 093 @XmlAttribute 094 private Boolean strategyMethodAllowNull; 095 @XmlAttribute 096 private Integer completionSize; 097 @XmlAttribute 098 private Long completionInterval; 099 @XmlAttribute 100 private Long completionTimeout; 101 @XmlAttribute 102 private Boolean completionFromBatchConsumer; 103 @XmlAttribute 104 private Boolean groupExchanges; 105 @XmlAttribute 106 private Boolean eagerCheckCompletion; 107 @XmlAttribute 108 private Boolean ignoreInvalidCorrelationKeys; 109 @XmlAttribute 110 private Integer closeCorrelationKeyOnCompletion; 111 @XmlAttribute 112 private Boolean discardOnCompletionTimeout; 113 @XmlAttribute 114 private Boolean forceCompletionOnStop; 115 116 public AggregateDefinition() { 117 } 118 119 public AggregateDefinition(Predicate predicate) { 120 if (predicate != null) { 121 setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate)); 122 } 123 } 124 125 public AggregateDefinition(Expression correlationExpression) { 126 if (correlationExpression != null) { 127 setExpression(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); 128 } 129 } 130 131 public AggregateDefinition(ExpressionDefinition correlationExpression) { 132 this.expression = correlationExpression; 133 } 134 135 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 136 this(correlationExpression); 137 this.aggregationStrategy = aggregationStrategy; 138 } 139 140 @Override 141 public String toString() { 142 return "Aggregate[" + description() + " -> " + getOutputs() + "]"; 143 } 144 145 protected String description() { 146 return getExpression() != null ? getExpression().getLabel() : ""; 147 } 148 149 @Override 150 public String getShortName() { 151 return "aggregate"; 152 } 153 154 @Override 155 public String getLabel() { 156 return "aggregate[" + description() + "]"; 157 } 158 159 @Override 160 public Processor createProcessor(RouteContext routeContext) throws Exception { 161 return createAggregator(routeContext); 162 } 163 164 protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { 165 Processor childProcessor = this.createChildProcessor(routeContext, true); 166 167 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 168 169 // wrap the aggregate route in a unit of work processor 170 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 171 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 172 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); 173 174 Expression correlation = getExpression().createExpression(routeContext); 175 AggregationStrategy strategy = createAggregationStrategy(routeContext); 176 177 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 178 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing()); 179 if (threadPool == null && !isParallelProcessing()) { 180 // executor service is mandatory for the Aggregator 181 // we do not run in parallel mode, but use a synchronous executor, so we run in current thread 182 threadPool = new SynchronousExecutorService(); 183 shutdownThreadPool = true; 184 } 185 186 AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal, 187 correlation, strategy, threadPool, shutdownThreadPool); 188 189 AggregationRepository repository = createAggregationRepository(routeContext); 190 if (repository != null) { 191 answer.setAggregationRepository(repository); 192 } 193 194 // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool 195 boolean shutdownTimeoutThreadPool = false; 196 ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService; 197 if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) { 198 // lookup existing thread pool 199 timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); 200 if (timeoutThreadPool == null) { 201 // then create a thread pool assuming the ref is a thread pool profile id 202 timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 203 AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef); 204 if (timeoutThreadPool == null) { 205 throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile."); 206 } 207 shutdownTimeoutThreadPool = true; 208 } 209 } 210 answer.setTimeoutCheckerExecutorService(timeoutThreadPool); 211 answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool); 212 213 // set other options 214 answer.setParallelProcessing(isParallelProcessing()); 215 answer.setOptimisticLocking(isOptimisticLocking()); 216 if (getCompletionPredicate() != null) { 217 Predicate predicate = getCompletionPredicate().createPredicate(routeContext); 218 answer.setCompletionPredicate(predicate); 219 } 220 if (getCompletionTimeoutExpression() != null) { 221 Expression expression = getCompletionTimeoutExpression().createExpression(routeContext); 222 answer.setCompletionTimeoutExpression(expression); 223 } 224 if (getCompletionTimeout() != null) { 225 answer.setCompletionTimeout(getCompletionTimeout()); 226 } 227 if (getCompletionInterval() != null) { 228 answer.setCompletionInterval(getCompletionInterval()); 229 } 230 if (getCompletionSizeExpression() != null) { 231 Expression expression = getCompletionSizeExpression().createExpression(routeContext); 232 answer.setCompletionSizeExpression(expression); 233 } 234 if (getCompletionSize() != null) { 235 answer.setCompletionSize(getCompletionSize()); 236 } 237 if (getCompletionFromBatchConsumer() != null) { 238 answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer()); 239 } 240 if (getEagerCheckCompletion() != null) { 241 answer.setEagerCheckCompletion(isEagerCheckCompletion()); 242 } 243 if (getIgnoreInvalidCorrelationKeys() != null) { 244 answer.setIgnoreInvalidCorrelationKeys(isIgnoreInvalidCorrelationKeys()); 245 } 246 if (getCloseCorrelationKeyOnCompletion() != null) { 247 answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); 248 } 249 if (getDiscardOnCompletionTimeout() != null) { 250 answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout()); 251 } 252 if (getForceCompletionOnStop() != null) { 253 answer.setForceCompletionOnStop(getForceCompletionOnStop()); 254 } 255 if (optimisticLockRetryPolicy == null) { 256 if (getOptimisticLockRetryPolicyDefinition() != null) { 257 answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); 258 } 259 } else { 260 answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy); 261 } 262 return answer; 263 } 264 265 @Override 266 protected void configureChild(ProcessorDefinition<?> output) { 267 if (expression != null && expression instanceof ExpressionClause) { 268 ExpressionClause<?> clause = (ExpressionClause<?>) expression; 269 if (clause.getExpressionType() != null) { 270 // if using the Java DSL then the expression may have been set using the 271 // ExpressionClause which is a fancy builder to define expressions and predicates 272 // using fluent builders in the DSL. However we need afterwards a callback to 273 // reset the expression to the expression type the ExpressionClause did build for us 274 expression = clause.getExpressionType(); 275 // set the correlation expression from the expression type, as the model definition 276 // would then be accurate 277 correlationExpression = new ExpressionSubElementDefinition(); 278 correlationExpression.setExpressionType(clause.getExpressionType()); 279 } 280 } 281 } 282 283 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 284 AggregationStrategy strategy = getAggregationStrategy(); 285 if (strategy == null && strategyRef != null) { 286 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 287 if (aggStrategy instanceof AggregationStrategy) { 288 strategy = (AggregationStrategy) aggStrategy; 289 } else if (aggStrategy != null) { 290 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 291 if (getStrategyMethodAllowNull() != null) { 292 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 293 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 294 } 295 strategy = adapter; 296 } else { 297 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 298 } 299 } 300 301 if (groupExchanges != null && groupExchanges) { 302 if (strategy != null || strategyRef != null) { 303 throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); 304 } 305 if (eagerCheckCompletion != null && !eagerCheckCompletion) { 306 throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); 307 } 308 // set eager check to enabled by default when using grouped exchanges 309 setEagerCheckCompletion(true); 310 // if grouped exchange is enabled then use special strategy for that 311 strategy = new GroupedExchangeAggregationStrategy(); 312 } 313 314 if (strategy == null) { 315 throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this); 316 } 317 318 if (strategy instanceof CamelContextAware) { 319 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 320 } 321 322 return strategy; 323 } 324 325 private AggregationRepository createAggregationRepository(RouteContext routeContext) { 326 AggregationRepository repository = getAggregationRepository(); 327 if (repository == null && aggregationRepositoryRef != null) { 328 repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class); 329 } 330 return repository; 331 } 332 333 public AggregationStrategy getAggregationStrategy() { 334 return aggregationStrategy; 335 } 336 337 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 338 this.aggregationStrategy = aggregationStrategy; 339 } 340 341 public String getAggregationStrategyRef() { 342 return strategyRef; 343 } 344 345 public void setAggregationStrategyRef(String aggregationStrategyRef) { 346 this.strategyRef = aggregationStrategyRef; 347 } 348 349 public String getAggregationStrategyMethodName() { 350 return strategyMethodName; 351 } 352 353 public void setAggregationStrategyMethodName(String strategyMethodName) { 354 this.strategyMethodName = strategyMethodName; 355 } 356 357 public Boolean getStrategyMethodAllowNull() { 358 return strategyMethodAllowNull; 359 } 360 361 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 362 this.strategyMethodAllowNull = strategyMethodAllowNull; 363 } 364 365 public Integer getCompletionSize() { 366 return completionSize; 367 } 368 369 public void setCompletionSize(Integer completionSize) { 370 this.completionSize = completionSize; 371 } 372 373 public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() { 374 return optimisticLockRetryPolicyDefinition; 375 } 376 377 public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) { 378 this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition; 379 } 380 381 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 382 return optimisticLockRetryPolicy; 383 } 384 385 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 386 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 387 } 388 389 public Long getCompletionInterval() { 390 return completionInterval; 391 } 392 393 public void setCompletionInterval(Long completionInterval) { 394 this.completionInterval = completionInterval; 395 } 396 397 public Long getCompletionTimeout() { 398 return completionTimeout; 399 } 400 401 public void setCompletionTimeout(Long completionTimeout) { 402 this.completionTimeout = completionTimeout; 403 } 404 405 public ExpressionSubElementDefinition getCompletionPredicate() { 406 return completionPredicate; 407 } 408 409 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 410 this.completionPredicate = completionPredicate; 411 } 412 413 public ExpressionSubElementDefinition getCompletionTimeoutExpression() { 414 return completionTimeoutExpression; 415 } 416 417 public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) { 418 this.completionTimeoutExpression = completionTimeoutExpression; 419 } 420 421 public ExpressionSubElementDefinition getCompletionSizeExpression() { 422 return completionSizeExpression; 423 } 424 425 public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) { 426 this.completionSizeExpression = completionSizeExpression; 427 } 428 429 public Boolean getGroupExchanges() { 430 return groupExchanges; 431 } 432 433 public boolean isGroupExchanges() { 434 return groupExchanges != null && groupExchanges; 435 } 436 437 public void setGroupExchanges(Boolean groupExchanges) { 438 this.groupExchanges = groupExchanges; 439 } 440 441 public Boolean getCompletionFromBatchConsumer() { 442 return completionFromBatchConsumer; 443 } 444 445 public boolean isCompletionFromBatchConsumer() { 446 return completionFromBatchConsumer != null && completionFromBatchConsumer; 447 } 448 449 public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) { 450 this.completionFromBatchConsumer = completionFromBatchConsumer; 451 } 452 453 public ExecutorService getExecutorService() { 454 return executorService; 455 } 456 457 public void setExecutorService(ExecutorService executorService) { 458 this.executorService = executorService; 459 } 460 461 public Boolean getOptimisticLocking() { 462 return optimisticLocking; 463 } 464 465 public void setOptimisticLocking(boolean optimisticLocking) { 466 this.optimisticLocking = optimisticLocking; 467 } 468 469 public boolean isOptimisticLocking() { 470 return optimisticLocking != null && optimisticLocking; 471 } 472 473 public Boolean getParallelProcessing() { 474 return parallelProcessing; 475 } 476 477 public boolean isParallelProcessing() { 478 return parallelProcessing != null && parallelProcessing; 479 } 480 481 public void setParallelProcessing(boolean parallelProcessing) { 482 this.parallelProcessing = parallelProcessing; 483 } 484 485 public String getExecutorServiceRef() { 486 return executorServiceRef; 487 } 488 489 public void setExecutorServiceRef(String executorServiceRef) { 490 this.executorServiceRef = executorServiceRef; 491 } 492 493 public String getStrategyRef() { 494 return strategyRef; 495 } 496 497 public void setStrategyRef(String strategyRef) { 498 this.strategyRef = strategyRef; 499 } 500 501 public String getStrategyMethodName() { 502 return strategyMethodName; 503 } 504 505 public void setStrategyMethodName(String strategyMethodName) { 506 this.strategyMethodName = strategyMethodName; 507 } 508 509 public Boolean getEagerCheckCompletion() { 510 return eagerCheckCompletion; 511 } 512 513 public boolean isEagerCheckCompletion() { 514 return eagerCheckCompletion != null && eagerCheckCompletion; 515 } 516 517 public void setEagerCheckCompletion(Boolean eagerCheckCompletion) { 518 this.eagerCheckCompletion = eagerCheckCompletion; 519 } 520 521 public Boolean getIgnoreInvalidCorrelationKeys() { 522 return ignoreInvalidCorrelationKeys; 523 } 524 525 public boolean isIgnoreInvalidCorrelationKeys() { 526 return ignoreInvalidCorrelationKeys != null && ignoreInvalidCorrelationKeys; 527 } 528 529 public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) { 530 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 531 } 532 533 public Integer getCloseCorrelationKeyOnCompletion() { 534 return closeCorrelationKeyOnCompletion; 535 } 536 537 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 538 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 539 } 540 541 public AggregationRepository getAggregationRepository() { 542 return aggregationRepository; 543 } 544 545 public void setAggregationRepository(AggregationRepository aggregationRepository) { 546 this.aggregationRepository = aggregationRepository; 547 } 548 549 public String getAggregationRepositoryRef() { 550 return aggregationRepositoryRef; 551 } 552 553 public void setAggregationRepositoryRef(String aggregationRepositoryRef) { 554 this.aggregationRepositoryRef = aggregationRepositoryRef; 555 } 556 557 public Boolean getDiscardOnCompletionTimeout() { 558 return discardOnCompletionTimeout; 559 } 560 561 public boolean isDiscardOnCompletionTimeout() { 562 return discardOnCompletionTimeout != null && discardOnCompletionTimeout; 563 } 564 565 public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { 566 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 567 } 568 569 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 570 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 571 } 572 573 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 574 return timeoutCheckerExecutorService; 575 } 576 577 public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { 578 this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; 579 } 580 581 public String getTimeoutCheckerExecutorServiceRef() { 582 return timeoutCheckerExecutorServiceRef; 583 } 584 585 // Fluent API 586 //------------------------------------------------------------------------- 587 588 /** 589 * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange. 590 * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange. 591 * 592 * @return builder 593 */ 594 public AggregateDefinition eagerCheckCompletion() { 595 setEagerCheckCompletion(true); 596 return this; 597 } 598 599 /** 600 * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just 601 * ignore the incoming Exchange. 602 * 603 * @return builder 604 */ 605 public AggregateDefinition ignoreInvalidCorrelationKeys() { 606 setIgnoreInvalidCorrelationKeys(true); 607 return this; 608 } 609 610 /** 611 * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key 612 * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException} 613 * is thrown. 614 * 615 * @param capacity the maximum capacity of the closed correlation key cache. 616 * Use <tt>0</tt> or negative value for unbounded capacity. 617 * @return builder 618 */ 619 public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) { 620 setCloseCorrelationKeyOnCompletion(capacity); 621 return this; 622 } 623 624 /** 625 * Discards the aggregated message on completion timeout. 626 * <p/> 627 * This means on timeout the aggregated message is dropped and not sent out of the aggregator. 628 * 629 * @return builder 630 */ 631 public AggregateDefinition discardOnCompletionTimeout() { 632 setDiscardOnCompletionTimeout(true); 633 return this; 634 } 635 636 /** 637 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer} 638 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported 639 * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. 640 * 641 * @return builder 642 */ 643 public AggregateDefinition completionFromBatchConsumer() { 644 setCompletionFromBatchConsumer(true); 645 return this; 646 } 647 648 /** 649 * Sets the completion size, which is the number of aggregated exchanges which would 650 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 651 * 652 * @param completionSize the completion size 653 * @return builder 654 */ 655 public AggregateDefinition completionSize(int completionSize) { 656 setCompletionSize(completionSize); 657 return this; 658 } 659 660 /** 661 * Sets the completion size, which is the number of aggregated exchanges which would 662 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 663 * 664 * @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type 665 * @return builder 666 */ 667 public AggregateDefinition completionSize(Expression completionSize) { 668 setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize)); 669 return this; 670 } 671 672 /** 673 * Sets the completion interval, which would cause the aggregate to consider the group as complete 674 * and send out the aggregated exchange. 675 * 676 * @param completionInterval the interval in millis 677 * @return the builder 678 */ 679 public AggregateDefinition completionInterval(long completionInterval) { 680 setCompletionInterval(completionInterval); 681 return this; 682 } 683 684 /** 685 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 686 * and send out the aggregated exchange. 687 * 688 * @param completionTimeout the timeout in millis 689 * @return the builder 690 */ 691 public AggregateDefinition completionTimeout(long completionTimeout) { 692 setCompletionTimeout(completionTimeout); 693 return this; 694 } 695 696 /** 697 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 698 * and send out the aggregated exchange. 699 * 700 * @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type 701 * @return the builder 702 */ 703 public AggregateDefinition completionTimeout(Expression completionTimeout) { 704 setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout)); 705 return this; 706 } 707 708 /** 709 * Sets the aggregate strategy to use 710 * 711 * @param aggregationStrategy the aggregate strategy to use 712 * @return the builder 713 */ 714 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 715 setAggregationStrategy(aggregationStrategy); 716 return this; 717 } 718 719 /** 720 * Sets the aggregate strategy to use 721 * 722 * @param aggregationStrategyRef reference to the strategy to lookup in the registry 723 * @return the builder 724 */ 725 public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) { 726 setAggregationStrategyRef(aggregationStrategyRef); 727 return this; 728 } 729 730 /** 731 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 732 * 733 * @param methodName the method name to call 734 * @return the builder 735 */ 736 public AggregateDefinition aggregationStrategyMethodName(String methodName) { 737 setAggregationStrategyMethodName(methodName); 738 return this; 739 } 740 741 /** 742 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 743 * 744 * @return the builder 745 */ 746 public AggregateDefinition aggregationStrategyMethodAllowNull() { 747 setStrategyMethodAllowNull(true); 748 return this; 749 } 750 751 /** 752 * Sets the custom aggregate repository to use. 753 * <p/> 754 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 755 * 756 * @param aggregationRepository the aggregate repository to use 757 * @return the builder 758 */ 759 public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) { 760 setAggregationRepository(aggregationRepository); 761 return this; 762 } 763 764 /** 765 * Sets the custom aggregate repository to use 766 * <p/> 767 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 768 * 769 * @param aggregationRepositoryRef reference to the repository to lookup in the registry 770 * @return the builder 771 */ 772 public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) { 773 setAggregationRepositoryRef(aggregationRepositoryRef); 774 return this; 775 } 776 777 /** 778 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 779 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}. 780 * 781 * @return the builder 782 */ 783 public AggregateDefinition groupExchanges() { 784 setGroupExchanges(true); 785 // must use eager check when using grouped exchanges 786 setEagerCheckCompletion(true); 787 return this; 788 } 789 790 /** 791 * Sets the predicate used to determine if the aggregation is completed 792 * 793 * @param predicate the predicate 794 * @return the builder 795 */ 796 public AggregateDefinition completionPredicate(Predicate predicate) { 797 checkNoCompletedPredicate(); 798 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 799 return this; 800 } 801 802 /** 803 * Sets the force completion on stop flag, which considers the current group as complete 804 * and sends out the aggregated exchange when the stop event is executed 805 * 806 * @return builder 807 */ 808 public AggregateDefinition forceCompletionOnStop() { 809 setForceCompletionOnStop(true); 810 return this; 811 } 812 813 public Boolean getForceCompletionOnStop() { 814 return forceCompletionOnStop; 815 } 816 817 public boolean isForceCompletionOnStop() { 818 return forceCompletionOnStop != null && forceCompletionOnStop; 819 } 820 821 public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { 822 this.forceCompletionOnStop = forceCompletionOnStop; 823 } 824 825 /** 826 * Sending the aggregated output in parallel 827 * 828 * @return the builder 829 */ 830 public AggregateDefinition parallelProcessing() { 831 setParallelProcessing(true); 832 return this; 833 } 834 835 public AggregateDefinition optimisticLocking() { 836 setOptimisticLocking(true); 837 return this; 838 } 839 840 public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) { 841 setOptimisticLockRetryPolicy(policy); 842 return this; 843 } 844 845 public AggregateDefinition executorService(ExecutorService executorService) { 846 setExecutorService(executorService); 847 return this; 848 } 849 850 public AggregateDefinition executorServiceRef(String executorServiceRef) { 851 setExecutorServiceRef(executorServiceRef); 852 return this; 853 } 854 855 public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { 856 setTimeoutCheckerExecutorService(executorService); 857 return this; 858 } 859 860 public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { 861 setTimeoutCheckerExecutorServiceRef(executorServiceRef); 862 return this; 863 } 864 865 protected void checkNoCompletedPredicate() { 866 if (getCompletionPredicate() != null) { 867 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 868 } 869 } 870 871 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 872 this.correlationExpression = correlationExpression; 873 } 874 875 public ExpressionSubElementDefinition getCorrelationExpression() { 876 return correlationExpression; 877 } 878 879 // Section - Methods from ExpressionNode 880 // Needed to copy methods from ExpressionNode here so that I could specify the 881 // correlation expression as optional in JAXB 882 883 public ExpressionDefinition getExpression() { 884 if (expression == null && correlationExpression != null) { 885 expression = correlationExpression.getExpressionType(); 886 } 887 return expression; 888 } 889 890 public void setExpression(ExpressionDefinition expression) { 891 this.expression = expression; 892 } 893 894 @Override 895 public List<ProcessorDefinition<?>> getOutputs() { 896 return outputs; 897 } 898 899 public boolean isOutputSupported() { 900 return true; 901 } 902 903 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 904 this.outputs = outputs; 905 } 906 907 }