001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.io.Closeable; 020 import java.util.ArrayList; 021 import java.util.Collection; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.Callable; 026 import java.util.concurrent.CompletionService; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.ConcurrentMap; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.ExecutionException; 031 import java.util.concurrent.ExecutorCompletionService; 032 import java.util.concurrent.ExecutorService; 033 import java.util.concurrent.Future; 034 import java.util.concurrent.TimeUnit; 035 import java.util.concurrent.atomic.AtomicBoolean; 036 import java.util.concurrent.atomic.AtomicInteger; 037 038 import org.apache.camel.AsyncCallback; 039 import org.apache.camel.AsyncProcessor; 040 import org.apache.camel.CamelContext; 041 import org.apache.camel.CamelExchangeException; 042 import org.apache.camel.Endpoint; 043 import org.apache.camel.ErrorHandlerFactory; 044 import org.apache.camel.Exchange; 045 import org.apache.camel.Navigate; 046 import org.apache.camel.Processor; 047 import org.apache.camel.Producer; 048 import org.apache.camel.Traceable; 049 import org.apache.camel.processor.aggregate.AggregationStrategy; 050 import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; 051 import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; 052 import org.apache.camel.spi.RouteContext; 053 import org.apache.camel.spi.TracedRouteNodes; 054 import org.apache.camel.spi.UnitOfWork; 055 import org.apache.camel.support.ServiceSupport; 056 import org.apache.camel.util.AsyncProcessorConverterHelper; 057 import org.apache.camel.util.AsyncProcessorHelper; 058 import org.apache.camel.util.CastUtils; 059 import org.apache.camel.util.EventHelper; 060 import org.apache.camel.util.ExchangeHelper; 061 import org.apache.camel.util.IOHelper; 062 import org.apache.camel.util.KeyValueHolder; 063 import org.apache.camel.util.ObjectHelper; 064 import org.apache.camel.util.ServiceHelper; 065 import org.apache.camel.util.StopWatch; 066 import org.apache.camel.util.concurrent.AtomicException; 067 import org.apache.camel.util.concurrent.AtomicExchange; 068 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; 069 import org.slf4j.Logger; 070 import org.slf4j.LoggerFactory; 071 072 import static org.apache.camel.util.ObjectHelper.notNull; 073 074 075 /** 076 * Implements the Multicast pattern to send a message exchange to a number of 077 * endpoints, each endpoint receiving a copy of the message exchange. 078 * 079 * @version 080 * @see Pipeline 081 */ 082 public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { 083 084 private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); 085 086 /** 087 * Class that represent each step in the multicast route to do 088 */ 089 static final class DefaultProcessorExchangePair implements ProcessorExchangePair { 090 private final int index; 091 private final Processor processor; 092 private final Processor prepared; 093 private final Exchange exchange; 094 095 private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { 096 this.index = index; 097 this.processor = processor; 098 this.prepared = prepared; 099 this.exchange = exchange; 100 } 101 102 public int getIndex() { 103 return index; 104 } 105 106 public Exchange getExchange() { 107 return exchange; 108 } 109 110 public Producer getProducer() { 111 if (processor instanceof Producer) { 112 return (Producer) processor; 113 } 114 return null; 115 } 116 117 public Processor getProcessor() { 118 return prepared; 119 } 120 121 public void begin() { 122 // noop 123 } 124 125 public void done() { 126 // noop 127 } 128 129 } 130 131 /** 132 * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges 133 * <p/> 134 * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. 135 */ 136 static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { 137 138 public PreparedErrorHandler(RouteContext key, Processor value) { 139 super(key, value); 140 } 141 142 } 143 144 protected final Processor onPrepare; 145 private final CamelContext camelContext; 146 private Collection<Processor> processors; 147 private final AggregationStrategy aggregationStrategy; 148 private final boolean parallelProcessing; 149 private final boolean streaming; 150 private final boolean stopOnException; 151 private final ExecutorService executorService; 152 private final boolean shutdownExecutorService; 153 private ExecutorService aggregateExecutorService; 154 private final long timeout; 155 private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); 156 private final boolean shareUnitOfWork; 157 158 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { 159 this(camelContext, processors, null); 160 } 161 162 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 163 this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false); 164 } 165 166 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 167 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 168 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { 169 notNull(camelContext, "camelContext"); 170 this.camelContext = camelContext; 171 this.processors = processors; 172 this.aggregationStrategy = aggregationStrategy; 173 this.executorService = executorService; 174 this.shutdownExecutorService = shutdownExecutorService; 175 this.streaming = streaming; 176 this.stopOnException = stopOnException; 177 // must enable parallel if executor service is provided 178 this.parallelProcessing = parallelProcessing || executorService != null; 179 this.timeout = timeout; 180 this.onPrepare = onPrepare; 181 this.shareUnitOfWork = shareUnitOfWork; 182 } 183 184 @Override 185 public String toString() { 186 return "Multicast[" + getProcessors() + "]"; 187 } 188 189 public String getTraceLabel() { 190 return "multicast"; 191 } 192 193 public CamelContext getCamelContext() { 194 return camelContext; 195 } 196 197 public void process(Exchange exchange) throws Exception { 198 AsyncProcessorHelper.process(this, exchange); 199 } 200 201 public boolean process(Exchange exchange, AsyncCallback callback) { 202 final AtomicExchange result = new AtomicExchange(); 203 Iterable<ProcessorExchangePair> pairs = null; 204 205 try { 206 boolean sync = true; 207 208 pairs = createProcessorExchangePairs(exchange); 209 210 if (isParallelProcessing()) { 211 // ensure an executor is set when running in parallel 212 ObjectHelper.notNull(executorService, "executorService", this); 213 doProcessParallel(exchange, result, pairs, isStreaming(), callback); 214 } else { 215 sync = doProcessSequential(exchange, result, pairs, callback); 216 } 217 218 if (!sync) { 219 // the remainder of the multicast will be completed async 220 // so we break out now, then the callback will be invoked which then continue routing from where we left here 221 return false; 222 } 223 } catch (Throwable e) { 224 exchange.setException(e); 225 // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted 226 // and do the done work 227 doDone(exchange, null, pairs, callback, true, false); 228 return true; 229 } 230 231 // multicasting was processed successfully 232 // and do the done work 233 Exchange subExchange = result.get() != null ? result.get() : null; 234 doDone(exchange, subExchange, pairs, callback, true, true); 235 return true; 236 } 237 238 protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, 239 final boolean streaming, final AsyncCallback callback) throws Exception { 240 241 ObjectHelper.notNull(executorService, "ExecutorService", this); 242 ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); 243 244 final CompletionService<Exchange> completion; 245 if (streaming) { 246 // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) 247 completion = new ExecutorCompletionService<Exchange>(executorService); 248 } else { 249 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) 250 completion = new SubmitOrderedCompletionService<Exchange>(executorService); 251 } 252 253 final AtomicInteger total = new AtomicInteger(0); 254 final Iterator<ProcessorExchangePair> it = pairs.iterator(); 255 256 if (it.hasNext()) { 257 // when parallel then aggregate on the fly 258 final AtomicBoolean running = new AtomicBoolean(true); 259 final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); 260 final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); 261 final AtomicException executionException = new AtomicException(); 262 263 // issue task to execute in separate thread so it can aggregate on-the-fly 264 // while we submit new tasks, and those tasks complete concurrently 265 // this allows us to optimize work and reduce memory consumption 266 final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, 267 aggregationOnTheFlyDone, allTasksSubmitted, executionException); 268 final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); 269 270 LOG.trace("Starting to submit parallel tasks"); 271 272 while (it.hasNext()) { 273 final ProcessorExchangePair pair = it.next(); 274 final Exchange subExchange = pair.getExchange(); 275 updateNewExchange(subExchange, total.intValue(), pairs, it); 276 277 completion.submit(new Callable<Exchange>() { 278 public Exchange call() throws Exception { 279 // only start the aggregation task when the task is being executed to avoid staring 280 // the aggregation task to early and pile up too many threads 281 if (aggregationTaskSubmitted.compareAndSet(false, true)) { 282 // but only submit the task once 283 aggregateExecutorService.submit(aggregateOnTheFlyTask); 284 } 285 286 if (!running.get()) { 287 // do not start processing the task if we are not running 288 return subExchange; 289 } 290 291 try { 292 doProcessParallel(pair); 293 } catch (Throwable e) { 294 subExchange.setException(e); 295 } 296 297 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 298 Integer number = getExchangeIndex(subExchange); 299 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 300 if (stopOnException && !continueProcessing) { 301 // signal to stop running 302 running.set(false); 303 // throw caused exception 304 if (subExchange.getException() != null) { 305 // wrap in exception to explain where it failed 306 CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); 307 subExchange.setException(cause); 308 } 309 } 310 311 LOG.trace("Parallel processing complete for exchange: {}", subExchange); 312 return subExchange; 313 } 314 }); 315 316 total.incrementAndGet(); 317 } 318 319 // signal all tasks has been submitted 320 LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); 321 allTasksSubmitted.set(true); 322 323 // its to hard to do parallel async routing so we let the caller thread be synchronously 324 // and have it pickup the replies and do the aggregation (eg we use a latch to wait) 325 // wait for aggregation to be done 326 LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); 327 aggregationOnTheFlyDone.await(); 328 329 // did we fail for whatever reason, if so throw that caused exception 330 if (executionException.get() != null) { 331 if (LOG.isDebugEnabled()) { 332 LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); 333 } 334 throw executionException.get(); 335 } 336 } 337 338 // no everything is okay so we are done 339 LOG.debug("Done parallel processing {} exchanges", total); 340 } 341 342 /** 343 * Task to aggregate on-the-fly for completed tasks when using parallel processing. 344 * <p/> 345 * This ensures lower memory consumption as we do not need to keep all completed tasks in memory 346 * before we perform aggregation. Instead this separate thread will run and aggregate when new 347 * completed tasks is done. 348 * <p/> 349 * The logic is fairly complex as this implementation has to keep track how far it got, and also 350 * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue 351 * processing when the entire splitting is done. 352 */ 353 private final class AggregateOnTheFlyTask implements Runnable { 354 355 private final AtomicExchange result; 356 private final Exchange original; 357 private final AtomicInteger total; 358 private final CompletionService<Exchange> completion; 359 private final AtomicBoolean running; 360 private final CountDownLatch aggregationOnTheFlyDone; 361 private final AtomicBoolean allTasksSubmitted; 362 private final AtomicException executionException; 363 364 private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total, 365 CompletionService<Exchange> completion, AtomicBoolean running, 366 CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted, 367 AtomicException executionException) { 368 this.result = result; 369 this.original = original; 370 this.total = total; 371 this.completion = completion; 372 this.running = running; 373 this.aggregationOnTheFlyDone = aggregationOnTheFlyDone; 374 this.allTasksSubmitted = allTasksSubmitted; 375 this.executionException = executionException; 376 } 377 378 public void run() { 379 LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId()); 380 381 try { 382 aggregateOnTheFly(); 383 } catch (Throwable e) { 384 if (e instanceof Exception) { 385 executionException.set((Exception) e); 386 } else { 387 executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); 388 } 389 } finally { 390 // must signal we are done so the latch can open and let the other thread continue processing 391 LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); 392 LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); 393 aggregationOnTheFlyDone.countDown(); 394 } 395 } 396 397 private void aggregateOnTheFly() throws InterruptedException, ExecutionException { 398 boolean timedOut = false; 399 boolean stoppedOnException = false; 400 final StopWatch watch = new StopWatch(); 401 int aggregated = 0; 402 boolean done = false; 403 // not a for loop as on the fly may still run 404 while (!done) { 405 // check if we have already aggregate everything 406 if (allTasksSubmitted.get() && aggregated >= total.get()) { 407 LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); 408 break; 409 } 410 411 Future<Exchange> future; 412 if (timedOut) { 413 // we are timed out but try to grab if some tasks has been completed 414 // poll will return null if no tasks is present 415 future = completion.poll(); 416 LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future); 417 } else if (timeout > 0) { 418 long left = timeout - watch.taken(); 419 if (left < 0) { 420 left = 0; 421 } 422 LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left); 423 future = completion.poll(left, TimeUnit.MILLISECONDS); 424 } else { 425 LOG.trace("Polling completion task #{}", aggregated); 426 // we must not block so poll every second 427 future = completion.poll(1, TimeUnit.SECONDS); 428 if (future == null) { 429 // and continue loop which will recheck if we are done 430 continue; 431 } 432 } 433 434 if (future == null && timedOut) { 435 // we are timed out and no more tasks complete so break out 436 break; 437 } else if (future == null) { 438 // timeout occurred 439 AggregationStrategy strategy = getAggregationStrategy(null); 440 if (strategy instanceof TimeoutAwareAggregationStrategy) { 441 // notify the strategy we timed out 442 Exchange oldExchange = result.get(); 443 if (oldExchange == null) { 444 // if they all timed out the result may not have been set yet, so use the original exchange 445 oldExchange = original; 446 } 447 ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout); 448 } else { 449 // log a WARN we timed out since it will not be aggregated and the Exchange will be lost 450 LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated); 451 } 452 LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated); 453 timedOut = true; 454 455 // mark that index as timed out, which allows us to try to retrieve 456 // any already completed tasks in the next loop 457 if (completion instanceof SubmitOrderedCompletionService) { 458 ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); 459 } 460 } else { 461 // there is a result to aggregate 462 Exchange subExchange = future.get(); 463 464 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 465 Integer number = getExchangeIndex(subExchange); 466 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 467 if (stopOnException && !continueProcessing) { 468 // we want to stop on exception and an exception or failure occurred 469 // this is similar to what the pipeline does, so we should do the same to not surprise end users 470 // so we should set the failed exchange as the result and break out 471 result.set(subExchange); 472 stoppedOnException = true; 473 break; 474 } 475 476 // we got a result so aggregate it 477 AggregationStrategy strategy = getAggregationStrategy(subExchange); 478 doAggregate(strategy, result, subExchange); 479 } 480 481 aggregated++; 482 } 483 484 if (timedOut || stoppedOnException) { 485 if (timedOut) { 486 LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); 487 } 488 if (stoppedOnException) { 489 LOG.debug("Cancelling tasks due stopOnException."); 490 } 491 // cancel tasks as we timed out (its safe to cancel done tasks) 492 running.set(false); 493 } 494 } 495 } 496 497 protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { 498 AtomicInteger total = new AtomicInteger(); 499 Iterator<ProcessorExchangePair> it = pairs.iterator(); 500 501 while (it.hasNext()) { 502 ProcessorExchangePair pair = it.next(); 503 Exchange subExchange = pair.getExchange(); 504 updateNewExchange(subExchange, total.get(), pairs, it); 505 506 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 507 if (!sync) { 508 if (LOG.isTraceEnabled()) { 509 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); 510 } 511 // the remainder of the multicast will be completed async 512 // so we break out now, then the callback will be invoked which then continue routing from where we left here 513 return false; 514 } 515 516 if (LOG.isTraceEnabled()) { 517 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); 518 } 519 520 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 521 // remember to test for stop on exception and aggregate before copying back results 522 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 523 if (stopOnException && !continueProcessing) { 524 if (subExchange.getException() != null) { 525 // wrap in exception to explain where it failed 526 CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); 527 subExchange.setException(cause); 528 } 529 // we want to stop on exception, and the exception was handled by the error handler 530 // this is similar to what the pipeline does, so we should do the same to not surprise end users 531 // so we should set the failed exchange as the result and be done 532 result.set(subExchange); 533 return true; 534 } 535 536 LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); 537 538 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 539 total.incrementAndGet(); 540 } 541 542 LOG.debug("Done sequential processing {} exchanges", total); 543 544 return true; 545 } 546 547 private boolean doProcessSequential(final Exchange original, final AtomicExchange result, 548 final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it, 549 final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { 550 boolean sync = true; 551 552 final Exchange exchange = pair.getExchange(); 553 Processor processor = pair.getProcessor(); 554 final Producer producer = pair.getProducer(); 555 556 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 557 558 // compute time taken if sending to another endpoint 559 final StopWatch watch = producer != null ? new StopWatch() : null; 560 561 try { 562 // prepare tracing starting from a new block 563 if (traced != null) { 564 traced.pushBlock(); 565 } 566 567 if (producer != null) { 568 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 569 } 570 // let the prepared process it, remember to begin the exchange pair 571 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 572 pair.begin(); 573 sync = async.process(exchange, new AsyncCallback() { 574 public void done(boolean doneSync) { 575 // we are done with the exchange pair 576 pair.done(); 577 578 // okay we are done, so notify the exchange was sent 579 if (producer != null) { 580 long timeTaken = watch.stop(); 581 Endpoint endpoint = producer.getEndpoint(); 582 // emit event that the exchange was sent to the endpoint 583 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 584 } 585 586 // we only have to handle async completion of the routing slip 587 if (doneSync) { 588 return; 589 } 590 591 // continue processing the multicast asynchronously 592 Exchange subExchange = exchange; 593 594 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 595 // remember to test for stop on exception and aggregate before copying back results 596 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 597 if (stopOnException && !continueProcessing) { 598 if (subExchange.getException() != null) { 599 // wrap in exception to explain where it failed 600 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 601 } else { 602 // we want to stop on exception, and the exception was handled by the error handler 603 // this is similar to what the pipeline does, so we should do the same to not surprise end users 604 // so we should set the failed exchange as the result and be done 605 result.set(subExchange); 606 } 607 // and do the done work 608 doDone(original, subExchange, pairs, callback, false, true); 609 return; 610 } 611 612 try { 613 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 614 } catch (Throwable e) { 615 // wrap in exception to explain where it failed 616 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 617 // and do the done work 618 doDone(original, subExchange, pairs, callback, false, true); 619 return; 620 } 621 622 total.incrementAndGet(); 623 624 // maybe there are more processors to multicast 625 while (it.hasNext()) { 626 627 // prepare and run the next 628 ProcessorExchangePair pair = it.next(); 629 subExchange = pair.getExchange(); 630 updateNewExchange(subExchange, total.get(), pairs, it); 631 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 632 633 if (!sync) { 634 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 635 return; 636 } 637 638 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 639 // remember to test for stop on exception and aggregate before copying back results 640 continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 641 if (stopOnException && !continueProcessing) { 642 if (subExchange.getException() != null) { 643 // wrap in exception to explain where it failed 644 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 645 } else { 646 // we want to stop on exception, and the exception was handled by the error handler 647 // this is similar to what the pipeline does, so we should do the same to not surprise end users 648 // so we should set the failed exchange as the result and be done 649 result.set(subExchange); 650 } 651 // and do the done work 652 doDone(original, subExchange, pairs, callback, false, true); 653 return; 654 } 655 656 // must catch any exceptions from aggregation 657 try { 658 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 659 } catch (Throwable e) { 660 // wrap in exception to explain where it failed 661 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 662 // and do the done work 663 doDone(original, subExchange, pairs, callback, false, true); 664 return; 665 } 666 667 total.incrementAndGet(); 668 } 669 670 // do the done work 671 subExchange = result.get() != null ? result.get() : null; 672 doDone(original, subExchange, pairs, callback, false, true); 673 } 674 }); 675 } finally { 676 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 677 if (traced != null) { 678 traced.popBlock(); 679 } 680 } 681 682 return sync; 683 } 684 685 private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { 686 final Exchange exchange = pair.getExchange(); 687 Processor processor = pair.getProcessor(); 688 Producer producer = pair.getProducer(); 689 690 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 691 692 // compute time taken if sending to another endpoint 693 StopWatch watch = null; 694 if (producer != null) { 695 watch = new StopWatch(); 696 } 697 698 try { 699 // prepare tracing starting from a new block 700 if (traced != null) { 701 traced.pushBlock(); 702 } 703 704 if (producer != null) { 705 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 706 } 707 // let the prepared process it, remember to begin the exchange pair 708 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 709 pair.begin(); 710 // we invoke it synchronously as parallel async routing is too hard 711 AsyncProcessorHelper.process(async, exchange); 712 } finally { 713 pair.done(); 714 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 715 if (traced != null) { 716 traced.popBlock(); 717 } 718 if (producer != null) { 719 long timeTaken = watch.stop(); 720 Endpoint endpoint = producer.getEndpoint(); 721 // emit event that the exchange was sent to the endpoint 722 // this is okay to do here in the finally block, as the processing is not using the async routing engine 723 //( we invoke it synchronously as parallel async routing is too hard) 724 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 725 } 726 } 727 } 728 729 /** 730 * Common work which must be done when we are done multicasting. 731 * <p/> 732 * This logic applies for both running synchronous and asynchronous as there are multiple exist points 733 * when using the asynchronous routing engine. And therefore we want the logic in one method instead 734 * of being scattered. 735 * 736 * @param original the original exchange 737 * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part 738 * @param pairs the pairs with the exchanges to process 739 * @param callback the callback 740 * @param doneSync the <tt>doneSync</tt> parameter to call on callback 741 * @param forceExhaust whether or not error handling is exhausted 742 */ 743 protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, 744 AsyncCallback callback, boolean doneSync, boolean forceExhaust) { 745 746 // we are done so close the pairs iterator 747 if (pairs != null && pairs instanceof Closeable) { 748 IOHelper.close((Closeable) pairs, "pairs", LOG); 749 } 750 751 AggregationStrategy strategy = getAggregationStrategy(subExchange); 752 // invoke the on completion callback 753 if (strategy instanceof CompletionAwareAggregationStrategy) { 754 ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange); 755 } 756 757 // cleanup any per exchange aggregation strategy 758 removeAggregationStrategyFromExchange(original); 759 760 // we need to know if there was an exception, and if the stopOnException option was enabled 761 // also we would need to know if any error handler has attempted redelivery and exhausted 762 boolean stoppedOnException = false; 763 boolean exception = false; 764 boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); 765 if (original.getException() != null || subExchange != null && subExchange.getException() != null) { 766 // there was an exception and we stopped 767 stoppedOnException = isStopOnException(); 768 exception = true; 769 } 770 771 // must copy results at this point 772 if (subExchange != null) { 773 if (stoppedOnException) { 774 // if we stopped due an exception then only propagte the exception 775 original.setException(subExchange.getException()); 776 } else { 777 // copy the current result to original so it will contain this result of this eip 778 ExchangeHelper.copyResults(original, subExchange); 779 } 780 } 781 782 // .. and then if there was an exception we need to configure the redelivery exhaust 783 // for example the noErrorHandler will not cause redelivery exhaust so if this error 784 // handled has been in use, then the exhaust would be false (if not forced) 785 if (exception) { 786 // multicast uses error handling on its output processors and they have tried to redeliver 787 // so we shall signal back to the other error handlers that we are exhausted and they should not 788 // also try to redeliver as we will then do that twice 789 original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); 790 } 791 792 callback.done(doneSync); 793 } 794 795 /** 796 * Aggregate the {@link Exchange} with the current result 797 * 798 * @param strategy the aggregation strategy to use 799 * @param result the current result 800 * @param exchange the exchange to be added to the result 801 */ 802 protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 803 if (strategy != null) { 804 // prepare the exchanges for aggregation 805 Exchange oldExchange = result.get(); 806 ExchangeHelper.prepareAggregation(oldExchange, exchange); 807 result.set(strategy.aggregate(oldExchange, exchange)); 808 } 809 } 810 811 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 812 Iterator<ProcessorExchangePair> it) { 813 exchange.setProperty(Exchange.MULTICAST_INDEX, index); 814 if (it.hasNext()) { 815 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); 816 } else { 817 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); 818 } 819 } 820 821 protected Integer getExchangeIndex(Exchange exchange) { 822 return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); 823 } 824 825 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 826 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 827 828 int index = 0; 829 for (Processor processor : processors) { 830 // copy exchange, and do not share the unit of work 831 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 832 833 // if we share unit of work, we need to prepare the child exchange 834 if (isShareUnitOfWork()) { 835 prepareSharedUnitOfWork(copy, exchange); 836 } 837 838 // and add the pair 839 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 840 result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); 841 } 842 843 if (exchange.getException() != null) { 844 // force any exceptions occurred during creation of exchange paris to be thrown 845 // before returning the answer; 846 throw exchange.getException(); 847 } 848 849 return result; 850 } 851 852 /** 853 * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. 854 * <p/> 855 * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they 856 * need to be specially prepared before use. 857 * 858 * @param index the index 859 * @param processor the processor 860 * @param exchange the exchange 861 * @param routeContext the route context 862 * @return prepared for use 863 */ 864 protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, 865 RouteContext routeContext) { 866 Processor prepared = processor; 867 868 // set property which endpoint we send to 869 setToEndpoint(exchange, prepared); 870 871 // rework error handling to support fine grained error handling 872 prepared = createErrorHandler(routeContext, exchange, prepared); 873 874 // invoke on prepare on the exchange if specified 875 if (onPrepare != null) { 876 try { 877 onPrepare.process(exchange); 878 } catch (Exception e) { 879 exchange.setException(e); 880 } 881 } 882 return new DefaultProcessorExchangePair(index, processor, prepared, exchange); 883 } 884 885 protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { 886 Processor answer; 887 888 boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); 889 890 // do not wrap in error handler if we are inside a try block 891 if (!tryBlock && routeContext != null) { 892 // wrap the producer in error handler so we have fine grained error handling on 893 // the output side instead of the input side 894 // this is needed to support redelivery on that output alone and not doing redelivery 895 // for the entire multicast block again which will start from scratch again 896 897 // create key for cache 898 final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); 899 900 // lookup cached first to reuse and preserve memory 901 answer = errorHandlers.get(key); 902 if (answer != null) { 903 LOG.trace("Using existing error handler for: {}", processor); 904 return answer; 905 } 906 907 LOG.trace("Creating error handler for: {}", processor); 908 ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); 909 // create error handler (create error handler directly to keep it light weight, 910 // instead of using ProcessorDefinition.wrapInErrorHandler) 911 try { 912 processor = builder.createErrorHandler(routeContext, processor); 913 914 // and wrap in unit of work processor so the copy exchange also can run under UoW 915 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 916 917 boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; 918 919 // must start the error handler 920 ServiceHelper.startServices(answer); 921 922 // here we don't cache the child unit of work 923 if (!child) { 924 // add to cache 925 errorHandlers.putIfAbsent(key, answer); 926 } 927 928 } catch (Exception e) { 929 throw ObjectHelper.wrapRuntimeCamelException(e); 930 } 931 } else { 932 // and wrap in unit of work processor so the copy exchange also can run under UoW 933 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 934 } 935 936 return answer; 937 } 938 939 /** 940 * Strategy to create the unit of work to be used for the sub route 941 * 942 * @param routeContext the route context 943 * @param processor the processor 944 * @param exchange the exchange 945 * @return the unit of work processor 946 */ 947 protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { 948 String routeId = routeContext != null ? routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()) : null; 949 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 950 951 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 952 UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 953 if (parent != null) { 954 internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeId, parent)); 955 } else { 956 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 957 } 958 959 // and then in route context so we can keep track which route this is at runtime 960 if (routeContext != null) { 961 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); 962 } 963 return internal; 964 } 965 966 /** 967 * Prepares the exchange for participating in a shared unit of work 968 * <p/> 969 * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate 970 * in a shared unit of work. 971 * 972 * @param childExchange the child exchange 973 * @param parentExchange the parent exchange 974 */ 975 protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { 976 childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); 977 } 978 979 protected void doStart() throws Exception { 980 if (isParallelProcessing() && executorService == null) { 981 throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); 982 } 983 if (timeout > 0 && !isParallelProcessing()) { 984 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); 985 } 986 if (isParallelProcessing() && aggregateExecutorService == null) { 987 // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread 988 // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run 989 // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing 990 String name = getClass().getSimpleName() + "-AggregateTask"; 991 aggregateExecutorService = createAggregateExecutorService(name); 992 } 993 ServiceHelper.startServices(aggregationStrategy, processors); 994 } 995 996 /** 997 * Strategy to create the thread pool for the aggregator background task which waits for and aggregates 998 * completed tasks when running in parallel mode. 999 * 1000 * @param name the suggested name for the background thread 1001 * @return the thread pool 1002 */ 1003 protected synchronized ExecutorService createAggregateExecutorService(String name) { 1004 // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in 1005 return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); 1006 } 1007 1008 @Override 1009 protected void doStop() throws Exception { 1010 ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy); 1011 } 1012 1013 @Override 1014 protected void doShutdown() throws Exception { 1015 ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); 1016 // only clear error handlers when shutting down 1017 errorHandlers.clear(); 1018 1019 if (shutdownExecutorService && executorService != null) { 1020 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 1021 } 1022 if (aggregateExecutorService != null) { 1023 getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); 1024 } 1025 } 1026 1027 protected static void setToEndpoint(Exchange exchange, Processor processor) { 1028 if (processor instanceof Producer) { 1029 Producer producer = (Producer) processor; 1030 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 1031 } 1032 } 1033 1034 protected AggregationStrategy getAggregationStrategy(Exchange exchange) { 1035 AggregationStrategy answer = null; 1036 1037 // prefer to use per Exchange aggregation strategy over a global strategy 1038 if (exchange != null) { 1039 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1040 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1041 if (map != null) { 1042 answer = map.get(this); 1043 } 1044 } 1045 if (answer == null) { 1046 // fallback to global strategy 1047 answer = getAggregationStrategy(); 1048 } 1049 return answer; 1050 } 1051 1052 /** 1053 * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. 1054 * 1055 * @param exchange the exchange 1056 * @param aggregationStrategy the strategy 1057 */ 1058 protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { 1059 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1060 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1061 if (map == null) { 1062 map = new ConcurrentHashMap<Object, AggregationStrategy>(); 1063 } else { 1064 // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties 1065 // we just create a new copy if we need to change the map 1066 map = new ConcurrentHashMap<Object, AggregationStrategy>(map); 1067 } 1068 // store the strategy using this processor as the key 1069 // (so we can store multiple strategies on the same exchange) 1070 map.put(this, aggregationStrategy); 1071 exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); 1072 } 1073 1074 /** 1075 * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} 1076 * which must be done after use. 1077 * 1078 * @param exchange the current exchange 1079 */ 1080 protected void removeAggregationStrategyFromExchange(Exchange exchange) { 1081 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1082 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1083 if (map == null) { 1084 return; 1085 } 1086 // remove the strategy using this processor as the key 1087 map.remove(this); 1088 } 1089 1090 /** 1091 * Is the multicast processor working in streaming mode? 1092 * <p/> 1093 * In streaming mode: 1094 * <ul> 1095 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> 1096 * <li>for parallel processing, we start aggregating responses as they get send back to the processor; 1097 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> 1098 * </ul> 1099 */ 1100 public boolean isStreaming() { 1101 return streaming; 1102 } 1103 1104 /** 1105 * Should the multicast processor stop processing further exchanges in case of an exception occurred? 1106 */ 1107 public boolean isStopOnException() { 1108 return stopOnException; 1109 } 1110 1111 /** 1112 * Returns the producers to multicast to 1113 */ 1114 public Collection<Processor> getProcessors() { 1115 return processors; 1116 } 1117 1118 /** 1119 * An optional timeout in millis when using parallel processing 1120 */ 1121 public long getTimeout() { 1122 return timeout; 1123 } 1124 1125 /** 1126 * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. 1127 */ 1128 public AggregationStrategy getAggregationStrategy() { 1129 return aggregationStrategy; 1130 } 1131 1132 public boolean isParallelProcessing() { 1133 return parallelProcessing; 1134 } 1135 1136 public boolean isShareUnitOfWork() { 1137 return shareUnitOfWork; 1138 } 1139 1140 public List<Processor> next() { 1141 if (!hasNext()) { 1142 return null; 1143 } 1144 return new ArrayList<Processor>(processors); 1145 } 1146 1147 public boolean hasNext() { 1148 return processors != null && !processors.isEmpty(); 1149 } 1150 }