001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.aggregate; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.LinkedHashSet; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.ConcurrentSkipListSet; 027 import java.util.concurrent.ExecutorService; 028 import java.util.concurrent.ScheduledExecutorService; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.atomic.AtomicInteger; 031 import java.util.concurrent.locks.Lock; 032 import java.util.concurrent.locks.ReentrantLock; 033 034 import org.apache.camel.AsyncCallback; 035 import org.apache.camel.AsyncProcessor; 036 import org.apache.camel.CamelContext; 037 import org.apache.camel.CamelExchangeException; 038 import org.apache.camel.Endpoint; 039 import org.apache.camel.Exchange; 040 import org.apache.camel.Expression; 041 import org.apache.camel.Navigate; 042 import org.apache.camel.NoSuchEndpointException; 043 import org.apache.camel.Predicate; 044 import org.apache.camel.Processor; 045 import org.apache.camel.ProducerTemplate; 046 import org.apache.camel.TimeoutMap; 047 import org.apache.camel.Traceable; 048 import org.apache.camel.impl.LoggingExceptionHandler; 049 import org.apache.camel.spi.AggregationRepository; 050 import org.apache.camel.spi.ExceptionHandler; 051 import org.apache.camel.spi.OptimisticLockingAggregationRepository; 052 import org.apache.camel.spi.RecoverableAggregationRepository; 053 import org.apache.camel.spi.ShutdownPrepared; 054 import org.apache.camel.spi.Synchronization; 055 import org.apache.camel.support.DefaultTimeoutMap; 056 import org.apache.camel.support.ServiceSupport; 057 import org.apache.camel.util.AsyncProcessorHelper; 058 import org.apache.camel.util.ExchangeHelper; 059 import org.apache.camel.util.LRUCache; 060 import org.apache.camel.util.ObjectHelper; 061 import org.apache.camel.util.ServiceHelper; 062 import org.apache.camel.util.StopWatch; 063 import org.apache.camel.util.TimeUtils; 064 import org.slf4j.Logger; 065 import org.slf4j.LoggerFactory; 066 067 /** 068 * An implementation of the <a 069 * href="http://camel.apache.org/aggregator2.html">Aggregator</a> 070 * pattern where a batch of messages are processed (up to a maximum amount or 071 * until some timeout is reached) and messages for the same correlation key are 072 * combined together using some kind of {@link AggregationStrategy} 073 * (by default the latest message is used) to compress many message exchanges 074 * into a smaller number of exchanges. 075 * <p/> 076 * A good example of this is stock market data; you may be receiving 30,000 077 * messages/second and you may want to throttle it right down so that multiple 078 * messages for the same stock are combined (or just the latest message is used 079 * and older prices are discarded). Another idea is to combine line item messages 080 * together into a single invoice message. 081 */ 082 public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared { 083 084 public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; 085 086 private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); 087 088 private final Lock lock = new ReentrantLock(); 089 private final CamelContext camelContext; 090 private final Processor processor; 091 private AggregationStrategy aggregationStrategy; 092 private Expression correlationExpression; 093 private final ExecutorService executorService; 094 private final boolean shutdownExecutorService; 095 private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy(); 096 private ScheduledExecutorService timeoutCheckerExecutorService; 097 private boolean shutdownTimeoutCheckerExecutorService; 098 private ScheduledExecutorService recoverService; 099 // store correlation key -> exchange id in timeout map 100 private TimeoutMap<String, String> timeoutMap; 101 private ExceptionHandler exceptionHandler; 102 private AggregationRepository aggregationRepository; 103 private Map<String, String> closedCorrelationKeys; 104 private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>(); 105 private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); 106 private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); 107 108 // keep booking about redelivery 109 private class RedeliveryData { 110 int redeliveryCounter; 111 } 112 113 // options 114 private boolean ignoreInvalidCorrelationKeys; 115 private Integer closeCorrelationKeyOnCompletion; 116 private boolean parallelProcessing; 117 private boolean optimisticLocking; 118 119 // different ways to have completion triggered 120 private boolean eagerCheckCompletion; 121 private Predicate completionPredicate; 122 private long completionTimeout; 123 private Expression completionTimeoutExpression; 124 private long completionInterval; 125 private int completionSize; 126 private Expression completionSizeExpression; 127 private boolean completionFromBatchConsumer; 128 private AtomicInteger batchConsumerCounter = new AtomicInteger(); 129 private boolean discardOnCompletionTimeout; 130 private boolean forceCompletionOnStop; 131 132 private ProducerTemplate deadLetterProducerTemplate; 133 134 public AggregateProcessor(CamelContext camelContext, Processor processor, 135 Expression correlationExpression, AggregationStrategy aggregationStrategy, 136 ExecutorService executorService, boolean shutdownExecutorService) { 137 ObjectHelper.notNull(camelContext, "camelContext"); 138 ObjectHelper.notNull(processor, "processor"); 139 ObjectHelper.notNull(correlationExpression, "correlationExpression"); 140 ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); 141 ObjectHelper.notNull(executorService, "executorService"); 142 this.camelContext = camelContext; 143 this.processor = processor; 144 this.correlationExpression = correlationExpression; 145 this.aggregationStrategy = aggregationStrategy; 146 this.executorService = executorService; 147 this.shutdownExecutorService = shutdownExecutorService; 148 this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); 149 } 150 151 @Override 152 public String toString() { 153 return "AggregateProcessor[to: " + processor + "]"; 154 } 155 156 public String getTraceLabel() { 157 return "aggregate[" + correlationExpression + "]"; 158 } 159 160 public List<Processor> next() { 161 if (!hasNext()) { 162 return null; 163 } 164 List<Processor> answer = new ArrayList<Processor>(1); 165 answer.add(processor); 166 return answer; 167 } 168 169 public boolean hasNext() { 170 return processor != null; 171 } 172 173 public void process(Exchange exchange) throws Exception { 174 AsyncProcessorHelper.process(this, exchange); 175 } 176 177 public boolean process(Exchange exchange, AsyncCallback callback) { 178 try { 179 doProcess(exchange); 180 } catch (Throwable e) { 181 exchange.setException(e); 182 } 183 callback.done(true); 184 return true; 185 } 186 187 protected void doProcess(Exchange exchange) throws Exception { 188 189 //check for the special header to force completion of all groups (and ignore the exchange otherwise) 190 boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); 191 if (completeAllGroups) { 192 forceCompletionOfAllGroups(); 193 return; 194 } 195 196 // compute correlation expression 197 String key = correlationExpression.evaluate(exchange, String.class); 198 if (ObjectHelper.isEmpty(key)) { 199 // we have a bad correlation key 200 if (isIgnoreInvalidCorrelationKeys()) { 201 LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange); 202 return; 203 } else { 204 throw new CamelExchangeException("Invalid correlation key", exchange); 205 } 206 } 207 208 // is the correlation key closed? 209 if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { 210 throw new ClosedCorrelationKeyException(key, exchange); 211 } 212 213 // when optimist locking is enabled we keep trying until we succeed 214 if (optimisticLocking) { 215 List<Exchange> aggregated = null; 216 boolean exhaustedRetries = true; 217 int attempt = 0; 218 do { 219 attempt++; 220 // copy exchange, and do not share the unit of work 221 // the aggregated output runs in another unit of work 222 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 223 try { 224 aggregated = doAggregation(key, copy); 225 exhaustedRetries = false; 226 break; 227 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 228 LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}", 229 new Object[]{attempt, aggregationRepository, key, copy, e}); 230 optimisticLockRetryPolicy.doDelay(attempt); 231 } 232 } while (optimisticLockRetryPolicy.shouldRetry(attempt)); 233 234 if (exhaustedRetries) { 235 throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, 236 new OptimisticLockingAggregationRepository.OptimisticLockingException()); 237 } else if (aggregated != null) { 238 // we are completed so submit to completion 239 for (Exchange agg : aggregated) { 240 onSubmitCompletion(key, agg); 241 } 242 } 243 } else { 244 // copy exchange, and do not share the unit of work 245 // the aggregated output runs in another unit of work 246 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 247 248 // when memory based then its fast using synchronized, but if the aggregation repository is IO 249 // bound such as JPA etc then concurrent aggregation per correlation key could 250 // improve performance as we can run aggregation repository get/add in parallel 251 List<Exchange> aggregated = null; 252 lock.lock(); 253 try { 254 aggregated = doAggregation(key, copy); 255 } finally { 256 lock.unlock(); 257 } 258 259 // we are completed so do that work outside the lock 260 if (aggregated != null) { 261 for (Exchange agg : aggregated) { 262 onSubmitCompletion(key, agg); 263 } 264 } 265 } 266 267 // check for the special header to force completion of all groups (inclusive of the message) 268 boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); 269 if (completeAllGroupsInclusive) { 270 forceCompletionOfAllGroups(); 271 } 272 } 273 274 /** 275 * Aggregates the exchange with the given correlation key 276 * <p/> 277 * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key 278 * in parallel. 279 * <p/> 280 * The returned {@link Exchange} should be send downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)} 281 * method which sends out the aggregated and completed {@link Exchange}. 282 * 283 * @param key the correlation key 284 * @param newExchange the exchange 285 * @return the aggregated exchange(s) which is complete, or <tt>null</tt> if not yet complete 286 * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating 287 */ 288 private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException { 289 LOG.trace("onAggregation +++ start +++ with correlation key: {}", key); 290 291 Exchange answer; 292 Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key); 293 Exchange oldExchange = originalExchange; 294 295 Integer size = 1; 296 if (oldExchange != null) { 297 // hack to support legacy AggregationStrategy's that modify and return the oldExchange, these will not 298 // working when using an identify based approach for optimistic locking like the MemoryAggregationRepository. 299 if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) { 300 oldExchange = originalExchange.copy(); 301 } 302 size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class); 303 size++; 304 } 305 306 // check if we are complete 307 String complete = null; 308 if (isEagerCheckCompletion()) { 309 // put the current aggregated size on the exchange so its avail during completion check 310 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); 311 complete = isCompleted(key, newExchange); 312 // remove it afterwards 313 newExchange.removeProperty(Exchange.AGGREGATED_SIZE); 314 } 315 316 // prepare the exchanges for aggregation and then aggregate them 317 ExchangeHelper.prepareAggregation(oldExchange, newExchange); 318 // must catch any exception from aggregation 319 try { 320 answer = onAggregation(oldExchange, newExchange); 321 } catch (Throwable e) { 322 throw new CamelExchangeException("Error occurred during aggregation", newExchange, e); 323 } 324 if (answer == null) { 325 throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange); 326 } 327 328 // update the aggregated size 329 answer.setProperty(Exchange.AGGREGATED_SIZE, size); 330 331 // maybe we should check completion after the aggregation 332 if (!isEagerCheckCompletion()) { 333 complete = isCompleted(key, answer); 334 } 335 336 List<Exchange> list = new ArrayList<Exchange>(); 337 338 // only need to update aggregation repository if we are not complete 339 if (complete == null) { 340 doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer); 341 // we are not complete so the answer should be null 342 answer = null; 343 } else { 344 // if batch consumer completion is enabled then we need to complete the group 345 if ("consumer".equals(complete)) { 346 for (String batchKey : batchConsumerCorrelationKeys) { 347 Exchange batchAnswer; 348 if (batchKey.equals(key)) { 349 // skip the current aggregated key as we have already aggregated it and have the answer 350 batchAnswer = answer; 351 } else { 352 batchAnswer = aggregationRepository.get(camelContext, batchKey); 353 } 354 355 if (batchAnswer != null) { 356 batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 357 onCompletion(batchKey, originalExchange, batchAnswer, false); 358 list.add(batchAnswer); 359 } 360 } 361 batchConsumerCorrelationKeys.clear(); 362 // we have already submitted to completion, so answer should be null 363 answer = null; 364 } else { 365 // we are complete for this exchange 366 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 367 answer = onCompletion(key, originalExchange, answer, false); 368 } 369 } 370 371 LOG.trace("onAggregation +++ end +++ with correlation key: {}", key); 372 if (answer != null) { 373 list.add(answer); 374 } 375 return list; 376 } 377 378 protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { 379 LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key}); 380 if (optimisticLocking) { 381 try { 382 ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange); 383 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 384 onOptimisticLockingFailure(oldExchange, newExchange); 385 throw e; 386 } 387 } else { 388 aggregationRepository.add(camelContext, key, newExchange); 389 } 390 } 391 392 protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) { 393 if (aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy) { 394 LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}", 395 new Object[]{aggregationStrategy, oldExchange, newExchange}); 396 ((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange, newExchange); 397 } 398 } 399 400 /** 401 * Tests whether the given exchange is complete or not 402 * 403 * @param key the correlation key 404 * @param exchange the incoming exchange 405 * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion 406 */ 407 protected String isCompleted(String key, Exchange exchange) { 408 // batch consumer completion must always run first 409 if (isCompletionFromBatchConsumer()) { 410 batchConsumerCorrelationKeys.add(key); 411 batchConsumerCounter.incrementAndGet(); 412 int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); 413 if (size > 0 && batchConsumerCounter.intValue() >= size) { 414 // batch consumer is complete then reset the counter 415 batchConsumerCounter.set(0); 416 return "consumer"; 417 } 418 } 419 420 if (getCompletionPredicate() != null) { 421 boolean answer = getCompletionPredicate().matches(exchange); 422 if (answer) { 423 return "predicate"; 424 } 425 } 426 427 boolean sizeChecked = false; 428 if (getCompletionSizeExpression() != null) { 429 Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class); 430 if (value != null && value > 0) { 431 // mark as already checked size as expression takes precedence over static configured 432 sizeChecked = true; 433 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 434 if (size >= value) { 435 return "size"; 436 } 437 } 438 } 439 if (!sizeChecked && getCompletionSize() > 0) { 440 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 441 if (size >= getCompletionSize()) { 442 return "size"; 443 } 444 } 445 446 // timeout can be either evaluated based on an expression or from a fixed value 447 // expression takes precedence 448 boolean timeoutSet = false; 449 if (getCompletionTimeoutExpression() != null) { 450 Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class); 451 if (value != null && value > 0) { 452 if (LOG.isTraceEnabled()) { 453 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 454 new Object[]{key, value, exchange}); 455 } 456 addExchangeToTimeoutMap(key, exchange, value); 457 timeoutSet = true; 458 } 459 } 460 if (!timeoutSet && getCompletionTimeout() > 0) { 461 // timeout is used so use the timeout map to keep an eye on this 462 if (LOG.isTraceEnabled()) { 463 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 464 new Object[]{key, getCompletionTimeout(), exchange}); 465 } 466 addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); 467 } 468 469 // not complete 470 return null; 471 } 472 473 protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { 474 return aggregationStrategy.aggregate(oldExchange, newExchange); 475 } 476 477 protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) { 478 // store the correlation key as property before we remove so the repository has that information 479 if (original != null) { 480 original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 481 } 482 aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 483 484 // only remove if we have previous added (as we could potentially complete with only 1 exchange) 485 // (if we have previous added then we have that as the original exchange) 486 if (original != null) { 487 // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's 488 aggregationRepository.remove(aggregated.getContext(), key, original); 489 } 490 491 if (!fromTimeout && timeoutMap != null) { 492 // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) 493 timeoutMap.remove(key); 494 } 495 496 // this key has been closed so add it to the closed map 497 if (closedCorrelationKeys != null) { 498 closedCorrelationKeys.put(key, key); 499 } 500 501 if (fromTimeout) { 502 // invoke timeout if its timeout aware aggregation strategy, 503 // to allow any custom processing before discarding the exchange 504 if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) { 505 long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1; 506 ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(aggregated, -1, -1, timeout); 507 } 508 } 509 510 Exchange answer; 511 if (fromTimeout && isDiscardOnCompletionTimeout()) { 512 // discard due timeout 513 LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); 514 // must confirm the discarded exchange 515 aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); 516 // and remove redelivery state as well 517 redeliveryState.remove(aggregated.getExchangeId()); 518 // the completion was from timeout and we should just discard it 519 answer = null; 520 } else { 521 // the aggregated exchange should be published (sent out) 522 answer = aggregated; 523 } 524 525 return answer; 526 } 527 528 private void onSubmitCompletion(final String key, final Exchange exchange) { 529 LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); 530 531 // add this as in progress before we submit the task 532 inProgressCompleteExchanges.add(exchange.getExchangeId()); 533 534 // invoke the on completion callback 535 if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) { 536 ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange); 537 } 538 539 // send this exchange 540 executorService.submit(new Runnable() { 541 public void run() { 542 LOG.debug("Processing aggregated exchange: {}", exchange); 543 544 // add on completion task so we remember to update the inProgressCompleteExchanges 545 exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); 546 547 try { 548 processor.process(exchange); 549 } catch (Throwable e) { 550 exchange.setException(e); 551 } 552 553 // log exception if there was a problem 554 if (exchange.getException() != null) { 555 // if there was an exception then let the exception handler handle it 556 getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); 557 } else { 558 LOG.trace("Processing aggregated exchange: {} complete.", exchange); 559 } 560 } 561 }); 562 } 563 564 /** 565 * Restores the timeout map with timeout values from the aggregation repository. 566 * <p/> 567 * This is needed in case the aggregator has been stopped and started again (for example a server restart). 568 * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored. 569 */ 570 protected void restoreTimeoutMapFromAggregationRepository() throws Exception { 571 // grab the timeout value for each partly aggregated exchange 572 Set<String> keys = aggregationRepository.getKeys(); 573 if (keys == null || keys.isEmpty()) { 574 return; 575 } 576 577 StopWatch watch = new StopWatch(); 578 LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size()); 579 580 for (String key : keys) { 581 Exchange exchange = aggregationRepository.get(camelContext, key); 582 // grab the timeout value 583 long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; 584 if (timeout > 0) { 585 LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); 586 addExchangeToTimeoutMap(key, exchange, timeout); 587 } 588 } 589 590 // log duration of this task so end user can see how long it takes to pre-check this upon starting 591 LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", 592 timeoutMap.size(), TimeUtils.printDuration(watch.stop())); 593 } 594 595 /** 596 * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts. 597 * 598 * @param key the correlation key 599 * @param exchange the exchange 600 * @param timeout the timeout value in millis 601 */ 602 private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) { 603 // store the timeout value on the exchange as well, in case we need it later 604 exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout); 605 timeoutMap.put(key, exchange.getExchangeId(), timeout); 606 } 607 608 public Predicate getCompletionPredicate() { 609 return completionPredicate; 610 } 611 612 public void setCompletionPredicate(Predicate completionPredicate) { 613 this.completionPredicate = completionPredicate; 614 } 615 616 public boolean isEagerCheckCompletion() { 617 return eagerCheckCompletion; 618 } 619 620 public void setEagerCheckCompletion(boolean eagerCheckCompletion) { 621 this.eagerCheckCompletion = eagerCheckCompletion; 622 } 623 624 public long getCompletionTimeout() { 625 return completionTimeout; 626 } 627 628 public void setCompletionTimeout(long completionTimeout) { 629 this.completionTimeout = completionTimeout; 630 } 631 632 public Expression getCompletionTimeoutExpression() { 633 return completionTimeoutExpression; 634 } 635 636 public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) { 637 this.completionTimeoutExpression = completionTimeoutExpression; 638 } 639 640 public long getCompletionInterval() { 641 return completionInterval; 642 } 643 644 public void setCompletionInterval(long completionInterval) { 645 this.completionInterval = completionInterval; 646 } 647 648 public int getCompletionSize() { 649 return completionSize; 650 } 651 652 public void setCompletionSize(int completionSize) { 653 this.completionSize = completionSize; 654 } 655 656 public Expression getCompletionSizeExpression() { 657 return completionSizeExpression; 658 } 659 660 public void setCompletionSizeExpression(Expression completionSizeExpression) { 661 this.completionSizeExpression = completionSizeExpression; 662 } 663 664 public boolean isIgnoreInvalidCorrelationKeys() { 665 return ignoreInvalidCorrelationKeys; 666 } 667 668 public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) { 669 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 670 } 671 672 public Integer getCloseCorrelationKeyOnCompletion() { 673 return closeCorrelationKeyOnCompletion; 674 } 675 676 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 677 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 678 } 679 680 public boolean isCompletionFromBatchConsumer() { 681 return completionFromBatchConsumer; 682 } 683 684 public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) { 685 this.completionFromBatchConsumer = completionFromBatchConsumer; 686 } 687 688 public ExceptionHandler getExceptionHandler() { 689 return exceptionHandler; 690 } 691 692 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 693 this.exceptionHandler = exceptionHandler; 694 } 695 696 public boolean isParallelProcessing() { 697 return parallelProcessing; 698 } 699 700 public void setParallelProcessing(boolean parallelProcessing) { 701 this.parallelProcessing = parallelProcessing; 702 } 703 704 public boolean isOptimisticLocking() { 705 return optimisticLocking; 706 } 707 708 public void setOptimisticLocking(boolean optimisticLocking) { 709 this.optimisticLocking = optimisticLocking; 710 } 711 712 public AggregationRepository getAggregationRepository() { 713 return aggregationRepository; 714 } 715 716 public void setAggregationRepository(AggregationRepository aggregationRepository) { 717 this.aggregationRepository = aggregationRepository; 718 } 719 720 public boolean isDiscardOnCompletionTimeout() { 721 return discardOnCompletionTimeout; 722 } 723 724 public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) { 725 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 726 } 727 728 public void setForceCompletionOnStop(boolean forceCompletionOnStop) { 729 this.forceCompletionOnStop = forceCompletionOnStop; 730 } 731 732 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 733 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 734 } 735 736 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 737 return timeoutCheckerExecutorService; 738 } 739 740 public boolean isShutdownTimeoutCheckerExecutorService() { 741 return shutdownTimeoutCheckerExecutorService; 742 } 743 744 public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) { 745 this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService; 746 } 747 748 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 749 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 750 } 751 752 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 753 return optimisticLockRetryPolicy; 754 } 755 756 public AggregationStrategy getAggregationStrategy() { 757 return aggregationStrategy; 758 } 759 760 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 761 this.aggregationStrategy = aggregationStrategy; 762 } 763 764 public Expression getCorrelationExpression() { 765 return correlationExpression; 766 } 767 768 public void setCorrelationExpression(Expression correlationExpression) { 769 this.correlationExpression = correlationExpression; 770 } 771 772 /** 773 * On completion task which keeps the booking of the in progress up to date 774 */ 775 private final class AggregateOnCompletion implements Synchronization { 776 private final String exchangeId; 777 778 private AggregateOnCompletion(String exchangeId) { 779 // must use the original exchange id as it could potentially change if send over SEDA etc. 780 this.exchangeId = exchangeId; 781 } 782 783 public void onFailure(Exchange exchange) { 784 LOG.trace("Aggregated exchange onFailure: {}", exchange); 785 786 // must remember to remove in progress when we failed 787 inProgressCompleteExchanges.remove(exchangeId); 788 // do not remove redelivery state as we need it when we redeliver again later 789 } 790 791 public void onComplete(Exchange exchange) { 792 LOG.trace("Aggregated exchange onComplete: {}", exchange); 793 794 // only confirm if we processed without a problem 795 try { 796 aggregationRepository.confirm(exchange.getContext(), exchangeId); 797 // and remove redelivery state as well 798 redeliveryState.remove(exchangeId); 799 } finally { 800 // must remember to remove in progress when we are complete 801 inProgressCompleteExchanges.remove(exchangeId); 802 } 803 } 804 805 @Override 806 public String toString() { 807 return "AggregateOnCompletion"; 808 } 809 } 810 811 /** 812 * Background task that looks for aggregated exchanges which is triggered by completion timeouts. 813 */ 814 private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> { 815 816 private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 817 // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead 818 super(executor, requestMapPollTimeMillis, optimisticLocking); 819 } 820 821 @Override 822 public void purge() { 823 // must acquire the shared aggregation lock to be able to purge 824 if (!optimisticLocking) { lock.lock(); } 825 try { 826 super.purge(); 827 } finally { 828 if (!optimisticLocking) { lock.unlock(); } 829 } 830 } 831 832 @Override 833 public boolean onEviction(String key, String exchangeId) { 834 log.debug("Completion timeout triggered for correlation key: {}", key); 835 836 boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); 837 if (inProgress) { 838 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 839 return true; 840 } 841 842 // get the aggregated exchange 843 boolean evictionStolen = false; 844 Exchange answer = aggregationRepository.get(camelContext, key); 845 if (answer == null) { 846 evictionStolen = true; 847 } else { 848 // indicate it was completed by timeout 849 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); 850 try { 851 answer = onCompletion(key, answer, answer, true); 852 if (answer != null) { 853 onSubmitCompletion(key, answer); 854 } 855 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 856 evictionStolen = true; 857 } 858 } 859 860 if (optimisticLocking && evictionStolen) { 861 LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction " 862 + "for exchange with id: {} and correlation id: {}", exchangeId, key); 863 } 864 return true; 865 } 866 } 867 868 /** 869 * Background task that triggers completion based on interval. 870 */ 871 private final class AggregationIntervalTask implements Runnable { 872 873 public void run() { 874 // only run if CamelContext has been fully started 875 if (!camelContext.getStatus().isStarted()) { 876 LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 877 return; 878 } 879 880 LOG.trace("Starting completion interval task"); 881 882 // trigger completion for all in the repository 883 Set<String> keys = aggregationRepository.getKeys(); 884 885 if (keys != null && !keys.isEmpty()) { 886 // must acquire the shared aggregation lock to be able to trigger interval completion 887 if (!optimisticLocking) { lock.lock(); } 888 try { 889 for (String key : keys) { 890 boolean stolenInterval = false; 891 Exchange exchange = aggregationRepository.get(camelContext, key); 892 if (exchange == null) { 893 stolenInterval = true; 894 } else { 895 LOG.trace("Completion interval triggered for correlation key: {}", key); 896 // indicate it was completed by interval 897 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval"); 898 try { 899 Exchange answer = onCompletion(key, exchange, exchange, false); 900 if (answer != null) { 901 onSubmitCompletion(key, answer); 902 } 903 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 904 stolenInterval = true; 905 } 906 } 907 if (optimisticLocking && stolenInterval) { 908 LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key); 909 } 910 } 911 } finally { 912 if (!optimisticLocking) { lock.unlock(); } 913 } 914 } 915 916 LOG.trace("Completion interval task complete"); 917 } 918 } 919 920 /** 921 * Background task that looks for aggregated exchanges to recover. 922 */ 923 private final class RecoverTask implements Runnable { 924 private final RecoverableAggregationRepository recoverable; 925 926 private RecoverTask(RecoverableAggregationRepository recoverable) { 927 this.recoverable = recoverable; 928 } 929 930 public void run() { 931 // only run if CamelContext has been fully started 932 if (!camelContext.getStatus().isStarted()) { 933 LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 934 return; 935 } 936 937 LOG.trace("Starting recover check"); 938 939 // copy the current in progress before doing scan 940 final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges); 941 942 Set<String> exchangeIds = recoverable.scan(camelContext); 943 for (String exchangeId : exchangeIds) { 944 945 // we may shutdown while doing recovery 946 if (!isRunAllowed()) { 947 LOG.info("We are shutting down so stop recovering"); 948 return; 949 } 950 951 // consider in progress if it was in progress before we did the scan, or currently after we did the scan 952 // its safer to consider it in progress than risk duplicates due both in progress + recovered 953 boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); 954 if (inProgress) { 955 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 956 } else { 957 LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); 958 Exchange exchange = recoverable.recover(camelContext, exchangeId); 959 if (exchange != null) { 960 // get the correlation key 961 String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); 962 // and mark it as redelivered 963 exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); 964 965 // get the current redelivery data 966 RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); 967 968 // if we are exhausted, then move to dead letter channel 969 if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { 970 LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() 971 + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); 972 973 // send to DLC 974 try { 975 // set redelivery counter 976 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 977 exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 978 deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); 979 } catch (Throwable e) { 980 exchange.setException(e); 981 } 982 983 // handle if failed 984 if (exchange.getException() != null) { 985 getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); 986 } else { 987 // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again 988 recoverable.confirm(camelContext, exchangeId); 989 } 990 } else { 991 // update current redelivery state 992 if (data == null) { 993 // create new data 994 data = new RedeliveryData(); 995 redeliveryState.put(exchange.getExchangeId(), data); 996 } 997 data.redeliveryCounter++; 998 999 // set redelivery counter 1000 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 1001 if (recoverable.getMaximumRedeliveries() > 0) { 1002 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); 1003 } 1004 1005 LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); 1006 1007 // not exhaust so resubmit the recovered exchange 1008 onSubmitCompletion(key, exchange); 1009 } 1010 } 1011 } 1012 } 1013 1014 LOG.trace("Recover check complete"); 1015 } 1016 } 1017 1018 @Override 1019 protected void doStart() throws Exception { 1020 if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null 1021 && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null 1022 && getCompletionSizeExpression() == null) { 1023 throw new IllegalStateException("At least one of the completions options" 1024 + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); 1025 } 1026 1027 if (getCloseCorrelationKeyOnCompletion() != null) { 1028 if (getCloseCorrelationKeyOnCompletion() > 0) { 1029 LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion()); 1030 closedCorrelationKeys = new LRUCache<String, String>(getCloseCorrelationKeyOnCompletion()); 1031 } else { 1032 LOG.info("Using ClosedCorrelationKeys with unbounded capacity"); 1033 closedCorrelationKeys = new ConcurrentHashMap<String, String>(); 1034 } 1035 } 1036 1037 if (aggregationRepository == null) { 1038 aggregationRepository = new MemoryAggregationRepository(optimisticLocking); 1039 LOG.info("Defaulting to MemoryAggregationRepository"); 1040 } 1041 1042 if (optimisticLocking) { 1043 if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) { 1044 throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository"); 1045 } 1046 LOG.info("Optimistic locking is enabled"); 1047 } 1048 1049 ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository); 1050 1051 // should we use recover checker 1052 if (aggregationRepository instanceof RecoverableAggregationRepository) { 1053 RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; 1054 if (recoverable.isUseRecovery()) { 1055 long interval = recoverable.getRecoveryIntervalInMillis(); 1056 if (interval <= 0) { 1057 throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); 1058 } 1059 1060 // create a background recover thread to check every interval 1061 recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); 1062 Runnable recoverTask = new RecoverTask(recoverable); 1063 LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); 1064 // use fixed delay so there is X interval between each run 1065 recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); 1066 1067 if (recoverable.getDeadLetterUri() != null) { 1068 int max = recoverable.getMaximumRedeliveries(); 1069 if (max <= 0) { 1070 throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max); 1071 } 1072 LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri()); 1073 1074 // dead letter uri must be a valid endpoint 1075 Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri()); 1076 if (endpoint == null) { 1077 throw new NoSuchEndpointException(recoverable.getDeadLetterUri()); 1078 } 1079 deadLetterProducerTemplate = camelContext.createProducerTemplate(); 1080 } 1081 } 1082 } 1083 1084 if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) { 1085 throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); 1086 } 1087 if (getCompletionInterval() > 0) { 1088 LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis."); 1089 if (getTimeoutCheckerExecutorService() == null) { 1090 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1091 shutdownTimeoutCheckerExecutorService = true; 1092 } 1093 // trigger completion based on interval 1094 getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); 1095 } 1096 1097 // start timeout service if its in use 1098 if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { 1099 LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity."); 1100 if (getTimeoutCheckerExecutorService() == null) { 1101 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1102 shutdownTimeoutCheckerExecutorService = true; 1103 } 1104 // check for timed out aggregated messages once every second 1105 timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L); 1106 // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we 1107 // need to re-establish the timeout map so timeout can trigger 1108 restoreTimeoutMapFromAggregationRepository(); 1109 ServiceHelper.startService(timeoutMap); 1110 } 1111 } 1112 1113 @Override 1114 protected void doStop() throws Exception { 1115 // note: we cannot do doForceCompletionOnStop from this doStop method 1116 // as this is handled in the prepareShutdown method which is also invoked when stopping a route 1117 // and is better suited for preparing to shutdown than this doStop method is 1118 1119 if (recoverService != null) { 1120 camelContext.getExecutorServiceManager().shutdown(recoverService); 1121 } 1122 ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate); 1123 1124 if (closedCorrelationKeys != null) { 1125 // it may be a service so stop it as well 1126 ServiceHelper.stopService(closedCorrelationKeys); 1127 closedCorrelationKeys.clear(); 1128 } 1129 batchConsumerCorrelationKeys.clear(); 1130 redeliveryState.clear(); 1131 } 1132 1133 @Override 1134 public void prepareShutdown(boolean forced) { 1135 // we are shutting down, so force completion if this option was enabled 1136 // but only do this when forced=false, as that is when we have chance to 1137 // send out new messages to be routed by Camel. When forced=true, then 1138 // we have to shutdown in a hurry 1139 if (!forced && forceCompletionOnStop) { 1140 doForceCompletionOnStop(); 1141 } 1142 } 1143 1144 private void doForceCompletionOnStop() { 1145 int expected = forceCompletionOfAllGroups(); 1146 1147 StopWatch watch = new StopWatch(); 1148 while (inProgressCompleteExchanges.size() > 0) { 1149 LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size()); 1150 try { 1151 Thread.sleep(100); 1152 } catch (InterruptedException e) { 1153 // break out as we got interrupted such as the JVM terminating 1154 LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size()); 1155 break; 1156 } 1157 } 1158 1159 if (expected > 0) { 1160 LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop())); 1161 } 1162 } 1163 1164 @Override 1165 protected void doShutdown() throws Exception { 1166 // shutdown aggregation repository and the strategy 1167 ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy); 1168 1169 // cleanup when shutting down 1170 inProgressCompleteExchanges.clear(); 1171 1172 if (shutdownExecutorService) { 1173 camelContext.getExecutorServiceManager().shutdownNow(executorService); 1174 } 1175 if (shutdownTimeoutCheckerExecutorService) { 1176 camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService); 1177 timeoutCheckerExecutorService = null; 1178 } 1179 1180 super.doShutdown(); 1181 } 1182 1183 public int forceCompletionOfAllGroups() { 1184 1185 // only run if CamelContext has been fully started or is stopping 1186 boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping(); 1187 if (!allow) { 1188 LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName()); 1189 return 0; 1190 } 1191 1192 LOG.trace("Starting force completion of all groups task"); 1193 1194 // trigger completion for all in the repository 1195 Set<String> keys = aggregationRepository.getKeys(); 1196 1197 int total = 0; 1198 if (keys != null && !keys.isEmpty()) { 1199 // must acquire the shared aggregation lock to be able to trigger force completion 1200 if (!optimisticLocking) { lock.lock(); } 1201 total = keys.size(); 1202 try { 1203 for (String key : keys) { 1204 Exchange exchange = aggregationRepository.get(camelContext, key); 1205 if (exchange != null) { 1206 LOG.trace("Force completion triggered for correlation key: {}", key); 1207 // indicate it was completed by a force completion request 1208 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); 1209 Exchange answer = onCompletion(key, exchange, exchange, false); 1210 if (answer != null) { 1211 onSubmitCompletion(key, answer); 1212 } 1213 } 1214 } 1215 } finally { 1216 if (!optimisticLocking) { lock.unlock(); } 1217 } 1218 } 1219 LOG.trace("Completed force completion of all groups task"); 1220 1221 if (total > 0) { 1222 LOG.debug("Forcing completion of all groups with {} exchanges", total); 1223 } 1224 return total; 1225 } 1226 1227 }