001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.aggregate;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.LinkedHashSet;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentSkipListSet;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.ScheduledExecutorService;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicInteger;
031    import java.util.concurrent.locks.Lock;
032    import java.util.concurrent.locks.ReentrantLock;
033    
034    import org.apache.camel.AsyncCallback;
035    import org.apache.camel.AsyncProcessor;
036    import org.apache.camel.CamelContext;
037    import org.apache.camel.CamelExchangeException;
038    import org.apache.camel.Endpoint;
039    import org.apache.camel.Exchange;
040    import org.apache.camel.Expression;
041    import org.apache.camel.Navigate;
042    import org.apache.camel.NoSuchEndpointException;
043    import org.apache.camel.Predicate;
044    import org.apache.camel.Processor;
045    import org.apache.camel.ProducerTemplate;
046    import org.apache.camel.TimeoutMap;
047    import org.apache.camel.Traceable;
048    import org.apache.camel.impl.LoggingExceptionHandler;
049    import org.apache.camel.spi.AggregationRepository;
050    import org.apache.camel.spi.ExceptionHandler;
051    import org.apache.camel.spi.OptimisticLockingAggregationRepository;
052    import org.apache.camel.spi.RecoverableAggregationRepository;
053    import org.apache.camel.spi.ShutdownPrepared;
054    import org.apache.camel.spi.Synchronization;
055    import org.apache.camel.support.DefaultTimeoutMap;
056    import org.apache.camel.support.ServiceSupport;
057    import org.apache.camel.util.AsyncProcessorHelper;
058    import org.apache.camel.util.ExchangeHelper;
059    import org.apache.camel.util.LRUCache;
060    import org.apache.camel.util.ObjectHelper;
061    import org.apache.camel.util.ServiceHelper;
062    import org.apache.camel.util.StopWatch;
063    import org.apache.camel.util.TimeUtils;
064    import org.slf4j.Logger;
065    import org.slf4j.LoggerFactory;
066    
067    /**
068     * An implementation of the <a
069     * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
070     * pattern where a batch of messages are processed (up to a maximum amount or
071     * until some timeout is reached) and messages for the same correlation key are
072     * combined together using some kind of {@link AggregationStrategy}
073     * (by default the latest message is used) to compress many message exchanges
074     * into a smaller number of exchanges.
075     * <p/>
076     * A good example of this is stock market data; you may be receiving 30,000
077     * messages/second and you may want to throttle it right down so that multiple
078     * messages for the same stock are combined (or just the latest message is used
079     * and older prices are discarded). Another idea is to combine line item messages
080     * together into a single invoice message.
081     */
082    public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared {
083    
084        public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
085    
086        private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
087    
088        private final Lock lock = new ReentrantLock();
089        private final CamelContext camelContext;
090        private final Processor processor;
091        private AggregationStrategy aggregationStrategy;
092        private Expression correlationExpression;
093        private final ExecutorService executorService;
094        private final boolean shutdownExecutorService;
095        private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
096        private ScheduledExecutorService timeoutCheckerExecutorService;
097        private boolean shutdownTimeoutCheckerExecutorService;
098        private ScheduledExecutorService recoverService;
099        // store correlation key -> exchange id in timeout map
100        private TimeoutMap<String, String> timeoutMap;
101        private ExceptionHandler exceptionHandler;
102        private AggregationRepository aggregationRepository;
103        private Map<String, String> closedCorrelationKeys;
104        private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>();
105        private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
106        private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
107    
108        // keep booking about redelivery
109        private class RedeliveryData {
110            int redeliveryCounter;
111        }
112    
113        // options
114        private boolean ignoreInvalidCorrelationKeys;
115        private Integer closeCorrelationKeyOnCompletion;
116        private boolean parallelProcessing;
117        private boolean optimisticLocking;
118    
119        // different ways to have completion triggered
120        private boolean eagerCheckCompletion;
121        private Predicate completionPredicate;
122        private long completionTimeout;
123        private Expression completionTimeoutExpression;
124        private long completionInterval;
125        private int completionSize;
126        private Expression completionSizeExpression;
127        private boolean completionFromBatchConsumer;
128        private AtomicInteger batchConsumerCounter = new AtomicInteger();
129        private boolean discardOnCompletionTimeout;
130        private boolean forceCompletionOnStop;
131    
132        private ProducerTemplate deadLetterProducerTemplate;
133    
134        public AggregateProcessor(CamelContext camelContext, Processor processor,
135                                  Expression correlationExpression, AggregationStrategy aggregationStrategy,
136                                  ExecutorService executorService, boolean shutdownExecutorService) {
137            ObjectHelper.notNull(camelContext, "camelContext");
138            ObjectHelper.notNull(processor, "processor");
139            ObjectHelper.notNull(correlationExpression, "correlationExpression");
140            ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
141            ObjectHelper.notNull(executorService, "executorService");
142            this.camelContext = camelContext;
143            this.processor = processor;
144            this.correlationExpression = correlationExpression;
145            this.aggregationStrategy = aggregationStrategy;
146            this.executorService = executorService;
147            this.shutdownExecutorService = shutdownExecutorService;
148            this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
149        }
150    
151        @Override
152        public String toString() {
153            return "AggregateProcessor[to: " + processor + "]";
154        }
155    
156        public String getTraceLabel() {
157            return "aggregate[" + correlationExpression + "]";
158        }
159    
160        public List<Processor> next() {
161            if (!hasNext()) {
162                return null;
163            }
164            List<Processor> answer = new ArrayList<Processor>(1);
165            answer.add(processor);
166            return answer;
167        }
168    
169        public boolean hasNext() {
170            return processor != null;
171        }
172    
173        public void process(Exchange exchange) throws Exception {
174            AsyncProcessorHelper.process(this, exchange);
175        }
176    
177        public boolean process(Exchange exchange, AsyncCallback callback) {
178            try {
179                doProcess(exchange);
180            } catch (Throwable e) {
181                exchange.setException(e);
182            }
183            callback.done(true);
184            return true;
185        }
186    
187        protected void doProcess(Exchange exchange) throws Exception {
188    
189            //check for the special header to force completion of all groups (and ignore the exchange otherwise)
190            boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
191            if (completeAllGroups) {
192                forceCompletionOfAllGroups();
193                return;
194            }
195    
196            // compute correlation expression
197            String key = correlationExpression.evaluate(exchange, String.class);
198            if (ObjectHelper.isEmpty(key)) {
199                // we have a bad correlation key
200                if (isIgnoreInvalidCorrelationKeys()) {
201                    LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange);
202                    return;
203                } else {
204                    throw new CamelExchangeException("Invalid correlation key", exchange);
205                }
206            }
207    
208            // is the correlation key closed?
209            if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
210                throw new ClosedCorrelationKeyException(key, exchange);
211            }
212    
213            // when optimist locking is enabled we keep trying until we succeed
214            if (optimisticLocking) {
215                List<Exchange> aggregated = null;
216                boolean exhaustedRetries = true;
217                int attempt = 0;
218                do {
219                    attempt++;
220                    // copy exchange, and do not share the unit of work
221                    // the aggregated output runs in another unit of work
222                    Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
223                    try {
224                        aggregated = doAggregation(key, copy);
225                        exhaustedRetries = false;
226                        break;
227                    } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
228                        LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}",
229                                  new Object[]{attempt, aggregationRepository, key, copy, e});
230                        optimisticLockRetryPolicy.doDelay(attempt);
231                    }
232                } while (optimisticLockRetryPolicy.shouldRetry(attempt));
233    
234                if (exhaustedRetries) {
235                    throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange,
236                            new OptimisticLockingAggregationRepository.OptimisticLockingException());
237                } else if (aggregated != null) {
238                    // we are completed so submit to completion
239                    for (Exchange agg : aggregated) {
240                        onSubmitCompletion(key, agg);
241                    }
242                }
243            } else {
244                // copy exchange, and do not share the unit of work
245                // the aggregated output runs in another unit of work
246                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
247    
248                // when memory based then its fast using synchronized, but if the aggregation repository is IO
249                // bound such as JPA etc then concurrent aggregation per correlation key could
250                // improve performance as we can run aggregation repository get/add in parallel
251                List<Exchange> aggregated = null;
252                lock.lock();
253                try {
254                    aggregated = doAggregation(key, copy);
255                } finally {
256                    lock.unlock();
257                }
258    
259                // we are completed so do that work outside the lock
260                if (aggregated != null) {
261                    for (Exchange agg : aggregated) {
262                        onSubmitCompletion(key, agg);
263                    }
264                }
265            }
266    
267            // check for the special header to force completion of all groups (inclusive of the message)
268            boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
269            if (completeAllGroupsInclusive) {
270                forceCompletionOfAllGroups();
271            }
272        }
273    
274        /**
275         * Aggregates the exchange with the given correlation key
276         * <p/>
277         * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key
278         * in parallel.
279         * <p/>
280         * The returned {@link Exchange} should be send downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)}
281         * method which sends out the aggregated and completed {@link Exchange}.
282         *
283         * @param key      the correlation key
284         * @param newExchange the exchange
285         * @return the aggregated exchange(s) which is complete, or <tt>null</tt> if not yet complete
286         * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
287         */
288        private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
289            LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
290    
291            Exchange answer;
292            Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
293            Exchange oldExchange = originalExchange;
294    
295            Integer size = 1;
296            if (oldExchange != null) {
297                // hack to support legacy AggregationStrategy's that modify and return the oldExchange, these will not
298                // working when using an identify based approach for optimistic locking like the MemoryAggregationRepository.
299                if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) {
300                    oldExchange = originalExchange.copy();
301                }
302                size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
303                size++;
304            }
305    
306            // check if we are complete
307            String complete = null;
308            if (isEagerCheckCompletion()) {
309                // put the current aggregated size on the exchange so its avail during completion check
310                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
311                complete = isCompleted(key, newExchange);
312                // remove it afterwards
313                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
314            }
315    
316            // prepare the exchanges for aggregation and then aggregate them
317            ExchangeHelper.prepareAggregation(oldExchange, newExchange);
318            // must catch any exception from aggregation
319            try {
320                answer = onAggregation(oldExchange, newExchange);
321            } catch (Throwable e) {
322                throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
323            }
324            if (answer == null) {
325                throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
326            }
327    
328            // update the aggregated size
329            answer.setProperty(Exchange.AGGREGATED_SIZE, size);
330    
331            // maybe we should check completion after the aggregation
332            if (!isEagerCheckCompletion()) {
333                complete = isCompleted(key, answer);
334            }
335    
336            List<Exchange> list = new ArrayList<Exchange>();
337    
338            // only need to update aggregation repository if we are not complete
339            if (complete == null) {
340                doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
341                // we are not complete so the answer should be null
342                answer = null;
343            } else {
344                // if batch consumer completion is enabled then we need to complete the group
345                if ("consumer".equals(complete)) {
346                    for (String batchKey : batchConsumerCorrelationKeys) {
347                        Exchange batchAnswer;
348                        if (batchKey.equals(key)) {
349                            // skip the current aggregated key as we have already aggregated it and have the answer
350                            batchAnswer = answer;
351                        } else {
352                            batchAnswer = aggregationRepository.get(camelContext, batchKey);
353                        }
354    
355                        if (batchAnswer != null) {
356                            batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
357                            onCompletion(batchKey, originalExchange, batchAnswer, false);
358                            list.add(batchAnswer);
359                        }
360                    }
361                    batchConsumerCorrelationKeys.clear();
362                    // we have already submitted to completion, so answer should be null
363                    answer = null;
364                } else {
365                    // we are complete for this exchange
366                    answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
367                    answer = onCompletion(key, originalExchange, answer, false);
368                }
369            }
370    
371            LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
372            if (answer != null) {
373                list.add(answer);
374            }
375            return list;
376        }
377    
378        protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
379            LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key});
380            if (optimisticLocking) {
381                try {
382                    ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange);
383                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
384                    onOptimisticLockingFailure(oldExchange, newExchange);
385                    throw e;
386                }
387            } else {
388                aggregationRepository.add(camelContext, key, newExchange);
389            }
390        }
391    
392        protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
393            if (aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy) {
394                LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
395                          new Object[]{aggregationStrategy, oldExchange, newExchange});
396                ((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange, newExchange);
397            }
398        }
399    
400        /**
401         * Tests whether the given exchange is complete or not
402         *
403         * @param key      the correlation key
404         * @param exchange the incoming exchange
405         * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
406         */
407        protected String isCompleted(String key, Exchange exchange) {
408            // batch consumer completion must always run first
409            if (isCompletionFromBatchConsumer()) {
410                batchConsumerCorrelationKeys.add(key);
411                batchConsumerCounter.incrementAndGet();
412                int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
413                if (size > 0 && batchConsumerCounter.intValue() >= size) {
414                    // batch consumer is complete then reset the counter
415                    batchConsumerCounter.set(0);
416                    return "consumer";
417                }
418            }
419    
420            if (getCompletionPredicate() != null) {
421                boolean answer = getCompletionPredicate().matches(exchange);
422                if (answer) {
423                    return "predicate";
424                }
425            }
426    
427            boolean sizeChecked = false;
428            if (getCompletionSizeExpression() != null) {
429                Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class);
430                if (value != null && value > 0) {
431                    // mark as already checked size as expression takes precedence over static configured
432                    sizeChecked = true;
433                    int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
434                    if (size >= value) {
435                        return "size";
436                    }
437                }
438            }
439            if (!sizeChecked && getCompletionSize() > 0) {
440                int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
441                if (size >= getCompletionSize()) {
442                    return "size";
443                }
444            }
445    
446            // timeout can be either evaluated based on an expression or from a fixed value
447            // expression takes precedence
448            boolean timeoutSet = false;
449            if (getCompletionTimeoutExpression() != null) {
450                Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class);
451                if (value != null && value > 0) {
452                    if (LOG.isTraceEnabled()) {
453                        LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
454                                new Object[]{key, value, exchange});
455                    }
456                    addExchangeToTimeoutMap(key, exchange, value);
457                    timeoutSet = true;
458                }
459            }
460            if (!timeoutSet && getCompletionTimeout() > 0) {
461                // timeout is used so use the timeout map to keep an eye on this
462                if (LOG.isTraceEnabled()) {
463                    LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
464                            new Object[]{key, getCompletionTimeout(), exchange});
465                }
466                addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
467            }
468    
469            // not complete
470            return null;
471        }
472    
473        protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
474            return aggregationStrategy.aggregate(oldExchange, newExchange);
475        }
476    
477        protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
478            // store the correlation key as property before we remove so the repository has that information
479            if (original != null) {
480                original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
481            }
482            aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
483    
484            // only remove if we have previous added (as we could potentially complete with only 1 exchange)
485            // (if we have previous added then we have that as the original exchange)
486            if (original != null) {
487                // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's
488                aggregationRepository.remove(aggregated.getContext(), key, original);
489            }
490    
491            if (!fromTimeout && timeoutMap != null) {
492                // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
493                timeoutMap.remove(key);
494            }
495    
496            // this key has been closed so add it to the closed map
497            if (closedCorrelationKeys != null) {
498                closedCorrelationKeys.put(key, key);
499            }
500    
501            if (fromTimeout) {
502                // invoke timeout if its timeout aware aggregation strategy,
503                // to allow any custom processing before discarding the exchange
504                if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
505                    long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
506                    ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(aggregated, -1, -1, timeout);
507                }
508            }
509    
510            Exchange answer;
511            if (fromTimeout && isDiscardOnCompletionTimeout()) {
512                // discard due timeout
513                LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
514                // must confirm the discarded exchange
515                aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
516                // and remove redelivery state as well
517                redeliveryState.remove(aggregated.getExchangeId());
518                // the completion was from timeout and we should just discard it
519                answer = null;
520            } else {
521                // the aggregated exchange should be published (sent out)
522                answer = aggregated;
523            }
524    
525            return answer;
526        }
527    
528        private void onSubmitCompletion(final String key, final Exchange exchange) {
529            LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange);
530    
531            // add this as in progress before we submit the task
532            inProgressCompleteExchanges.add(exchange.getExchangeId());
533    
534            // invoke the on completion callback
535            if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
536                ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
537            }
538    
539            // send this exchange
540            executorService.submit(new Runnable() {
541                public void run() {
542                    LOG.debug("Processing aggregated exchange: {}", exchange);
543    
544                    // add on completion task so we remember to update the inProgressCompleteExchanges
545                    exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId()));
546    
547                    try {
548                        processor.process(exchange);
549                    } catch (Throwable e) {
550                        exchange.setException(e);
551                    }
552    
553                    // log exception if there was a problem
554                    if (exchange.getException() != null) {
555                        // if there was an exception then let the exception handler handle it
556                        getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
557                    } else {
558                        LOG.trace("Processing aggregated exchange: {} complete.", exchange);
559                    }
560                }
561            });
562        }
563    
564        /**
565         * Restores the timeout map with timeout values from the aggregation repository.
566         * <p/>
567         * This is needed in case the aggregator has been stopped and started again (for example a server restart).
568         * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored.
569         */
570        protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
571            // grab the timeout value for each partly aggregated exchange
572            Set<String> keys = aggregationRepository.getKeys();
573            if (keys == null || keys.isEmpty()) {
574                return;
575            }
576    
577            StopWatch watch = new StopWatch();
578            LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size());
579    
580            for (String key : keys) {
581                Exchange exchange = aggregationRepository.get(camelContext, key);
582                // grab the timeout value
583                long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
584                if (timeout > 0) {
585                    LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
586                    addExchangeToTimeoutMap(key, exchange, timeout);
587                }
588            }
589    
590            // log duration of this task so end user can see how long it takes to pre-check this upon starting
591            LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
592                    timeoutMap.size(), TimeUtils.printDuration(watch.stop()));
593        }
594    
595        /**
596         * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
597         *
598         * @param key      the correlation key
599         * @param exchange the exchange
600         * @param timeout  the timeout value in millis
601         */
602        private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
603            // store the timeout value on the exchange as well, in case we need it later
604            exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
605            timeoutMap.put(key, exchange.getExchangeId(), timeout);
606        }
607    
608        public Predicate getCompletionPredicate() {
609            return completionPredicate;
610        }
611    
612        public void setCompletionPredicate(Predicate completionPredicate) {
613            this.completionPredicate = completionPredicate;
614        }
615    
616        public boolean isEagerCheckCompletion() {
617            return eagerCheckCompletion;
618        }
619    
620        public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
621            this.eagerCheckCompletion = eagerCheckCompletion;
622        }
623    
624        public long getCompletionTimeout() {
625            return completionTimeout;
626        }
627    
628        public void setCompletionTimeout(long completionTimeout) {
629            this.completionTimeout = completionTimeout;
630        }
631    
632        public Expression getCompletionTimeoutExpression() {
633            return completionTimeoutExpression;
634        }
635    
636        public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
637            this.completionTimeoutExpression = completionTimeoutExpression;
638        }
639    
640        public long getCompletionInterval() {
641            return completionInterval;
642        }
643    
644        public void setCompletionInterval(long completionInterval) {
645            this.completionInterval = completionInterval;
646        }
647    
648        public int getCompletionSize() {
649            return completionSize;
650        }
651    
652        public void setCompletionSize(int completionSize) {
653            this.completionSize = completionSize;
654        }
655    
656        public Expression getCompletionSizeExpression() {
657            return completionSizeExpression;
658        }
659    
660        public void setCompletionSizeExpression(Expression completionSizeExpression) {
661            this.completionSizeExpression = completionSizeExpression;
662        }
663    
664        public boolean isIgnoreInvalidCorrelationKeys() {
665            return ignoreInvalidCorrelationKeys;
666        }
667    
668        public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) {
669            this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
670        }
671    
672        public Integer getCloseCorrelationKeyOnCompletion() {
673            return closeCorrelationKeyOnCompletion;
674        }
675    
676        public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
677            this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
678        }
679    
680        public boolean isCompletionFromBatchConsumer() {
681            return completionFromBatchConsumer;
682        }
683    
684        public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
685            this.completionFromBatchConsumer = completionFromBatchConsumer;
686        }
687    
688        public ExceptionHandler getExceptionHandler() {
689            return exceptionHandler;
690        }
691    
692        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
693            this.exceptionHandler = exceptionHandler;
694        }
695    
696        public boolean isParallelProcessing() {
697            return parallelProcessing;
698        }
699    
700        public void setParallelProcessing(boolean parallelProcessing) {
701            this.parallelProcessing = parallelProcessing;
702        }
703    
704        public boolean isOptimisticLocking() {
705            return optimisticLocking;
706        }
707    
708        public void setOptimisticLocking(boolean optimisticLocking) {
709            this.optimisticLocking = optimisticLocking;
710        }
711    
712        public AggregationRepository getAggregationRepository() {
713            return aggregationRepository;
714        }
715    
716        public void setAggregationRepository(AggregationRepository aggregationRepository) {
717            this.aggregationRepository = aggregationRepository;
718        }
719    
720        public boolean isDiscardOnCompletionTimeout() {
721            return discardOnCompletionTimeout;
722        }
723    
724        public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
725            this.discardOnCompletionTimeout = discardOnCompletionTimeout;
726        }
727    
728        public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
729            this.forceCompletionOnStop = forceCompletionOnStop;
730        }
731    
732        public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
733            this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
734        }
735    
736        public ScheduledExecutorService getTimeoutCheckerExecutorService() {
737            return timeoutCheckerExecutorService;
738        }
739    
740        public boolean isShutdownTimeoutCheckerExecutorService() {
741            return shutdownTimeoutCheckerExecutorService;
742        }
743    
744        public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
745            this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
746        }
747    
748        public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
749            this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
750        }
751    
752        public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
753            return optimisticLockRetryPolicy;
754        }
755    
756        public AggregationStrategy getAggregationStrategy() {
757            return aggregationStrategy;
758        }
759    
760        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
761            this.aggregationStrategy = aggregationStrategy;
762        }
763    
764        public Expression getCorrelationExpression() {
765            return correlationExpression;
766        }
767    
768        public void setCorrelationExpression(Expression correlationExpression) {
769            this.correlationExpression = correlationExpression;
770        }
771    
772        /**
773         * On completion task which keeps the booking of the in progress up to date
774         */
775        private final class AggregateOnCompletion implements Synchronization {
776            private final String exchangeId;
777    
778            private AggregateOnCompletion(String exchangeId) {
779                // must use the original exchange id as it could potentially change if send over SEDA etc.
780                this.exchangeId = exchangeId;
781            }
782    
783            public void onFailure(Exchange exchange) {
784                LOG.trace("Aggregated exchange onFailure: {}", exchange);
785    
786                // must remember to remove in progress when we failed
787                inProgressCompleteExchanges.remove(exchangeId);
788                // do not remove redelivery state as we need it when we redeliver again later
789            }
790    
791            public void onComplete(Exchange exchange) {
792                LOG.trace("Aggregated exchange onComplete: {}", exchange);
793    
794                // only confirm if we processed without a problem
795                try {
796                    aggregationRepository.confirm(exchange.getContext(), exchangeId);
797                    // and remove redelivery state as well
798                    redeliveryState.remove(exchangeId);
799                } finally {
800                    // must remember to remove in progress when we are complete
801                    inProgressCompleteExchanges.remove(exchangeId);
802                }
803            }
804    
805            @Override
806            public String toString() {
807                return "AggregateOnCompletion";
808            }
809        }
810    
811        /**
812         * Background task that looks for aggregated exchanges which is triggered by completion timeouts.
813         */
814        private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> {
815    
816            private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
817                // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead
818                super(executor, requestMapPollTimeMillis, optimisticLocking);
819            }
820    
821            @Override
822            public void purge() {
823                // must acquire the shared aggregation lock to be able to purge
824                if (!optimisticLocking) { lock.lock(); }
825                try {
826                    super.purge();
827                } finally {
828                    if (!optimisticLocking) { lock.unlock(); }
829                }
830            }
831    
832            @Override
833            public boolean onEviction(String key, String exchangeId) {
834                log.debug("Completion timeout triggered for correlation key: {}", key);
835    
836                boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
837                if (inProgress) {
838                    LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
839                    return true;
840                }
841    
842                // get the aggregated exchange
843                boolean evictionStolen = false;
844                Exchange answer = aggregationRepository.get(camelContext, key);
845                if (answer == null) {
846                    evictionStolen = true;
847                } else {
848                    // indicate it was completed by timeout
849                    answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
850                    try {
851                        answer = onCompletion(key, answer, answer, true);
852                        if (answer != null) {
853                            onSubmitCompletion(key, answer);
854                        }
855                    } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
856                        evictionStolen = true;
857                    }
858                }
859    
860                if (optimisticLocking && evictionStolen) {
861                    LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction "
862                              + "for exchange with id: {} and correlation id: {}", exchangeId, key);
863                }
864                return true;
865            }
866        }
867    
868        /**
869         * Background task that triggers completion based on interval.
870         */
871        private final class AggregationIntervalTask implements Runnable {
872    
873            public void run() {
874                // only run if CamelContext has been fully started
875                if (!camelContext.getStatus().isStarted()) {
876                    LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName());
877                    return;
878                }
879    
880                LOG.trace("Starting completion interval task");
881    
882                // trigger completion for all in the repository
883                Set<String> keys = aggregationRepository.getKeys();
884    
885                if (keys != null && !keys.isEmpty()) {
886                    // must acquire the shared aggregation lock to be able to trigger interval completion
887                    if (!optimisticLocking) { lock.lock(); }
888                    try {
889                        for (String key : keys) {
890                            boolean stolenInterval = false;
891                            Exchange exchange = aggregationRepository.get(camelContext, key);
892                            if (exchange == null) {
893                                stolenInterval = true;
894                            } else {
895                                LOG.trace("Completion interval triggered for correlation key: {}", key);
896                                // indicate it was completed by interval
897                                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
898                                try {
899                                    Exchange answer = onCompletion(key, exchange, exchange, false);
900                                    if (answer != null) {
901                                        onSubmitCompletion(key, answer);
902                                    }
903                                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
904                                    stolenInterval = true;
905                                }
906                            }
907                            if (optimisticLocking && stolenInterval) {
908                                LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key);
909                            }
910                        }
911                    } finally {
912                        if (!optimisticLocking) { lock.unlock(); }
913                    }
914                }
915    
916                LOG.trace("Completion interval task complete");
917            }
918        }
919    
920        /**
921         * Background task that looks for aggregated exchanges to recover.
922         */
923        private final class RecoverTask implements Runnable {
924            private final RecoverableAggregationRepository recoverable;
925    
926            private RecoverTask(RecoverableAggregationRepository recoverable) {
927                this.recoverable = recoverable;
928            }
929    
930            public void run() {
931                // only run if CamelContext has been fully started
932                if (!camelContext.getStatus().isStarted()) {
933                    LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
934                    return;
935                }
936    
937                LOG.trace("Starting recover check");
938    
939                // copy the current in progress before doing scan
940                final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges);
941    
942                Set<String> exchangeIds = recoverable.scan(camelContext);
943                for (String exchangeId : exchangeIds) {
944    
945                    // we may shutdown while doing recovery
946                    if (!isRunAllowed()) {
947                        LOG.info("We are shutting down so stop recovering");
948                        return;
949                    }
950    
951                    // consider in progress if it was in progress before we did the scan, or currently after we did the scan
952                    // its safer to consider it in progress than risk duplicates due both in progress + recovered
953                    boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
954                    if (inProgress) {
955                        LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
956                    } else {
957                        LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
958                        Exchange exchange = recoverable.recover(camelContext, exchangeId);
959                        if (exchange != null) {
960                            // get the correlation key
961                            String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
962                            // and mark it as redelivered
963                            exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
964    
965                            // get the current redelivery data
966                            RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
967    
968                            // if we are exhausted, then move to dead letter channel
969                            if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
970                                LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
971                                        + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
972    
973                                // send to DLC
974                                try {
975                                    // set redelivery counter
976                                    exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
977                                    exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
978                                    deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
979                                } catch (Throwable e) {
980                                    exchange.setException(e);
981                                }
982    
983                                // handle if failed
984                                if (exchange.getException() != null) {
985                                    getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
986                                } else {
987                                    // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
988                                    recoverable.confirm(camelContext, exchangeId);
989                                }
990                            } else {
991                                // update current redelivery state
992                                if (data == null) {
993                                    // create new data
994                                    data = new RedeliveryData();
995                                    redeliveryState.put(exchange.getExchangeId(), data);
996                                }
997                                data.redeliveryCounter++;
998    
999                                // set redelivery counter
1000                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
1001                                if (recoverable.getMaximumRedeliveries() > 0) {
1002                                    exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
1003                                }
1004    
1005                                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
1006    
1007                                // not exhaust so resubmit the recovered exchange
1008                                onSubmitCompletion(key, exchange);
1009                            }
1010                        }
1011                    }
1012                }
1013    
1014                LOG.trace("Recover check complete");
1015            }
1016        }
1017    
1018        @Override
1019        protected void doStart() throws Exception {
1020            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
1021                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
1022                    && getCompletionSizeExpression() == null) {
1023                throw new IllegalStateException("At least one of the completions options"
1024                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
1025            }
1026    
1027            if (getCloseCorrelationKeyOnCompletion() != null) {
1028                if (getCloseCorrelationKeyOnCompletion() > 0) {
1029                    LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
1030                    closedCorrelationKeys = new LRUCache<String, String>(getCloseCorrelationKeyOnCompletion());
1031                } else {
1032                    LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
1033                    closedCorrelationKeys = new ConcurrentHashMap<String, String>();
1034                }
1035            }
1036    
1037            if (aggregationRepository == null) {
1038                aggregationRepository = new MemoryAggregationRepository(optimisticLocking);
1039                LOG.info("Defaulting to MemoryAggregationRepository");
1040            }
1041    
1042            if (optimisticLocking) {
1043                if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) {
1044                    throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository");
1045                }
1046                LOG.info("Optimistic locking is enabled");
1047            }
1048    
1049            ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository);
1050    
1051            // should we use recover checker
1052            if (aggregationRepository instanceof RecoverableAggregationRepository) {
1053                RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository;
1054                if (recoverable.isUseRecovery()) {
1055                    long interval = recoverable.getRecoveryIntervalInMillis();
1056                    if (interval <= 0) {
1057                        throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval);
1058                    }
1059    
1060                    // create a background recover thread to check every interval
1061                    recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
1062                    Runnable recoverTask = new RecoverTask(recoverable);
1063                    LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis.");
1064                    // use fixed delay so there is X interval between each run
1065                    recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS);
1066    
1067                    if (recoverable.getDeadLetterUri() != null) {
1068                        int max = recoverable.getMaximumRedeliveries();
1069                        if (max <= 0) {
1070                            throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max);
1071                        }
1072                        LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri());
1073    
1074                        // dead letter uri must be a valid endpoint
1075                        Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri());
1076                        if (endpoint == null) {
1077                            throw new NoSuchEndpointException(recoverable.getDeadLetterUri());
1078                        }
1079                        deadLetterProducerTemplate = camelContext.createProducerTemplate();
1080                    }
1081                }
1082            }
1083    
1084            if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
1085                throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
1086            }
1087            if (getCompletionInterval() > 0) {
1088                LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
1089                if (getTimeoutCheckerExecutorService() == null) {
1090                    setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1091                    shutdownTimeoutCheckerExecutorService = true;
1092                }
1093                // trigger completion based on interval
1094                getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
1095            }
1096    
1097            // start timeout service if its in use
1098            if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
1099                LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
1100                if (getTimeoutCheckerExecutorService() == null) {
1101                    setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1102                    shutdownTimeoutCheckerExecutorService = true;
1103                }
1104                // check for timed out aggregated messages once every second
1105                timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
1106                // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
1107                // need to re-establish the timeout map so timeout can trigger
1108                restoreTimeoutMapFromAggregationRepository();
1109                ServiceHelper.startService(timeoutMap);
1110            }
1111        }
1112    
1113        @Override
1114        protected void doStop() throws Exception {
1115            // note: we cannot do doForceCompletionOnStop from this doStop method
1116            // as this is handled in the prepareShutdown method which is also invoked when stopping a route
1117            // and is better suited for preparing to shutdown than this doStop method is
1118    
1119            if (recoverService != null) {
1120                camelContext.getExecutorServiceManager().shutdown(recoverService);
1121            }
1122            ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);
1123    
1124            if (closedCorrelationKeys != null) {
1125                // it may be a service so stop it as well
1126                ServiceHelper.stopService(closedCorrelationKeys);
1127                closedCorrelationKeys.clear();
1128            }
1129            batchConsumerCorrelationKeys.clear();
1130            redeliveryState.clear();
1131        }
1132    
1133        @Override
1134        public void prepareShutdown(boolean forced) {
1135            // we are shutting down, so force completion if this option was enabled
1136            // but only do this when forced=false, as that is when we have chance to
1137            // send out new messages to be routed by Camel. When forced=true, then
1138            // we have to shutdown in a hurry
1139            if (!forced && forceCompletionOnStop) {
1140                doForceCompletionOnStop();
1141            }
1142        }
1143    
1144        private void doForceCompletionOnStop() {
1145            int expected = forceCompletionOfAllGroups();
1146    
1147            StopWatch watch = new StopWatch();
1148            while (inProgressCompleteExchanges.size() > 0) {
1149                LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size());
1150                try {
1151                    Thread.sleep(100);
1152                } catch (InterruptedException e) {
1153                    // break out as we got interrupted such as the JVM terminating
1154                    LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size());
1155                    break;
1156                }
1157            }
1158    
1159            if (expected > 0) {
1160                LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop()));
1161            }
1162        }
1163    
1164        @Override
1165        protected void doShutdown() throws Exception {
1166            // shutdown aggregation repository and the strategy
1167            ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy);
1168    
1169            // cleanup when shutting down
1170            inProgressCompleteExchanges.clear();
1171    
1172            if (shutdownExecutorService) {
1173                camelContext.getExecutorServiceManager().shutdownNow(executorService);
1174            }
1175            if (shutdownTimeoutCheckerExecutorService) {
1176                camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
1177                timeoutCheckerExecutorService = null;
1178            }
1179    
1180            super.doShutdown();
1181        }
1182    
1183        public int forceCompletionOfAllGroups() {
1184    
1185            // only run if CamelContext has been fully started or is stopping
1186            boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
1187            if (!allow) {
1188                LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName());
1189                return 0;
1190            }
1191    
1192            LOG.trace("Starting force completion of all groups task");
1193    
1194            // trigger completion for all in the repository
1195            Set<String> keys = aggregationRepository.getKeys();
1196    
1197            int total = 0;
1198            if (keys != null && !keys.isEmpty()) {
1199                // must acquire the shared aggregation lock to be able to trigger force completion
1200                if (!optimisticLocking) { lock.lock(); }
1201                total = keys.size();
1202                try {
1203                    for (String key : keys) {
1204                        Exchange exchange = aggregationRepository.get(camelContext, key);
1205                        if (exchange != null) {
1206                            LOG.trace("Force completion triggered for correlation key: {}", key);
1207                            // indicate it was completed by a force completion request
1208                            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
1209                            Exchange answer = onCompletion(key, exchange, exchange, false);
1210                            if (answer != null) {
1211                                onSubmitCompletion(key, answer);
1212                            }
1213                        }
1214                    }
1215                } finally {
1216                    if (!optimisticLocking) { lock.unlock(); }
1217                }
1218            }
1219            LOG.trace("Completed force completion of all groups task");
1220    
1221            if (total > 0) {
1222                LOG.debug("Forcing completion of all groups with {} exchanges", total);
1223            }
1224            return total;
1225        }
1226    
1227    }