/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.camel.processor; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.Traceable; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.TracedRouteNodes; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CastUtils; import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AtomicException; import org.apache.camel.util.concurrent.AtomicExchange; import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.notNull; /** * Implements the Multicast pattern to send a message exchange to a number of * endpoints, each endpoint receiving a copy of the message exchange. * * @version * @see Pipeline */ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate, Traceable { private static final transient Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); /** * Class that represent each step in the multicast route to do */ static final class DefaultProcessorExchangePair implements ProcessorExchangePair { private final int index; private final Processor processor; private final Processor prepared; private final Exchange exchange; private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { this.index = index; this.processor = processor; this.prepared = prepared; this.exchange = exchange; } public int getIndex() { return index; } public Exchange getExchange() { return exchange; } public Producer getProducer() { if (processor instanceof Producer) { return (Producer) processor; } return null; } public Processor getProcessor() { return prepared; } public void begin() { // noop } public void done() { // noop } } /** * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges *

* See the createProcessorExchangePair and createErrorHandler methods. */ static final class PreparedErrorHandler extends KeyValueHolder { public PreparedErrorHandler(RouteContext key, Processor value) { super(key, value); } } protected final Processor onPrepare; private final CamelContext camelContext; private Collection processors; private final AggregationStrategy aggregationStrategy; private final boolean parallelProcessing; private final boolean streaming; private final boolean stopOnException; private final ExecutorService executorService; private final boolean shutdownExecutorService; private ExecutorService aggregateExecutorService; private final long timeout; private final ConcurrentMap errorHandlers = new ConcurrentHashMap(); private final boolean shareUnitOfWork; public MulticastProcessor(CamelContext camelContext, Collection processors) { this(camelContext, processors, null); } public MulticastProcessor(CamelContext camelContext, Collection processors, AggregationStrategy aggregationStrategy) { this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false); } public MulticastProcessor(CamelContext camelContext, Collection processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { notNull(camelContext, "camelContext"); this.camelContext = camelContext; this.processors = processors; this.aggregationStrategy = aggregationStrategy; this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; this.streaming = streaming; this.stopOnException = stopOnException; // must enable parallel if executor service is provided this.parallelProcessing = parallelProcessing || executorService != null; this.timeout = timeout; this.onPrepare = onPrepare; this.shareUnitOfWork = shareUnitOfWork; } @Override public String toString() { return "Multicast[" + getProcessors() + "]"; } public String getTraceLabel() { return "multicast"; } public CamelContext getCamelContext() { return camelContext; } public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } public boolean process(Exchange exchange, AsyncCallback callback) { final AtomicExchange result = new AtomicExchange(); final Iterable pairs; try { boolean sync = true; pairs = createProcessorExchangePairs(exchange); if (isParallelProcessing()) { // ensure an executor is set when running in parallel ObjectHelper.notNull(executorService, "executorService", this); doProcessParallel(exchange, result, pairs, isStreaming(), callback); } else { sync = doProcessSequential(exchange, result, pairs, callback); } if (!sync) { // the remainder of the multicast will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } } catch (Throwable e) { exchange.setException(e); // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted // and do the done work doDone(exchange, null, callback, true, false); return true; } // multicasting was processed successfully // and do the done work Exchange subExchange = result.get() != null ? result.get() : null; doDone(exchange, subExchange, callback, true, true); return true; } protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable pairs, final boolean streaming, final AsyncCallback callback) throws Exception { ObjectHelper.notNull(executorService, "ExecutorService", this); ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); final CompletionService completion; if (streaming) { // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) completion = new ExecutorCompletionService(executorService); } else { // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) completion = new SubmitOrderedCompletionService(executorService); } final AtomicInteger total = new AtomicInteger(0); final Iterator it = pairs.iterator(); if (it.hasNext()) { // when parallel then aggregate on the fly final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); final AtomicException executionException = new AtomicException(); // issue task to execute in separate thread so it can aggregate on-the-fly // while we submit new tasks, and those tasks complete concurrently // this allows us to optimize work and reduce memory consumption final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, aggregationOnTheFlyDone, allTasksSubmitted, executionException); final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); LOG.trace("Starting to submit parallel tasks"); while (it.hasNext()) { final ProcessorExchangePair pair = it.next(); final Exchange subExchange = pair.getExchange(); updateNewExchange(subExchange, total.intValue(), pairs, it); completion.submit(new Callable() { public Exchange call() throws Exception { // only start the aggregation task when the task is being executed to avoid staring // the aggregation task to early and pile up too many threads if (aggregationTaskSubmitted.compareAndSet(false, true)) { // but only submit the task once aggregateExecutorService.submit(aggregateOnTheFlyTask); } if (!running.get()) { // do not start processing the task if we are not running return subExchange; } try { doProcessParallel(pair); } catch (Throwable e) { subExchange.setException(e); } // Decide whether to continue with the multicast or not; similar logic to the Pipeline Integer number = getExchangeIndex(subExchange); boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); if (stopOnException && !continueProcessing) { // signal to stop running running.set(false); // throw caused exception if (subExchange.getException() != null) { // wrap in exception to explain where it failed CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); subExchange.setException(cause); } } LOG.trace("Parallel processing complete for exchange: {}", subExchange); return subExchange; } }); total.incrementAndGet(); } // signal all tasks has been submitted LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); allTasksSubmitted.set(true); // its to hard to do parallel async routing so we let the caller thread be synchronously // and have it pickup the replies and do the aggregation (eg we use a latch to wait) // wait for aggregation to be done LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); aggregationOnTheFlyDone.await(); // did we fail for whatever reason, if so throw that caused exception if (executionException.get() != null) { if (LOG.isDebugEnabled()) { LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); } throw executionException.get(); } } // no everything is okay so we are done LOG.debug("Done parallel processing {} exchanges", total); } /** * Task to aggregate on-the-fly for completed tasks when using parallel processing. *

* This ensures lower memory consumption as we do not need to keep all completed tasks in memory * before we perform aggregation. Instead this separate thread will run and aggregate when new * completed tasks is done. *

* The logic is fairly complex as this implementation has to keep track how far it got, and also * signal back to the main thread when its done, so the main thread can continue * processing when the entire splitting is done. */ private final class AggregateOnTheFlyTask implements Runnable { private final AtomicExchange result; private final Exchange original; private final AtomicInteger total; private final CompletionService completion; private final AtomicBoolean running; private final CountDownLatch aggregationOnTheFlyDone; private final AtomicBoolean allTasksSubmitted; private final AtomicException executionException; private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total, CompletionService completion, AtomicBoolean running, CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted, AtomicException executionException) { this.result = result; this.original = original; this.total = total; this.completion = completion; this.running = running; this.aggregationOnTheFlyDone = aggregationOnTheFlyDone; this.allTasksSubmitted = allTasksSubmitted; this.executionException = executionException; } public void run() { LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId()); try { aggregateOnTheFly(); } catch (Throwable e) { if (e instanceof Exception) { executionException.set((Exception) e); } else { executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); } } finally { // must signal we are done so the latch can open and let the other thread continue processing LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); aggregationOnTheFlyDone.countDown(); } } private void aggregateOnTheFly() throws InterruptedException, ExecutionException { boolean timedOut = false; boolean stoppedOnException = false; final StopWatch watch = new StopWatch(); int aggregated = 0; boolean done = false; // not a for loop as on the fly may still run while (!done) { // check if we have already aggregate everything if (allTasksSubmitted.get() && aggregated >= total.get()) { LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); break; } Future future; if (timedOut) { // we are timed out but try to grab if some tasks has been completed // poll will return null if no tasks is present future = completion.poll(); LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future); } else if (timeout > 0) { long left = timeout - watch.taken(); if (left < 0) { left = 0; } LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left); future = completion.poll(left, TimeUnit.MILLISECONDS); } else { LOG.trace("Polling completion task #{}", aggregated); // we must not block so poll every second future = completion.poll(1, TimeUnit.SECONDS); if (future == null) { // and continue loop which will recheck if we are done continue; } } if (future == null && timedOut) { // we are timed out and no more tasks complete so break out break; } else if (future == null) { // timeout occurred AggregationStrategy strategy = getAggregationStrategy(null); if (strategy instanceof TimeoutAwareAggregationStrategy) { // notify the strategy we timed out Exchange oldExchange = result.get(); if (oldExchange == null) { // if they all timed out the result may not have been set yet, so use the original exchange oldExchange = original; } ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout); } else { // log a WARN we timed out since it will not be aggregated and the Exchange will be lost LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated); } LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated); timedOut = true; // mark that index as timed out, which allows us to try to retrieve // any already completed tasks in the next loop if (completion instanceof SubmitOrderedCompletionService) { ((SubmitOrderedCompletionService) completion).timeoutTask(); } } else { // there is a result to aggregate Exchange subExchange = future.get(); // Decide whether to continue with the multicast or not; similar logic to the Pipeline Integer number = getExchangeIndex(subExchange); boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); if (stopOnException && !continueProcessing) { // we want to stop on exception and an exception or failure occurred // this is similar to what the pipeline does, so we should do the same to not surprise end users // so we should set the failed exchange as the result and break out result.set(subExchange); stoppedOnException = true; break; } // we got a result so aggregate it AggregationStrategy strategy = getAggregationStrategy(subExchange); doAggregate(strategy, result, subExchange); } aggregated++; } if (timedOut || stoppedOnException) { if (timedOut) { LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); } if (stoppedOnException) { LOG.debug("Cancelling tasks due stopOnException."); } // cancel tasks as we timed out (its safe to cancel done tasks) running.set(false); } } } protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable pairs, AsyncCallback callback) throws Exception { AtomicInteger total = new AtomicInteger(); Iterator it = pairs.iterator(); while (it.hasNext()) { ProcessorExchangePair pair = it.next(); Exchange subExchange = pair.getExchange(); updateNewExchange(subExchange, total.get(), pairs, it); boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); if (!sync) { if (LOG.isTraceEnabled()) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); } // the remainder of the multicast will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } if (LOG.isTraceEnabled()) { LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); } // Decide whether to continue with the multicast or not; similar logic to the Pipeline // remember to test for stop on exception and aggregate before copying back results boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); if (stopOnException && !continueProcessing) { if (subExchange.getException() != null) { // wrap in exception to explain where it failed CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); subExchange.setException(cause); } // we want to stop on exception, and the exception was handled by the error handler // this is similar to what the pipeline does, so we should do the same to not surprise end users // so we should set the failed exchange as the result and be done result.set(subExchange); return true; } LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); doAggregate(getAggregationStrategy(subExchange), result, subExchange); total.incrementAndGet(); } LOG.debug("Done sequential processing {} exchanges", total); return true; } private boolean doProcessSequential(final Exchange original, final AtomicExchange result, final Iterable pairs, final Iterator it, final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { boolean sync = true; final Exchange exchange = pair.getExchange(); Processor processor = pair.getProcessor(); final Producer producer = pair.getProducer(); TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; // compute time taken if sending to another endpoint final StopWatch watch = producer != null ? new StopWatch() : null; try { // prepare tracing starting from a new block if (traced != null) { traced.pushBlock(); } if (producer != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); } // let the prepared process it, remember to begin the exchange pair AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() { public void done(boolean doneSync) { // we are done with the exchange pair pair.done(); // okay we are done, so notify the exchange was sent if (producer != null) { long timeTaken = watch.stop(); Endpoint endpoint = producer.getEndpoint(); // emit event that the exchange was sent to the endpoint EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); } // we only have to handle async completion of the routing slip if (doneSync) { return; } // continue processing the multicast asynchronously Exchange subExchange = exchange; // Decide whether to continue with the multicast or not; similar logic to the Pipeline // remember to test for stop on exception and aggregate before copying back results boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); if (stopOnException && !continueProcessing) { if (subExchange.getException() != null) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); } else { // we want to stop on exception, and the exception was handled by the error handler // this is similar to what the pipeline does, so we should do the same to not surprise end users // so we should set the failed exchange as the result and be done result.set(subExchange); } // and do the done work doDone(original, subExchange, callback, false, true); return; } try { doAggregate(getAggregationStrategy(subExchange), result, subExchange); } catch (Throwable e) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work doDone(original, subExchange, callback, false, true); return; } total.incrementAndGet(); // maybe there are more processors to multicast while (it.hasNext()) { // prepare and run the next ProcessorExchangePair pair = it.next(); subExchange = pair.getExchange(); updateNewExchange(subExchange, total.get(), pairs, it); boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); return; } // Decide whether to continue with the multicast or not; similar logic to the Pipeline // remember to test for stop on exception and aggregate before copying back results continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); if (stopOnException && !continueProcessing) { if (subExchange.getException() != null) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); } else { // we want to stop on exception, and the exception was handled by the error handler // this is similar to what the pipeline does, so we should do the same to not surprise end users // so we should set the failed exchange as the result and be done result.set(subExchange); } // and do the done work doDone(original, subExchange, callback, false, true); return; } // must catch any exceptions from aggregation try { doAggregate(getAggregationStrategy(subExchange), result, subExchange); } catch (Throwable e) { // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work doDone(original, subExchange, callback, false, true); return; } total.incrementAndGet(); } // do the done work subExchange = result.get() != null ? result.get() : null; doDone(original, subExchange, callback, false, true); } }); } finally { // pop the block so by next round we have the same staring point and thus the tracing looks accurate if (traced != null) { traced.popBlock(); } } return sync; } private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { final Exchange exchange = pair.getExchange(); Processor processor = pair.getProcessor(); Producer producer = pair.getProducer(); TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; // compute time taken if sending to another endpoint StopWatch watch = null; if (producer != null) { watch = new StopWatch(); } try { // prepare tracing starting from a new block if (traced != null) { traced.pushBlock(); } if (producer != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); } // let the prepared process it, remember to begin the exchange pair AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); // we invoke it synchronously as parallel async routing is too hard AsyncProcessorHelper.process(async, exchange); } finally { pair.done(); // pop the block so by next round we have the same staring point and thus the tracing looks accurate if (traced != null) { traced.popBlock(); } if (producer != null) { long timeTaken = watch.stop(); Endpoint endpoint = producer.getEndpoint(); // emit event that the exchange was sent to the endpoint // this is okay to do here in the finally block, as the processing is not using the async routing engine //( we invoke it synchronously as parallel async routing is too hard) EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); } } } /** * Common work which must be done when we are done multicasting. *

* This logic applies for both running synchronous and asynchronous as there are multiple exist points * when using the asynchronous routing engine. And therefore we want the logic in one method instead * of being scattered. * * @param original the original exchange * @param subExchange the current sub exchange, can be null for the synchronous part * @param callback the callback * @param doneSync the doneSync parameter to call on callback * @param exhaust whether or not error handling is exhausted */ protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) { // cleanup any per exchange aggregation strategy removeAggregationStrategyFromExchange(original); if (original.getException() != null || subExchange != null && subExchange.getException() != null) { // multicast uses error handling on its output processors and they have tried to redeliver // so we shall signal back to the other error handlers that we are exhausted and they should not // also try to redeliver as we will then do that twice original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); } if (subExchange != null) { // and copy the current result to original so it will contain this result of this eip ExchangeHelper.copyResults(original, subExchange); } callback.done(doneSync); } /** * Aggregate the {@link Exchange} with the current result * * @param strategy the aggregation strategy to use * @param result the current result * @param exchange the exchange to be added to the result */ protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { if (strategy != null) { // prepare the exchanges for aggregation Exchange oldExchange = result.get(); ExchangeHelper.prepareAggregation(oldExchange, exchange); result.set(strategy.aggregate(oldExchange, exchange)); } } protected void updateNewExchange(Exchange exchange, int index, Iterable allPairs, Iterator it) { exchange.setProperty(Exchange.MULTICAST_INDEX, index); if (it.hasNext()) { exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); } else { exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); } } protected Integer getExchangeIndex(Exchange exchange) { return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); } protected Iterable createProcessorExchangePairs(Exchange exchange) throws Exception { List result = new ArrayList(processors.size()); int index = 0; for (Processor processor : processors) { // copy exchange, and do not share the unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) { prepareSharedUnitOfWork(copy, exchange); } // and add the pair RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); } if (exchange.getException() != null) { // force any exceptions occurred during creation of exchange paris to be thrown // before returning the answer; throw exchange.getException(); } return result; } /** * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. *

* You must use this method to create the instances of {@link ProcessorExchangePair} as they * need to be specially prepared before use. * * @param index the index * @param processor the processor * @param exchange the exchange * @param routeContext the route context * @return prepared for use */ protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, RouteContext routeContext) { Processor prepared = processor; // set property which endpoint we send to setToEndpoint(exchange, prepared); // rework error handling to support fine grained error handling prepared = createErrorHandler(routeContext, exchange, prepared); // invoke on prepare on the exchange if specified if (onPrepare != null) { try { onPrepare.process(exchange); } catch (Exception e) { exchange.setException(e); } } return new DefaultProcessorExchangePair(index, processor, prepared, exchange); } protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { Processor answer; if (routeContext != null) { // wrap the producer in error handler so we have fine grained error handling on // the output side instead of the input side // this is needed to support redelivery on that output alone and not doing redelivery // for the entire multicast block again which will start from scratch again // create key for cache final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); // lookup cached first to reuse and preserve memory answer = errorHandlers.get(key); if (answer != null) { LOG.trace("Using existing error handler for: {}", processor); return answer; } LOG.trace("Creating error handler for: {}", processor); ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); // create error handler (create error handler directly to keep it light weight, // instead of using ProcessorDefinition.wrapInErrorHandler) try { processor = builder.createErrorHandler(routeContext, processor); // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(routeContext, processor, exchange); // must start the error handler ServiceHelper.startServices(answer); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } // here we don't cache the ChildUnitOfWorkProcessor // As the UnitOfWorkProcess will be delegate to the Parent if (!(answer instanceof ChildUnitOfWorkProcessor)) { // add to cache errorHandlers.putIfAbsent(key, answer); } } else { // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(routeContext, processor, exchange); } return answer; } /** * Strategy to create the {@link UnitOfWorkProcessor} to be used for the sub route * * @param routeContext the route context * @param processor the processor wrapped in this unit of work processor * @param exchange the exchange * @return the unit of work processor */ protected UnitOfWorkProcessor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); if (parent != null) { return new ChildUnitOfWorkProcessor(parent, routeContext, processor); } else { return new UnitOfWorkProcessor(routeContext, processor); } } /** * Prepares the exchange for participating in a shared unit of work *

* This ensures a child exchange can access its parent {@link UnitOfWork} when it participate * in a shared unit of work. * * @param childExchange the child exchange * @param parentExchange the parent exchange */ protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); } protected void doStart() throws Exception { if (isParallelProcessing() && executorService == null) { throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); } if (timeout > 0 && !isParallelProcessing()) { throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); } if (isParallelProcessing() && aggregateExecutorService == null) { // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing String name = getClass().getSimpleName() + "-AggregateTask"; aggregateExecutorService = createAggregateExecutorService(name); } ServiceHelper.startServices(aggregationStrategy, processors); } /** * Strategy to create the thread pool for the aggregator background task which waits for and aggregates * completed tasks when running in parallel mode. * * @param name the suggested name for the background thread * @return the thread pool */ protected synchronized ExecutorService createAggregateExecutorService(String name) { // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); } @Override protected void doStop() throws Exception { ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy); } @Override protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); // only clear error handlers when shutting down errorHandlers.clear(); if (shutdownExecutorService && executorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } if (aggregateExecutorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); } } protected static void setToEndpoint(Exchange exchange, Processor processor) { if (processor instanceof Producer) { Producer producer = (Producer) processor; exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); } } protected AggregationStrategy getAggregationStrategy(Exchange exchange) { AggregationStrategy answer = null; // prefer to use per Exchange aggregation strategy over a global strategy if (exchange != null) { Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map map = CastUtils.cast(property); if (map != null) { answer = map.get(this); } } if (answer == null) { // fallback to global strategy answer = getAggregationStrategy(); } return answer; } /** * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. * * @param exchange the exchange * @param aggregationStrategy the strategy */ protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map map = CastUtils.cast(property); if (map == null) { map = new HashMap(); } else { // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties // we just create a new copy if we need to change the map map = new HashMap(map); } // store the strategy using this processor as the key // (so we can store multiple strategies on the same exchange) map.put(this, aggregationStrategy); exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); } /** * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} * which must be done after use. * * @param exchange the current exchange */ protected void removeAggregationStrategyFromExchange(Exchange exchange) { Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map map = CastUtils.cast(property); if (map == null) { return; } // remove the strategy using this processor as the key map.remove(this); } /** * Is the multicast processor working in streaming mode? *

* In streaming mode: *

*/ public boolean isStreaming() { return streaming; } /** * Should the multicast processor stop processing further exchanges in case of an exception occurred? */ public boolean isStopOnException() { return stopOnException; } /** * Returns the producers to multicast to */ public Collection getProcessors() { return processors; } /** * An optional timeout in millis when using parallel processing */ public long getTimeout() { return timeout; } /** * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. */ public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } public boolean isParallelProcessing() { return parallelProcessing; } public boolean isShareUnitOfWork() { return shareUnitOfWork; } public List next() { if (!hasNext()) { return null; } return new ArrayList(processors); } public boolean hasNext() { return processors != null && !processors.isEmpty(); } }