001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.io.Closeable;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.CompletionService;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.ConcurrentMap;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.ExecutionException;
031    import java.util.concurrent.ExecutorCompletionService;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicBoolean;
036    import java.util.concurrent.atomic.AtomicInteger;
037    
038    import org.apache.camel.AsyncCallback;
039    import org.apache.camel.AsyncProcessor;
040    import org.apache.camel.CamelContext;
041    import org.apache.camel.CamelExchangeException;
042    import org.apache.camel.Endpoint;
043    import org.apache.camel.ErrorHandlerFactory;
044    import org.apache.camel.Exchange;
045    import org.apache.camel.Navigate;
046    import org.apache.camel.Processor;
047    import org.apache.camel.Producer;
048    import org.apache.camel.Traceable;
049    import org.apache.camel.processor.aggregate.AggregationStrategy;
050    import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
051    import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
052    import org.apache.camel.spi.RouteContext;
053    import org.apache.camel.spi.TracedRouteNodes;
054    import org.apache.camel.spi.UnitOfWork;
055    import org.apache.camel.support.ServiceSupport;
056    import org.apache.camel.util.AsyncProcessorConverterHelper;
057    import org.apache.camel.util.AsyncProcessorHelper;
058    import org.apache.camel.util.CastUtils;
059    import org.apache.camel.util.EventHelper;
060    import org.apache.camel.util.ExchangeHelper;
061    import org.apache.camel.util.IOHelper;
062    import org.apache.camel.util.KeyValueHolder;
063    import org.apache.camel.util.ObjectHelper;
064    import org.apache.camel.util.ServiceHelper;
065    import org.apache.camel.util.StopWatch;
066    import org.apache.camel.util.concurrent.AtomicException;
067    import org.apache.camel.util.concurrent.AtomicExchange;
068    import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    
072    import static org.apache.camel.util.ObjectHelper.notNull;
073    
074    
075    /**
076     * Implements the Multicast pattern to send a message exchange to a number of
077     * endpoints, each endpoint receiving a copy of the message exchange.
078     *
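        * A typical usage from a Camel route is sketched below. This is illustrative only;
        * the endpoint URIs and the chosen aggregation strategy are placeholders, not
        * requirements of this processor:
        *
        * <pre>
        * from("direct:start")
        *     .multicast(new UseLatestAggregationStrategy())
        *         .parallelProcessing().timeout(5000)
        *         .to("direct:a", "direct:b", "direct:c")
        *     .end()
        *     .to("mock:result");
        * </pre>
        *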
079     * @version 
080     * @see Pipeline
081     */
082    public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
083    
084        private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);
085    
086        /**
087         * Class that represents each step in the multicast route to process
088         */
089        static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
090            private final int index;
091            private final Processor processor;
092            private final Processor prepared;
093            private final Exchange exchange;
094    
095            private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
096                this.index = index;
097                this.processor = processor;
098                this.prepared = prepared;
099                this.exchange = exchange;
100            }
101    
102            public int getIndex() {
103                return index;
104            }
105    
106            public Exchange getExchange() {
107                return exchange;
108            }
109    
110            public Producer getProducer() {
111                if (processor instanceof Producer) {
112                    return (Producer) processor;
113                }
114                return null;
115            }
116    
117            public Processor getProcessor() {
118                return prepared;
119            }
120    
121            public void begin() {
122                // noop
123            }
124    
125            public void done() {
126                // noop
127            }
128    
129        }
130    
131        /**
132         * Class that represents prepared fine-grained error handlers when processing multicast/split exchanges
133         * <p/>
134         * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
135         */
136        static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {
137    
138            public PreparedErrorHandler(RouteContext key, Processor value) {
139                super(key, value);
140            }
141    
142        }
143    
144        protected final Processor onPrepare;
145        private final CamelContext camelContext;
146        private Collection<Processor> processors;
147        private final AggregationStrategy aggregationStrategy;
148        private final boolean parallelProcessing;
149        private final boolean streaming;
150        private final boolean stopOnException;
151        private final ExecutorService executorService;
152        private final boolean shutdownExecutorService;
153        private ExecutorService aggregateExecutorService;
154        private final long timeout;
155        private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
156        private final boolean shareUnitOfWork;
157    
158        public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
159            this(camelContext, processors, null);
160        }
161    
162        public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
163            this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false);
164        }
165    
166        public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
167                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
168                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
169            notNull(camelContext, "camelContext");
170            this.camelContext = camelContext;
171            this.processors = processors;
172            this.aggregationStrategy = aggregationStrategy;
173            this.executorService = executorService;
174            this.shutdownExecutorService = shutdownExecutorService;
175            this.streaming = streaming;
176            this.stopOnException = stopOnException;
177            // must enable parallel if executor service is provided
178            this.parallelProcessing = parallelProcessing || executorService != null;
179            this.timeout = timeout;
180            this.onPrepare = onPrepare;
181            this.shareUnitOfWork = shareUnitOfWork;
182        }
183    
184        @Override
185        public String toString() {
186            return "Multicast[" + getProcessors() + "]";
187        }
188    
189        public String getTraceLabel() {
190            return "multicast";
191        }
192    
193        public CamelContext getCamelContext() {
194            return camelContext;
195        }
196    
197        public void process(Exchange exchange) throws Exception {
198            AsyncProcessorHelper.process(this, exchange);
199        }
200    
201        public boolean process(Exchange exchange, AsyncCallback callback) {
202            final AtomicExchange result = new AtomicExchange();
203            Iterable<ProcessorExchangePair> pairs = null;
204    
205            try {
206                boolean sync = true;
207    
208                pairs = createProcessorExchangePairs(exchange);
209    
210                if (isParallelProcessing()) {
211                    // ensure an executor is set when running in parallel
212                    ObjectHelper.notNull(executorService, "executorService", this);
213                    doProcessParallel(exchange, result, pairs, isStreaming(), callback);
214                } else {
215                    sync = doProcessSequential(exchange, result, pairs, callback);
216                }
217    
218                if (!sync) {
219                    // the remainder of the multicast will be completed async
220                        // so we break out now; the callback will then be invoked and continue routing from where we left off
221                    return false;
222                }
223            } catch (Throwable e) {
224                exchange.setException(e);
225                // an unexpected exception was thrown, maybe from the iterator etc., so do not regard it as exhausted
226                // and do the done work
227                doDone(exchange, null, pairs, callback, true, false);
228                return true;
229            }
230    
231            // multicasting was processed successfully
232            // and do the done work
233            Exchange subExchange = result.get();
234            doDone(exchange, subExchange, pairs, callback, true, true);
235            return true;
236        }
237    
238        protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
239                                         final boolean streaming, final AsyncCallback callback) throws Exception {
240    
241            ObjectHelper.notNull(executorService, "ExecutorService", this);
242            ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);
243    
244            final CompletionService<Exchange> completion;
245            if (streaming) {
246                // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
247                completion = new ExecutorCompletionService<Exchange>(executorService);
248            } else {
249                // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
250                completion = new SubmitOrderedCompletionService<Exchange>(executorService);
251            }
252    
253            final AtomicInteger total = new AtomicInteger(0);
254            final Iterator<ProcessorExchangePair> it = pairs.iterator();
255    
256            if (it.hasNext()) {
257                // when parallel then aggregate on the fly
258                final AtomicBoolean running = new AtomicBoolean(true);
259                final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
260                final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
261                final AtomicException executionException = new AtomicException();
262    
263                // issue task to execute in separate thread so it can aggregate on-the-fly
264                // while we submit new tasks, and those tasks complete concurrently
265                // this allows us to optimize work and reduce memory consumption
266                final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
267                        aggregationOnTheFlyDone, allTasksSubmitted, executionException);
268                final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
269    
270                LOG.trace("Starting to submit parallel tasks");
271    
272                while (it.hasNext()) {
273                    final ProcessorExchangePair pair = it.next();
274                    final Exchange subExchange = pair.getExchange();
275                    updateNewExchange(subExchange, total.intValue(), pairs, it);
276    
277                    completion.submit(new Callable<Exchange>() {
278                        public Exchange call() throws Exception {
279                                // only start the aggregation task when a task is being executed, to avoid starting
280                                // the aggregation task too early and piling up too many threads
281                            if (aggregationTaskSubmitted.compareAndSet(false, true)) {
282                                // but only submit the task once
283                                aggregateExecutorService.submit(aggregateOnTheFlyTask);
284                            }
285    
286                            if (!running.get()) {
287                                // do not start processing the task if we are not running
288                                return subExchange;
289                            }
290    
291                            try {
292                                doProcessParallel(pair);
293                            } catch (Throwable e) {
294                                subExchange.setException(e);
295                            }
296    
297                            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
298                            Integer number = getExchangeIndex(subExchange);
299                            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
300                            if (stopOnException && !continueProcessing) {
301                                // signal to stop running
302                                running.set(false);
303                                // throw caused exception
304                                if (subExchange.getException() != null) {
305                                    // wrap in exception to explain where it failed
306                                    CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
307                                    subExchange.setException(cause);
308                                }
309                            }
310    
311                            LOG.trace("Parallel processing complete for exchange: {}", subExchange);
312                            return subExchange;
313                        }
314                    });
315    
316                    total.incrementAndGet();
317                }
318    
319                // signal all tasks have been submitted
320                LOG.trace("Signaling that all {} tasks have been submitted.", total.get());
321                allTasksSubmitted.set(true);
322    
323                // it is too hard to do parallel async routing, so we let the caller thread be synchronous
324                // and have it pick up the replies and do the aggregation (we use a latch to wait)
325                // wait for aggregation to be done
326                LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
327                aggregationOnTheFlyDone.await();
328    
329                // did we fail for whatever reason, if so throw that caused exception
330                if (executionException.get() != null) {
331                    if (LOG.isDebugEnabled()) {
332                        LOG.debug("Parallel processing failed due to {}", executionException.get().getMessage());
333                    }
334                    throw executionException.get();
335                }
336            }
337    
338            // now everything is okay so we are done
339            LOG.debug("Done parallel processing {} exchanges", total);
340        }
341    
342        /**
343         * Task to aggregate on-the-fly for completed tasks when using parallel processing.
344         * <p/>
345         * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
346         * before we perform aggregation. Instead this separate thread runs and aggregates as new
347         * tasks complete.
348         * <p/>
349         * The logic is fairly complex as this implementation has to keep track of how far it got, and also
350         * signal back to the <i>main</i> thread when it is done, so the <i>main</i> thread can continue
351         * processing when the entire splitting is done.
352         */
353        private final class AggregateOnTheFlyTask implements Runnable {
354    
355            private final AtomicExchange result;
356            private final Exchange original;
357            private final AtomicInteger total;
358            private final CompletionService<Exchange> completion;
359            private final AtomicBoolean running;
360            private final CountDownLatch aggregationOnTheFlyDone;
361            private final AtomicBoolean allTasksSubmitted;
362            private final AtomicException executionException;
363    
364            private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total,
365                                          CompletionService<Exchange> completion, AtomicBoolean running,
366                                          CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
367                                          AtomicException executionException) {
368                this.result = result;
369                this.original = original;
370                this.total = total;
371                this.completion = completion;
372                this.running = running;
373                this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
374                this.allTasksSubmitted = allTasksSubmitted;
375                this.executionException = executionException;
376            }
377    
378            public void run() {
379                LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());
380    
381                try {
382                    aggregateOnTheFly();
383                } catch (Throwable e) {
384                    if (e instanceof Exception) {
385                        executionException.set((Exception) e);
386                    } else {
387                        executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
388                    }
389                } finally {
390                    // must signal we are done so the latch can open and let the other thread continue processing
391                    LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
392                    LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
393                    aggregationOnTheFlyDone.countDown();
394                }
395            }
396    
397            private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
398                boolean timedOut = false;
399                boolean stoppedOnException = false;
400                final StopWatch watch = new StopWatch();
401                int aggregated = 0;
402                boolean done = false;
403                // not a for loop as on-the-fly tasks may still be running
404                while (!done) {
405                    // check if we have already aggregated everything
406                    if (allTasksSubmitted.get() && aggregated >= total.get()) {
407                        LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
408                        break;
409                    }
410    
411                    Future<Exchange> future;
412                    if (timedOut) {
413                        // we have timed out, but try to grab any tasks that have already completed
414                        // poll will return null if no task is present
415                        future = completion.poll();
416                        LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
417                    } else if (timeout > 0) {
418                        long left = timeout - watch.taken();
419                        if (left < 0) {
420                            left = 0;
421                        }
422                        LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
423                        future = completion.poll(left, TimeUnit.MILLISECONDS);
424                    } else {
425                        LOG.trace("Polling completion task #{}", aggregated);
426                        // we must not block so poll every second
427                        future = completion.poll(1, TimeUnit.SECONDS);
428                        if (future == null) {
429                            // and continue loop which will recheck if we are done
430                            continue;
431                        }
432                    }
433    
434                    if (future == null && timedOut) {
435                        // we have timed out and no more tasks completed, so break out
436                        break;
437                    } else if (future == null) {
438                        // timeout occurred
439                        AggregationStrategy strategy = getAggregationStrategy(null);
440                        if (strategy instanceof TimeoutAwareAggregationStrategy) {
441                            // notify the strategy we timed out
442                            Exchange oldExchange = result.get();
443                            if (oldExchange == null) {
444                                // if they all timed out the result may not have been set yet, so use the original exchange
445                                oldExchange = original;
446                            }
447                            ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
448                        } else {
449                            // log a WARN that we timed out, since the task will not be aggregated and the Exchange will be lost
450                            LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
451                        }
452                        LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
453                        timedOut = true;
454    
455                        // mark that index as timed out, which allows us to try to retrieve
456                        // any already completed tasks in the next loop
457                        if (completion instanceof SubmitOrderedCompletionService) {
458                            ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
459                        }
460                    } else {
461                        // there is a result to aggregate
462                        Exchange subExchange = future.get();
463    
464                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
465                        Integer number = getExchangeIndex(subExchange);
466                        boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
467                        if (stopOnException && !continueProcessing) {
468                            // we want to stop on exception and an exception or failure occurred
469                            // this is similar to what the pipeline does, so we should do the same to not surprise end users
470                            // so we should set the failed exchange as the result and break out
471                            result.set(subExchange);
472                            stoppedOnException = true;
473                            break;
474                        }
475    
476                        // we got a result so aggregate it
477                        AggregationStrategy strategy = getAggregationStrategy(subExchange);
478                        doAggregate(strategy, result, subExchange);
479                    }
480    
481                    aggregated++;
482                }
483    
484                if (timedOut || stoppedOnException) {
485                    if (timedOut) {
486                        LOG.debug("Cancelling tasks due to timeout after {} millis.", timeout);
487                    }
488                    if (stoppedOnException) {
489                        LOG.debug("Cancelling tasks due to stopOnException.");
490                    }
491                    // cancel tasks as we timed out or stopped on exception (it is safe to cancel already completed tasks)
492                    running.set(false);
493                }
494            }
495        }
496    
497        protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
498            AtomicInteger total = new AtomicInteger();
499            Iterator<ProcessorExchangePair> it = pairs.iterator();
500    
501            while (it.hasNext()) {
502                ProcessorExchangePair pair = it.next();
503                Exchange subExchange = pair.getExchange();
504                updateNewExchange(subExchange, total.get(), pairs, it);
505    
506                boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
507                if (!sync) {
508                    if (LOG.isTraceEnabled()) {
509                        LOG.trace("Processing exchangeId: {} continues to be processed asynchronously", pair.getExchange().getExchangeId());
510                    }
511                    // the remainder of the multicast will be completed async
512                    // so we break out now; the callback will then be invoked and continue routing from where we left off
513                    return false;
514                }
515    
516                if (LOG.isTraceEnabled()) {
517                    LOG.trace("Processing exchangeId: {} continues to be processed synchronously", pair.getExchange().getExchangeId());
518                }
519    
520                // Decide whether to continue with the multicast or not; similar logic to the Pipeline
521                // remember to test for stop on exception and aggregate before copying back results
522                boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
523                if (stopOnException && !continueProcessing) {
524                    if (subExchange.getException() != null) {
525                        // wrap in exception to explain where it failed
526                        CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
527                        subExchange.setException(cause);
528                    }
529                    // we want to stop on exception, and the exception was handled by the error handler
530                    // this is similar to what the pipeline does, so we should do the same to not surprise end users
531                    // so we should set the failed exchange as the result and be done
532                    result.set(subExchange);
533                    return true;
534                }
535    
536                LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);
537    
538                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
539                total.incrementAndGet();
540            }
541    
542            LOG.debug("Done sequential processing {} exchanges", total);
543    
544            return true;
545        }
546    
547        private boolean doProcessSequential(final Exchange original, final AtomicExchange result,
548                                            final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
549                                            final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
550            boolean sync = true;
551    
552            final Exchange exchange = pair.getExchange();
553            Processor processor = pair.getProcessor();
554            final Producer producer = pair.getProducer();
555    
556            TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
557    
558            // compute time taken if sending to another endpoint
559            final StopWatch watch = producer != null ? new StopWatch() : null;
560    
561            try {
562                // prepare tracing starting from a new block
563                if (traced != null) {
564                    traced.pushBlock();
565                }
566    
567                if (producer != null) {
568                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
569                }
570                // let the prepared processor process it, remember to begin the exchange pair
571                AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
572                pair.begin();
573                sync = async.process(exchange, new AsyncCallback() {
574                    public void done(boolean doneSync) {
575                        // we are done with the exchange pair
576                        pair.done();
577    
578                        // okay we are done, so notify the exchange was sent
579                        if (producer != null) {
580                            long timeTaken = watch.stop();
581                            Endpoint endpoint = producer.getEndpoint();
582                            // emit event that the exchange was sent to the endpoint
583                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
584                        }
585    
586                        // we only have to handle async completion of this multicast
587                        if (doneSync) {
588                            return;
589                        }
590    
591                        // continue processing the multicast asynchronously
592                        Exchange subExchange = exchange;
593    
594                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
595                        // remember to test for stop on exception and aggregate before copying back results
596                        boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
597                        if (stopOnException && !continueProcessing) {
598                            if (subExchange.getException() != null) {
599                                // wrap in exception to explain where it failed
600                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
601                            } else {
602                                // we want to stop on exception, and the exception was handled by the error handler
603                                // this is similar to what the pipeline does, so we should do the same to not surprise end users
604                                // so we should set the failed exchange as the result and be done
605                                result.set(subExchange);
606                            }
607                            // and do the done work
608                            doDone(original, subExchange, pairs, callback, false, true);
609                            return;
610                        }
611    
612                        try {
613                            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
614                        } catch (Throwable e) {
615                            // wrap in exception to explain where it failed
616                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
617                            // and do the done work
618                            doDone(original, subExchange, pairs, callback, false, true);
619                            return;
620                        }
621    
622                        total.incrementAndGet();
623    
624                        // maybe there are more processors to multicast
625                        while (it.hasNext()) {
626    
627                            // prepare and run the next
628                            ProcessorExchangePair pair = it.next();
629                            subExchange = pair.getExchange();
630                            updateNewExchange(subExchange, total.get(), pairs, it);
631                            boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
632    
633                            if (!sync) {
634                                LOG.trace("Processing exchangeId: {} continues to be processed asynchronously", original.getExchangeId());
635                                return;
636                            }
637    
638                            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
639                            // remember to test for stop on exception and aggregate before copying back results
640                            continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
641                            if (stopOnException && !continueProcessing) {
642                                if (subExchange.getException() != null) {
643                                    // wrap in exception to explain where it failed
644                                    subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
645                                } else {
646                                    // we want to stop on exception, and the exception was handled by the error handler
647                                    // this is similar to what the pipeline does, so we should do the same to not surprise end users
648                                    // so we should set the failed exchange as the result and be done
649                                    result.set(subExchange);
650                                }
651                                // and do the done work
652                                doDone(original, subExchange, pairs, callback, false, true);
653                                return;
654                            }
655    
656                            // must catch any exceptions from aggregation
657                            try {
658                                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
659                            } catch (Throwable e) {
660                                // wrap in exception to explain where it failed
661                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
662                                // and do the done work
663                                doDone(original, subExchange, pairs, callback, false, true);
664                                return;
665                            }
666    
667                            total.incrementAndGet();
668                        }
669    
670                        // do the done work
671                        subExchange = result.get();
672                        doDone(original, subExchange, pairs, callback, false, true);
673                    }
674                });
675            } finally {
676                // pop the block so by next round we have the same starting point and thus the tracing looks accurate
677                if (traced != null) {
678                    traced.popBlock();
679                }
680            }
681    
682            return sync;
683        }
684    
685        private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
686            final Exchange exchange = pair.getExchange();
687            Processor processor = pair.getProcessor();
688            Producer producer = pair.getProducer();
689    
690            TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
691    
692            // compute time taken if sending to another endpoint
693            StopWatch watch = null;
694            if (producer != null) {
695                watch = new StopWatch();
696            }
697    
698            try {
699                // prepare tracing starting from a new block
700                if (traced != null) {
701                    traced.pushBlock();
702                }
703    
704                if (producer != null) {
705                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
706                }
707                // let the prepared processor process it, remember to begin the exchange pair
708                AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
709                pair.begin();
710                // we invoke it synchronously as parallel async routing is too hard
711                AsyncProcessorHelper.process(async, exchange);
712            } finally {
713                pair.done();
714                // pop the block so by next round we have the same starting point and thus the tracing looks accurate
715                if (traced != null) {
716                    traced.popBlock();
717                }
718                if (producer != null) {
719                    long timeTaken = watch.stop();
720                    Endpoint endpoint = producer.getEndpoint();
721                    // emit event that the exchange was sent to the endpoint
722                    // this is okay to do here in the finally block, as the processing is not using the async routing engine
723                    // (we invoke it synchronously as parallel async routing is too hard)
724                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
725                }
726            }
727        }
728    
729        /**
730         * Common work which must be done when we are done multicasting.
731         * <p/>
732         * This logic applies both when running synchronously and asynchronously, as there are multiple exit points
733         * when using the asynchronous routing engine, and therefore we want the logic in one method instead
734         * of being scattered.
735         *
736         * @param original     the original exchange
737         * @param subExchange  the current sub exchange, can be <tt>null</tt> for the synchronous part
738         * @param pairs        the pairs with the exchanges to process
739         * @param callback     the callback
740         * @param doneSync     the <tt>doneSync</tt> parameter to call on callback
741         * @param forceExhaust whether or not error handling is exhausted
742         */
743        protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs,
744                              AsyncCallback callback, boolean doneSync, boolean forceExhaust) {
745    
746            // we are done so close the pairs iterator
747            if (pairs instanceof Closeable) {
748                IOHelper.close((Closeable) pairs, "pairs", LOG);
749            }
750    
751            AggregationStrategy strategy = getAggregationStrategy(subExchange);
752            // invoke the on completion callback
753            if (strategy instanceof CompletionAwareAggregationStrategy) {
754                ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange);
755            }
756    
757            // cleanup any per exchange aggregation strategy
758            removeAggregationStrategyFromExchange(original);
759    
760            // we need to know if there was an exception, and if the stopOnException option was enabled
761            // also we need to know if any error handler has attempted redelivery and is exhausted
762            boolean stoppedOnException = false;
763            boolean exception = false;
764            boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange));
765            if (original.getException() != null || subExchange != null && subExchange.getException() != null) {
766                // there was an exception and we stopped
767                stoppedOnException = isStopOnException();
768                exception = true;
769            }
770    
771            // must copy results at this point
772            if (subExchange != null) {
773                if (stoppedOnException) {
774                    // if we stopped due to an exception then only propagate the exception
775                    original.setException(subExchange.getException());
776                } else {
777                    // copy the current result to the original so it will contain the result of this EIP
778                    ExchangeHelper.copyResults(original, subExchange);
779                }
780            }
781    
782            // .. and then if there was an exception we need to configure the redelivery exhaust
783            // for example the noErrorHandler will not cause redelivery exhaust, so if this error
784            // handler has been in use, then the exhaust would be false (if not forced)
785            if (exception) {
786                // multicast uses error handling on its output processors and they have tried to redeliver
787                // so we shall signal back to the other error handlers that we are exhausted and they should not
788                // also try to redeliver as we will then do that twice
789                original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
790            }
791    
792            callback.done(doneSync);
793        }
794    
795        /**
796         * Aggregate the {@link Exchange} with the current result
797         *
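            * As a rough illustration (a sketch, not an API of this class), an
            * {@link AggregationStrategy} that concatenates message bodies could look like:
            *
            * <pre>
            * AggregationStrategy strategy = new AggregationStrategy() {
            *     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            *         if (oldExchange == null) {
            *             // first result, nothing to merge with yet
            *             return newExchange;
            *         }
            *         String merged = oldExchange.getIn().getBody(String.class)
            *                 + "+" + newExchange.getIn().getBody(String.class);
            *         oldExchange.getIn().setBody(merged);
            *         return oldExchange;
            *     }
            * };
            * </pre>
            *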
798         * @param strategy the aggregation strategy to use
799         * @param result   the current result
800         * @param exchange the exchange to be added to the result
801         */
802        protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
803            if (strategy != null) {
804                // prepare the exchanges for aggregation
805                Exchange oldExchange = result.get();
806                ExchangeHelper.prepareAggregation(oldExchange, exchange);
807                result.set(strategy.aggregate(oldExchange, exchange));
808            }
809        }
810    
811        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
812                                         Iterator<ProcessorExchangePair> it) {
813            exchange.setProperty(Exchange.MULTICAST_INDEX, index);
814            if (it.hasNext()) {
815                exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
816            } else {
817                exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
818            }
819        }
820    
821        protected Integer getExchangeIndex(Exchange exchange) {
822            return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
823        }
824    
825        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
826            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
827    
828            int index = 0;
829            for (Processor processor : processors) {
830                // copy exchange, and do not share the unit of work
831                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
832    
833                // if we share unit of work, we need to prepare the child exchange
834                if (isShareUnitOfWork()) {
835                    prepareSharedUnitOfWork(copy, exchange);
836                }
837    
838                // and add the pair
839                RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
840                result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
841            }
842    
843            if (exchange.getException() != null) {
844                // force any exceptions that occurred during creation of the exchange pairs to be thrown
845                // before returning the answer
846                throw exchange.getException();
847            }
848    
849            return result;
850        }
851    
852        /**
853         * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be sent out.
854         * <p/>
855         * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
856         * need to be specially prepared before use.
857         *
858         * @param index        the index
859         * @param processor    the processor
860         * @param exchange     the exchange
861         * @param routeContext the route context
862         * @return prepared for use
863         */
864        protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange,
865                                                                    RouteContext routeContext) {
866            Processor prepared = processor;
867    
868            // set a property with the endpoint we send to
869            setToEndpoint(exchange, prepared);
870    
871            // rework error handling to support fine grained error handling
872            prepared = createErrorHandler(routeContext, exchange, prepared);
873    
874            // invoke onPrepare on the exchange if specified
875            if (onPrepare != null) {
876                try {
877                    onPrepare.process(exchange);
878                } catch (Exception e) {
879                    exchange.setException(e);
880                }
881            }
882            return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
883        }
884    
885        protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
886            Processor answer;
887    
888            boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
889    
890            // do not wrap in error handler if we are inside a try block
891            if (!tryBlock && routeContext != null) {
892                // wrap the producer in an error handler so we have fine-grained error handling on
893                // the output side instead of the input side
894                // this is needed to support redelivery on that output alone, without redelivering
895                // the entire multicast block again, which would start from scratch
896    
897                // create key for cache
898                final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
899    
900                // lookup cached first to reuse and preserve memory
901                answer = errorHandlers.get(key);
902                if (answer != null) {
903                    LOG.trace("Using existing error handler for: {}", processor);
904                    return answer;
905                }
906    
907                LOG.trace("Creating error handler for: {}", processor);
908                ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
909                // create error handler (create the error handler directly to keep it lightweight,
910                // instead of using ProcessorDefinition.wrapInErrorHandler)
911                try {
912                    processor = builder.createErrorHandler(routeContext, processor);
913    
914                    // and wrap in a unit of work processor so the copied exchange can also run under a UoW
915                    answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
916    
917                    boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;
918    
919                    // must start the error handler
920                    ServiceHelper.startServices(answer);
921    
922                    // here we don't cache the child unit of work
923                    if (!child) {
924                        // add to cache
925                        errorHandlers.putIfAbsent(key, answer);
926                    }
927    
928                } catch (Exception e) {
929                    throw ObjectHelper.wrapRuntimeCamelException(e);
930                }
931            } else {
932                // and wrap in a unit of work processor so the copied exchange can also run under a UoW
933                answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
934            }
935    
936            return answer;
937        }
938    
939        /**
940         * Strategy to create the unit of work to be used for the sub route
941         *
942         * @param routeContext the route context
943         * @param processor    the processor
944         * @param exchange     the exchange
945         * @return the unit of work processor
946         */
947        protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
948            String routeId = routeContext != null ? routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()) : null;
949            CamelInternalProcessor internal = new CamelInternalProcessor(processor);
950    
951            // and wrap it in a unit of work so the UoW is on top and the entire route runs in the same UoW
952            UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
953            if (parent != null) {
954                internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeId, parent));
955            } else {
956                internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
957            }
958    
959            // and then in the route context so we can keep track of which route this is at runtime
960            if (routeContext != null) {
961                internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
962            }
963            return internal;
964        }
965    
966        /**
967         * Prepares the exchange for participating in a shared unit of work
968         * <p/>
969         * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates
970         * in a shared unit of work.
971         *
972         * @param childExchange  the child exchange
973         * @param parentExchange the parent exchange
974         */
975        protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) {
976            childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork());
977        }
978    
979        protected void doStart() throws Exception {
980            if (isParallelProcessing() && executorService == null) {
981                throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
982            }
983            if (timeout > 0 && !isParallelProcessing()) {
984                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
985            }
986            if (isParallelProcessing() && aggregateExecutorService == null) {
987                // use an unbounded thread pool to ensure the aggregate on-the-fly task always has a thread assigned
988                // and runs as soon as it is submitted. Otherwise the aggregate task may not be able to run
989                // and signal completion during processing, which would appear as a deadlock or very slow processing
990                String name = getClass().getSimpleName() + "-AggregateTask";
991                aggregateExecutorService = createAggregateExecutorService(name);
992            }
993            ServiceHelper.startServices(aggregationStrategy, processors);
994        }
995    
996        /**
997         * Strategy to create the thread pool for the aggregator background task which waits for and aggregates
998         * completed tasks when running in parallel mode.
999         *
1000         * @param name  the suggested name for the background thread
1001         * @return the thread pool
1002         */
1003        protected synchronized ExecutorService createAggregateExecutorService(String name) {
1004            // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in
1005            return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
1006        }
1007    
1008        @Override
1009        protected void doStop() throws Exception {
1010            ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy);
1011        }
1012    
1013        @Override
1014        protected void doShutdown() throws Exception {
1015            ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy);
1016            // only clear error handlers when shutting down
1017            errorHandlers.clear();
1018    
1019            if (shutdownExecutorService && executorService != null) {
1020                getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
1021            }
1022            if (aggregateExecutorService != null) {
1023                getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
1024            }
1025        }
1026    
1027        protected static void setToEndpoint(Exchange exchange, Processor processor) {
1028            if (processor instanceof Producer) {
1029                Producer producer = (Producer) processor;
1030                exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
1031            }
1032        }
1033    
1034        protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
1035            AggregationStrategy answer = null;
1036    
1037            // prefer to use per Exchange aggregation strategy over a global strategy
1038            if (exchange != null) {
1039                Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1040                Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1041                if (map != null) {
1042                    answer = map.get(this);
1043                }
1044            }
1045            if (answer == null) {
1046                // fallback to global strategy
1047                answer = getAggregationStrategy();
1048            }
1049            return answer;
1050        }
1051    
1052        /**
1053         * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
1054         *
1055         * @param exchange            the exchange
1056         * @param aggregationStrategy the strategy
1057         */
1058        protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
1059            Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1060            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1061            if (map == null) {
1062                map = new ConcurrentHashMap<Object, AggregationStrategy>();
1063            } else {
1064                // it is not safe to use the map directly as the exchange does not do a deep copy of its properties;
1065                // we just create a new copy if we need to change the map
1066                map = new ConcurrentHashMap<Object, AggregationStrategy>(map);
1067            }
1068            // store the strategy using this processor as the key
1069            // (so we can store multiple strategies on the same exchange)
1070            map.put(this, aggregationStrategy);
1071            exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
1072        }
1073    
1074        /**
1075         * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
1076         * which must be done after use.
1077         *
1078         * @param exchange the current exchange
1079         */
1080        protected void removeAggregationStrategyFromExchange(Exchange exchange) {
1081            Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1082            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1083            if (map == null) {
1084                return;
1085            }
1086            // remove the strategy using this processor as the key
1087            map.remove(this);
1088        }
1089    
1090        /**
1091         * Is the multicast processor working in streaming mode?
1092         * <p/>
1093         * In streaming mode:
1094         * <ul>
1095         * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
1096         * <li>for parallel processing, we start aggregating responses as they are sent back to the processor;
1097         * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges (see the sketch below)</li>
1098         * </ul>
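             * <p/>
             * A minimal sketch of enabling both parallel processing and streaming in the Java DSL
             * (the endpoint URIs below are placeholders):
             *
             * <pre>
             * from("direct:start")
             *     .multicast().parallelProcessing().streaming()
             *         .to("direct:a", "direct:b")
             *     .end();
             * </pre>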
1099         */
1100        public boolean isStreaming() {
1101            return streaming;
1102        }
1103    
1104        /**
1105         * Should the multicast processor stop processing further exchanges if an exception occurred?
1106         */
1107        public boolean isStopOnException() {
1108            return stopOnException;
1109        }
1110    
1111        /**
1112         * Returns the processors to multicast to
1113         */
1114        public Collection<Processor> getProcessors() {
1115            return processors;
1116        }
1117    
1118        /**
1119         * An optional timeout in millis when using parallel processing
1120         */
1121        public long getTimeout() {
1122            return timeout;
1123        }
1124    
1125        /**
1126         * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
1127         */
1128        public AggregationStrategy getAggregationStrategy() {
1129            return aggregationStrategy;
1130        }
1131    
1132        public boolean isParallelProcessing() {
1133            return parallelProcessing;
1134        }
1135    
1136        public boolean isShareUnitOfWork() {
1137            return shareUnitOfWork;
1138        }
1139    
1140        public List<Processor> next() {
1141            if (!hasNext()) {
1142                return null;
1143            }
1144            return new ArrayList<Processor>(processors);
1145        }
1146    
1147        public boolean hasNext() {
1148            return processors != null && !processors.isEmpty();
1149        }
1150    }