Since we're on a major migration process of this website, some component documents here are out of sync right now. In the meantime you may want to look at the early version of the new website
https://camel.apache.org/staging/
We would very much like to receive any feedback on the new site, please join the discussion on the Camel user mailing list.

Pattern Appendix

There now follows a breakdown of the various Enterprise Integration Patterns that Camel supports

Messaging Systems

Message Channel

Camel supports the Message Channel from the EIP patterns. The Message Channel is an internal implementation detail of the Endpoint interface and all interactions with the Message Channel are via the Endpoint interfaces.


Example

In JMS, Message Channels are represented by topics and queues such as the following

jms:queue:foo

 

This message channel can be then used within the JMS component

Using the Fluent Builders

to("jms:queue:foo")


Using the Spring XML Extensions

<to uri="jms:queue:foo"/>

 

For more details see

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message

Camel supports the Message from the EIP patterns using the Message interface.

To support various message exchange patterns like one way Event Message and Request Reply messages Camel uses an Exchange interface which has a pattern property which can be set to InOnly for an Event Message which has a single inbound Message, or InOut for a Request Reply where there is an inbound and outbound message.

Here is a basic example of sending a Message to a route in InOnly and InOut modes

Requestor Code

//InOnly
getContext().createProducerTemplate().sendBody("direct:startInOnly", "Hello World");

//InOut
String result = (String) getContext().createProducerTemplate().requestBody("direct:startInOut", "Hello World");

Route Using the Fluent Builders

from("direct:startInOnly").inOnly("bean:process");

from("direct:startInOut").inOut("bean:process");

Route Using the Spring XML Extensions

<route>
  <from uri="direct:startInOnly"/>
  <inOnly uri="bean:process"/>
</route>

<route>
  <from uri="direct:startInOut"/>
  <inOut uri="bean:process"/>
</route>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Pipes and Filters

Camel supports the Pipes and Filters from the EIP patterns in various ways.

With Camel you can split your processing across multiple independent Endpoint instances which can then be chained together.

Using Routing Logic

You can create pipelines of logic using multiple Endpoint or Message Translator instances as follows{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java}Though pipeline is the default mode of operation when you specify multiple outputs in Camel. The opposite to pipeline is multicast; which fires the same message into each of its outputs. (See the example below).

In Spring XML you can use the <pipeline/> element

xml<route> <from uri="activemq:SomeQueue"/> <pipeline> <bean ref="foo"/> <bean ref="bar"/> <to uri="activemq:OutputQueue"/> </pipeline> </route>

In the above the pipeline element is actually unnecessary, you could use this:

xml<route> <from uri="activemq:SomeQueue"/> <bean ref="foo"/> <bean ref="bar"/> <to uri="activemq:OutputQueue"/> </route>

which is a bit more explicit.

However if you wish to use <multicast/> to avoid a pipeline - to send the same message into multiple pipelines - then the <pipeline/> element comes into its own:

xml<route> <from uri="activemq:SomeQueue"/> <multicast> <pipeline> <bean ref="something"/> <to uri="log:Something"/> </pipeline> <pipeline> <bean ref="foo"/> <bean ref="bar"/> <to uri="activemq:OutputQueue"/> </pipeline> </multicast> </route>

In the above example we are routing from a single Endpoint to a list of different endpoints specified using URIs. If you find the above a bit confusing, try reading about the Architecture or try the Examples

Using This Pattern

Message Router

The Message Router from the EIP patterns allows you to consume from an input destination, evaluate some predicate then choose the right output destination.

The following example shows how to route a request from an input queue:a endpoint to either queue:b, queue:c or queue:d depending on the evaluation of various Predicate expressions

Using the Fluent Builders

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using the Spring XML Extensions

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:a"/>
        <choice>
            <when>
                <xpath>$foo = 'bar'</xpath>
                <to uri="direct:b"/>
            </when>
            <when>
                <xpath>$foo = 'cheese'</xpath>
                <to uri="direct:c"/>
            </when>
            <otherwise>
                <to uri="direct:d"/>
            </otherwise>
        </choice>
    </route>
</camelContext>

Choice without otherwise

If you use a choice without adding an otherwise, any unmatched exchanges will be dropped by default.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Translator

Camel supports the Message Translator from the EIP patterns by using an arbitrary Processor in the routing logic, by using a bean to perform the transformation, or by using transform() in the DSL. You can also use a Data Format to marshal and unmarshal messages in different encodings.

Using the Fluent Builders

You can transform a message using Camel's Bean Integration to call any method on a bean in your Registry such as your Spring XML configuration file as follows

from("activemq:SomeQueue"). beanRef("myTransformerBean", "myMethodName"). to("mqseries:AnotherQueue");

Where the "myTransformerBean" would be defined in a Spring XML file or defined in JNDI etc. You can omit the method name parameter from beanRef() and the Bean Integration will try to deduce the method to invoke from the message exchange.

or you can add your own explicit Processor to do the transformation

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformTest.java}

or you can use the DSL to explicitly configure the transformation

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java}

Use Spring XML

You can also use Spring XML Extensions to do a transformation. Basically any Expression language can be substituted inside the transform element as shown below

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/transformWithExpressionContext.xml}

Or you can use the Bean Integration to invoke a bean

<route> <from uri="activemq:Input"/> <bean ref="myBeanName" method="doTransform"/> <to uri="activemq:Output"/> </route>

You can also use Templating to consume a message from one destination, transform it with something like Velocity or XQuery and then send it on to another destination. For example using InOnly (one way messaging)

from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm"). to("activemq:Another.Queue");

If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ with a template generated response, then sending responses back to the JMSReplyTo Destination you could use this.

from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm"); Using This Pattern

Message Endpoint

Camel supports the Message Endpoint from the EIP patterns using the Endpoint interface.

When using the DSL to create Routes you typically refer to Message Endpoints by their URIs rather than directly using the Endpoint interface. Its then a responsibility of the CamelContext to create and activate the necessary Endpoint instances using the available Component implementations.

Example

The following example route demonstrates the use of a File Consumer Endpoint and JMS Producer Endpoint


Using the Fluent Builders

from("file://local/router/messages/foo")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="file://local/router/messages/foo"/>
	<to uri="jms:queue:foo"/>
</route>

 

Dynamic To

Available as of Camel 2.16

There is a new <toD> that allows to send a message to a dynamic computed Endpoint using one or more Expression that are concat together. By default the Simple language is used to compute the endpoint. For example to send a message to a endpoint defined by a header you can do

<route>
  <from uri="direct:start"/>
  <toD uri="${header.foo}"/>
</route>

And in Java DSL

from("direct:start")
  .toD("${header.foo}");

 

You can also prefix the uri with a value because by default the uri is evaluated using the Simple language

<route>
  <from uri="direct:start"/>
  <toD uri="mock:${header.foo}"/>
</route>

And in Java DSL

from("direct:start")
  .toD("mock:${header.foo}");

In the example above we compute an endpoint that has prefix "mock:" and then the header foo is appended. So for example if the header foo has value order, then the endpoint is computed as "mock:order".

You can also use other languages than Simple such as XPath - this requires to prefix with language: as shown below (simple language is the default language). If you do not specify language: then the endpoint is a component name. And in some cases there is both a component and language with the same name such as xquery.

<route>
  <from uri="direct:start"/>
  <toD uri="language:xpath:/order/@uri"/>
</route>

This is done by specifying the name of the language followed by a colon.

from("direct:start")
  .toD("language:xpath:/order/@uri");

You can also concat multiple Language(s) together using the plus sign + such as shown below:

<route>
  <from uri="direct:start"/>
  <toD uri="jms:${header.base}+language:xpath:/order/@id"/>
</route>

In the example above the uri is a combination of Simple language and XPath where the first part is simple (simple is default language). And then the plus sign separate to another language, where we specify the language name followed by a colon

from("direct:start")
  .toD("jms:${header.base}+language:xpath:/order/@id");

You can concat as many languages as you want, just separate them with the plus sign

The Dynamic To has a few options you can configure

NameDefault ValueDescription
uri Mandatory: The uri to use. See above
pattern To set a specific Exchange Pattern to use when sending to the endpoint. The original MEP is restored afterwards.
cacheSize Allows to configure the cache size for the ProducerCache which caches producers for reuse. Will by default use the default cache size which is 1000. Setting the value to -1 allows to turn off the cache all together.
ignoreInvalidEndpointfalseWhether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

 

For more details see

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Messaging Channels

Point to Point Channel

Camel supports the Point to Point Channel from the EIP patterns using the following components

  • SEDA for in-VM seda based messaging
  • JMS for working with JMS Queues for high performance, clustering and load balancing
  • JPA for using a database as a simple message queue
  • XMPP for point-to-point communication over XMPP (Jabber)
  • and others

The following example demonstrates point to point messaging using the JMS component 

Using the Fluent Builders

from("direct:start")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="direct:start"/>
	<to uri="jms:queue:foo"/>
</route>

 

 

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Publish Subscribe Channel

Camel supports the Publish Subscribe Channel from the EIP patterns using for example the following components:

  • JMS for working with JMS Topics for high performance, clustering and load balancing
  • XMPP when using rooms for group communication
  • SEDA for working with SEDA in the same CamelContext which can work in pub-sub, but allowing multiple consumers.
  • VM as SEDA but for intra-JVM.

Using Routing Logic

Another option is to explicitly list the publish-subscribe relationship in your routing logic; this keeps the producer and consumer decoupled but lets you control the fine grained routing configuration using the DSL or Xml Configuration.

Using the Fluent Builders

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using the Spring XML Extensions

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:a"/>
        <multicast>
            <to uri="direct:b"/>
            <to uri="direct:c"/>
            <to uri="direct:d"/>
        </multicast>
    </route>
</camelContext>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Dead Letter Channel

Camel supports the Dead Letter Channel from the EIP patterns using the DeadLetterChannel processor which is an Error Handler.

Differences Between The DeadLetterChannel And The DefaultErrorHandler

The DefaultErrorHandler does very little: it ends the Exchange immediately and propagates the thrown Exception back to the caller.

The DeadLetterChannel lets you control behaviors including redelivery, whether to propagate the thrown Exception to the caller (the handled option), and where the (failed) Exchange should now be routed to.

The DeadLetterChannel is also by default configured to not be verbose in the logs, so when a message is handled and moved to the dead letter endpoint, then there is nothing logged. If you want some level of logging you can use the various options on the redelivery policy / dead letter channel to configure this. For example if you want the message history then set logExhaustedMessageHistory=true (and logHandled=true for Camel 2.15.x or older).

When the DeadLetterChannel moves a message to the dead letter endpoint, any new Exception thrown is by default handled by the dead letter channel as well. This ensures that the DeadLetterChannel will always succeed. From Camel 2.15: this behavior can be changed by setting the option deadLetterHandleNewException=false. Then if a new Exception is thrown, then the dead letter channel will fail and propagate back that new Exception (which is the behavior of the default error handler). When a new Exception occurs then the dead letter channel logs this at WARN level. This can be turned off by setting logNewException=false.

Redelivery

It is common for a temporary outage or database deadlock to cause a message to fail to process; but the chances are if its tried a few more times with some time delay then it will complete fine. So we typically wish to use some kind of redelivery policy to decide how many times to try redeliver a message and how long to wait before redelivery attempts.

The RedeliveryPolicy defines how the message is to be redelivered. You can customize things like

  • The number of times a message is attempted to be redelivered before it is considered a failure and sent to the dead letter channel.
  • The initial redelivery timeout.
  • Whether or not exponential backoff is used, i.e., the time between retries increases using a backoff multiplier.
  • Whether to use collision avoidance to add some randomness to the timings.
  • Delay pattern (see below for details).
  • Camel 2.11: Whether to allow redelivery during stopping/shutdown.

Once all attempts at redelivering the message fails then the message is forwarded to the dead letter queue.

About Moving Exchange to Dead Letter Queue and Using handled()

handled() on Dead Letter Channel

When all attempts of redelivery have failed the Exchange is moved to the dead letter queue (the dead letter endpoint). The exchange is then complete and from the client point of view it was processed. As such the Dead Letter Channel have handled the Exchange.

For instance configuring the dead letter channel as:

Using the Fluent Builders

javaerrorHandler(deadLetterChannel("jms:queue:dead") .maximumRedeliveries(3).redeliveryDelay(5000));

Using the Spring XML Extensions

xml<route errorHandlerRef="myDeadLetterErrorHandler"> <!-- ... --> </route> <bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder"> <property name="deadLetterUri" value="jms:queue:dead"/> <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/> </bean> <bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy"> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="5000"/> </bean>

The Dead Letter Channel above will clear the caused exception setException(null), by moving the caused exception to a property on the Exchange, with the key Exchange.EXCEPTION_CAUGHT. Then the Exchange is moved to the jms:queue:dead destination and the client will not notice the failure.

About Moving Exchange to Dead Letter Queue and Using the Original Message

The option useOriginalMessage is used for routing the original input message instead of the current message that potentially is modified during routing.

For instance if you have this route:

java from("jms:queue:order:input") .to("bean:validateOrder") .to("bean:transformOrder") .to("bean:handleOrder");

The route listen for JMS messages and validates, transforms and handle it. During this the Exchange payload is transformed/modified. So in case something goes wrong and we want to move the message to another JMS destination, then we can configure our Dead Letter Channel with the useOriginalMessage option. But when we move the Exchange to this destination we do not know in which state the message is in. Did the error happen in before the transformOrder or after? So to be sure we want to move the original input message we received from jms:queue:order:input. So we can do this by enabling the useOriginalMessage option as shown below:

java// will use original body errorHandler(deadLetterChannel("jms:queue:dead") .useOriginalMessage() .maximumRedeliveries(5) .redeliverDelay(5000);

Then the messages routed to the jms:queue:dead is the original input. If we want to manually retry we can move the JMS message from the failed to the input queue, with no problem as the message is the same as the original we received.

OnRedelivery

When Dead Letter Channel is doing redeliver its possible to configure a Processor that is executed just before every redelivery attempt. This can be used for the situations where you need to alter the message before its redelivered. See below for sample.

onException and onRedeliver

We also support for per onException to set an onRedeliver. That means you can do special on redelivery for different exceptions, as opposed to onRedelivery set on Dead Letter Channel can be viewed as a global scope.

Redelivery Default Values

Redelivery is disabled by default.

The default redeliver policy will use the following values:

  • maximumRedeliveries=0
  • redeliverDelay=1000L (1 second)
  • maximumRedeliveryDelay = 60 * 1000L (60 seconds)
  • backOffMultiplier and useExponentialBackOff are ignored.
  • retriesExhaustedLogLevel=LoggingLevel.ERROR
  • retryAttemptedLogLevel=LoggingLevel.DEBUG
  • Stack traces are logged for exhausted messages, from Camel 2.2.
  • Handled exceptions are not logged, from Camel 2.3.
  • logExhaustedMessageHistory is true for default error handler, and false for dead letter channel.
  • logExhaustedMessageBody Camel 2.17: is disabled by default to avoid logging sensitive message body/header details. If this option is true, then logExhaustedMessageHistory must also be true.

The maximum redeliver delay ensures that a delay is never longer than the value, default 1 minute. This can happen when useExponentialBackOff=true.

The maximumRedeliveries is the number of re-delivery attempts. By default Camel will try to process the exchange 1 + 5 times. 1 time for the normal attempt and then 5 attempts as redeliveries.
Setting the maximumRedeliveries=-1 (or < -1) will then always redelivery (unlimited).
Setting the maximumRedeliveries=0 will disable re-delivery.

Camel will log delivery failures at the DEBUG logging level by default. You can change this by specifying retriesExhaustedLogLevel and/or retryAttemptedLogLevel. See ExceptionBuilderWithRetryLoggingLevelSetTest for an example.

You can turn logging of stack traces on/off. If turned off Camel will still log the redelivery attempt. It's just much less verbose.

Redeliver Delay Pattern

Delay pattern is used as a single option to set a range pattern for delays. When a delay pattern is in use the following options no longer apply:

  • delay
  • backOffMultiplier
  • useExponentialBackOff
  • useCollisionAvoidance
  • maximumRedeliveryDelay

The idea is to set groups of ranges using the following syntax: limit:delay;limit 2:delay 2;limit 3:delay 3;...;limit N:delay N

Each group has two values separated with colon:

  • limit = upper limit
  • delay = delay in milliseconds
    And the groups is again separated with semi-colon. The rule of thumb is that the next groups should have a higher limit than the previous group.

Lets clarify this with an example:
delayPattern=5:1000;10:5000;20:20000

That gives us three groups:

  • 5:1000
  • 10:5000
  • 20:20000

Resulting in these delays between redelivery attempts:

  • Redelivery attempt number 1..4 = 0ms (as the first group start with 5)
  • Redelivery attempt number 5..9 = 1000ms (the first group)
  • Redelivery attempt number 10..19 = 5000ms (the second group)
  • Redelivery attempt number 20.. = 20000ms (the last group)

Note: The first redelivery attempt is 1, so the first group should start with 1 or higher.

You can start a group with limit 1 to e.g., have a starting delay: delayPattern=1:1000;5:5000

  • Redelivery attempt number 1..4 = 1000ms (the first group)
  • Redelivery attempt number 5.. = 5000ms (the last group)

There is no requirement that the next delay should be higher than the previous. You can use any delay value you like. For example with delayPattern=1:5000;3:1000 we start with 5 sec delay and then later reduce that to 1 second.

Redelivery header

When a message is redelivered the DeadLetterChannel will append a customizable header to the message to indicate how many times its been redelivered.
Before Camel 2.6: The header is CamelRedeliveryCounter, which is also defined on the Exchange.REDELIVERY_COUNTER.
From Camel 2.6: The header CamelRedeliveryMaxCounter, which is also defined on the Exchange.REDELIVERY_MAX_COUNTER, contains the maximum redelivery setting. This header is absent if you use retryWhile or have unlimited maximum redelivery configured.

And a boolean flag whether it is being redelivered or not (first attempt). The header CamelRedelivered contains a boolean if the message is redelivered or not, which is also defined on the Exchange.REDELIVERED.

Dynamically Calculated Delay From the Exchange

In Camel 2.9 and 2.8.2: The header is CamelRedeliveryDelay, which is also defined on the Exchange.REDELIVERY_DELAY. If this header is absent, normal redelivery rules apply.

Which Endpoint Failed

Available as of Camel 2.1

When Camel routes messages it will decorate the Exchange with a property that contains the last endpoint Camel send the Exchange to:

javaString lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);

The Exchange.TO_ENDPOINT have the constant value CamelToEndpoint. This information is updated when Camel sends a message to any endpoint. So if it exists its the last endpoint which Camel send the Exchange to.

When for example processing the Exchange at a given Endpoint and the message is to be moved into the dead letter queue, then Camel also decorates the Exchange with another property that contains that last endpoint:

javaString failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);

The Exchange.FAILURE_ENDPOINT have the constant value CamelFailureEndpoint.

This allows for example you to fetch this information in your dead letter queue and use that for error reporting. This is usable if the Camel route is a bit dynamic such as the dynamic Recipient List so you know which endpoints failed.

Note: this information is retained on the Exchange even if the message is subsequently processed successfully by a given endpoint only to fail, for example, in local Bean processing instead. So, beware that this is a hint that helps pinpoint errors.

javafrom("activemq:queue:foo") .to("http://someserver/somepath") .beanRef("foo");

Now suppose the route above and a failure happens in the foo bean. Then the Exchange.TO_ENDPOINT and Exchange.FAILURE_ENDPOINT will still contain the value of http://someserver/somepath.

OnPrepareFailure

Available as of Camel 2.16

Before the exchange is sent to the dead letter queue, you can use onPrepare to allow a custom Processor to prepare the exchange, such as adding information why the Exchange failed.

For example, the following processor adds a header with the exception message:

java public static class MyPrepareProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); exchange.getIn().setHeader("FailedBecause", cause.getMessage()); } }

Then configure the error handler to use the processor as follows:

javaerrorHandler(deadLetterChannel("jms:dead").onPrepareFailure(new MyPrepareProcessor()));

 

Configuring this from XML DSL is as follows:

xml<bean id="myPrepare" class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/> <errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="jms:dead" onPrepareFailureRef="myPrepare"/>

 

The onPrepare is also available using the default error handler.

Which Route Failed

Available as of Camel 2.10.4/2.11

When Camel error handler handles an error such as Dead Letter Channel or using Exception Clause with handled=true, then Camel will decorate the Exchange with the route id where the error occurred.

Example:

javaString failedRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID, String.class);

The Exchange.FAILURE_ROUTE_ID have the constant value CamelFailureRouteId. This allows for example you to fetch this information in your dead letter queue and use that for error reporting.

Control if Redelivery is Allowed During Stopping/Shutdown

Available as of Camel 2.11

Before Camel 2.10, Camel would perform redelivery while stopping a route, or shutting down Camel. This has improved a bit in Camel 2.10: Camel will no longer perform redelivery attempts when shutting down aggressively, e.g., during Graceful Shutdown and timeout hit.

From Camel 2.11: there is a new option allowRedeliveryWhileStopping which you can use to control if redelivery is allowed or not; notice that any in progress redelivery will still be executed. This option can only disallow any redelivery to be executed after the stopping of a route/shutdown of Camel has been triggered. If a redelivery is disallowed then a RejectedExcutionException is set on the Exchange and the processing of the Exchange stops. This means any consumer will see the Exchange as failed due the RejectedExcutionException. The default value is true for backward compatibility.

For example, the following snippet shows how to do this with Java DSL and XML DSL:{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNoRedeliveryOnShutdownTest.java}And the sample sample with XML DSL{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRedeliveryErrorHandlerNoRedeliveryOnShutdownTest.xml}

Samples

The following example shows how to configure the Dead Letter Channel configuration using the DSL{snippet:id=e3|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java}You can also configure the RedeliveryPolicy as this example shows{snippet:id=e4|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java}

How Can I Modify the Exchange Before Redelivery?

We support directly in Dead Letter Channel to set a Processor that is executed before each redelivery attempt. When Dead Letter Channel is doing redeliver its possible to configure a Processor that is executed just before every redelivery attempt. This can be used for the situations where you need to alter the message before its redelivered. Here we configure the Dead Letter Channel to use our processor MyRedeliveryProcessor to be executed before each redelivery.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java}And this is the processor MyRedeliveryProcessor where we alter the message.{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java}

How Can I Log What Caused the Dead Letter Channel to be Invoked?

You often need to know what went wrong that caused the Dead Letter Channel to be used and it does not offer logging for this purpose. So the Dead Letter Channel's endpoint can be set to a endpoint of our own (such as direct:deadLetterChannel). We write a route to accept this Exchange and log the Exception, then forward on to where we want the failed Exchange moved to (which might be a DLQ queue for instance). See also http://stackoverflow.com/questions/13711462/logging-camel-exceptions-and-sending-to-the-dead-letter-channel

Using This Pattern

Guaranteed Delivery

Camel supports the Guaranteed Delivery from the EIP patterns using among others the following components:

  • File for using file systems as a persistent store of messages
  • JMS when using persistent delivery (the default) for working with JMS Queues and Topics for high performance, clustering and load balancing
  • JPA for using a database as a persistence layer, or use any of the many other database component such as SQL, JDBC, iBATIS/MyBatis, Hibernate
  • HawtDB for a lightweight key-value persistent store

Example

The following example demonstrates illustrates the use of Guaranteed Delivery within the JMS component. By default, a message is not considered successfully delivered until the recipient has persisted the message locally guaranteeing its receipt in the event the destination becomes unavailable.

Using the Fluent Builders

from("direct:start")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="direct:start"/>
	<to uri="jms:queue:foo"/>
</route>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Bus

Camel supports the Message Bus from the EIP patterns. You could view Camel as a Message Bus itself as it allows producers and consumers to be decoupled.

Folks often assume that a Message Bus is a JMS though so you may wish to refer to the JMS component for traditional MOM support.
Also worthy of note is the XMPP component for supporting messaging over XMPP (Jabber)

Of course there are also ESB products such as Apache ServiceMix which serve as full fledged message busses.
You can interact with Apache ServiceMix from Camel in many ways, but in particular you can use the NMR or JBI component to access the ServiceMix message bus directly.

 

Example

The following demonstrates how the Camel message bus can be used to communicate with consumers and producers


Using the Fluent Builders

from("direct:start")
	.pollEnrich("file:inbox?fileName=data.txt")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="direct:start"/>
	<pollEnrich uri="file:inbox?fileName=data.txt"/>
	<to uri="jms:queue:foo"/>
</route>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Construction

Event Message

Camel supports the Event Message from the EIP patterns by supporting the Exchange Pattern on a Message which can be set to InOnly to indicate a oneway event message. Camel Components then implement this pattern using the underlying transport or protocols.

The default behaviour of many Components is InOnly such as for JMS, File or SEDA

Related

See the related Request Reply message.

Explicitly specifying InOnly

If you are using a component which defaults to InOut you can override the Exchange Pattern for an endpoint using the pattern property.

foo:bar?exchangePattern=InOnly

From 2.0 onwards on Camel you can specify the Exchange Pattern using the DSL.

Using the Fluent Builders

from("mq:someQueue").
  setExchangePattern(ExchangePattern.InOnly).
  bean(Foo.class);

or you can invoke an endpoint with an explicit pattern

from("mq:someQueue").
  inOnly("mq:anotherQueue");

Using the Spring XML Extensions

<route>
    <from uri="mq:someQueue"/>
    <inOnly uri="bean:foo"/>
</route>
<route>
    <from uri="mq:someQueue"/>
    <inOnly uri="mq:anotherQueue"/>
</route>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Request Reply

Camel supports the Request Reply from the EIP patterns by supporting the Exchange Pattern on a Message which can be set to InOut to indicate a request/reply. Camel Components then implement this pattern using the underlying transport or protocols.

For example when using JMS with InOut the component will by default perform these actions

  • create by default a temporary inbound queue
  • set the JMSReplyTo destination on the request message
  • set the JMSCorrelationID on the request message
  • send the request message
  • consume the response and associate the inbound message to the request using the JMSCorrelationID (as you may be performing many concurrent request/responses).

Related

See the related Event Message message

Explicitly specifying InOut

When consuming messages from JMS a Request-Reply is indicated by the presence of the JMSReplyTo header.

You can explicitly force an endpoint to be in Request Reply mode by setting the exchange pattern on the URI. e.g.

jms:MyQueue?exchangePattern=InOut

You can specify the exchange pattern in DSL rule or Spring configuration.

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20
Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <!-- Send the exchange as InOnly -->
  <route>
    <from uri="direct:testInOut"/>
    <inOut uri="mock:result"/>
  </route>

  <!-- Send the exchange as InOnly -->
  <route>
    <from uri="direct:testInOnly"/>
    <inOnly uri="mock:result"/>
  </route>


  <!-- lets set the exchange pattern then send it on -->
  <route>
    <from uri="direct:testSetToInOnlyThenTo"/>
    <setExchangePattern pattern="InOnly"/>
    <to uri="mock:result"/>      
  </route>
  <route>
    <from uri="direct:testSetToInOutThenTo"/>
    <setExchangePattern pattern="InOut"/>
    <to uri="mock:result"/>
  </route>
  <route>
    <from uri="direct:testSetExchangePatternInOnly"/>
    <setExchangePattern pattern="InOnly"/>
    <to uri="mock:result"/>
  </route>


  <!-- Lets pass the pattern as an argument in the to element -->
  <route>
    <from uri="direct:testToWithInOnlyParam"/>
    <to uri="mock:result" pattern="InOnly"/>
  </route>
  <route>
    <from uri="direct:testToWithInOutParam"/>
    <to uri="mock:result" pattern="InOut"/>
  </route>
 </camelContext>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Correlation Identifier

Camel supports the Correlation Identifier from the EIP patterns by getting or setting a header on a Message.

When working with the ActiveMQ or JMS components the correlation identifier header is called JMSCorrelationID. You can add your own correlation identifier to any message exchange to help correlate messages together to a single conversation (or business process).

The use of a Correlation Identifier is key to working with the Camel Business Activity Monitoring Framework and can also be highly useful when testing with simulation or canned data such as with the Mock testing framework

Some EIP patterns will spin off a sub message, and in those cases, Camel will add a correlation id on the Exchange as a property with they key Exchange.CORRELATION_ID, which links back to the source Exchange. For example the Splitter, Multicast, Recipient List, and Wire Tap EIP does this.

The following example demonstrates using the Camel JMSMessageID as the Correlation Identifier within a request/reply pattern in the JMS component

Using the Fluent Builders

from("direct:start")
	.to(ExchangePattern.InOut,"jms:queue:foo?useMessageIDAsCorrelationID=true")
	.to("mock:result");

 

Using the Spring XML Extensions

<route>
	<from uri="direct:start"/>
	<to uri="jms:queue:foo?useMessageIDAsCorrelationID=true" pattern="InOut"/>
	<to uri="mock:result"/>
</route>

See Also

Return Address

Camel supports the Return Address from the EIP patterns by using the JMSReplyTo header.

For example when using JMS with InOut the component will by default return to the address given in JMSReplyTo.

Requestor Code

getMockEndpoint("mock:bar").expectedBodiesReceived("Bye World");
template.sendBodyAndHeader("direct:start", "World", "JMSReplyTo", "queue:bar");

Route Using the Fluent Builders

from("direct:start").to("activemq:queue:foo?preserveMessageQos=true");
from("activemq:queue:foo").transform(body().prepend("Bye "));
from("activemq:queue:bar?disableReplyTo=true").to("mock:bar");

Route Using the Spring XML Extensions

<route>
  <from uri="direct:start"/>
  <to uri="activemq:queue:foo?preserveMessageQos=true"/>
</route>

<route>
  <from uri="activemq:queue:foo"/>
  <transform>
      <simple>Bye ${in.body}</simple>
  </transform>
</route>

<route>
  <from uri="activemq:queue:bar?disableReplyTo=true"/>
  <to uri="mock:bar"/>
</route>

For a complete example of this pattern, see this junit test case

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Routing

Content Based Router

The Content Based Router from the EIP patterns allows you to route messages to the correct destination based on the contents of the message exchanges.

The following example shows how to route a request from an input seda:a endpoint to either seda:b, seda:c or seda:d depending on the evaluation of various Predicate expressions

Using the Fluent Builders

 


RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));
 
        from("direct:a")
            .choice()
                .when(header("foo").isEqualTo("bar"))
                    .to("direct:b")
                .when(header("foo").isEqualTo("cheese"))
                    .to("direct:c")
                .otherwise()
                    .to("direct:d");
    }
};

See Why can I not use when or otherwise in a Java Camel route if you have problems with the Java DSL, accepting using when or otherwise.

Using the Spring XML Extensions

 

<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:a"/>
        <choice>
            <when>
                <xpath>$foo = 'bar'</xpath>
                <to uri="direct:b"/>
            </when>
            <when>
                <xpath>$foo = 'cheese'</xpath>
                <to uri="direct:c"/>
            </when>
            <otherwise>
                <to uri="direct:d"/>
            </otherwise>
        </choice>
    </route>
</camelContext>

For further examples of this pattern in use you could look at the junit test case

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Filter

The Message Filter from the EIP patterns allows you to filter messages

The following example shows how to create a Message Filter route consuming messages from an endpoint called queue:a, which if the Predicate is true will be dispatched to queue:b

Using the Fluent Builders{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}You can, of course, use many different Predicate languages such as XPath, XQuery, SQL or various Scripting Languages. Here is an XPath example{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/XPathFilterTest.java}Here's another example of using a bean to define the filter behavior:

javafrom("direct:start") .filter().method(MyBean.class, "isGoldCustomer").to("mock:result").end() .to("mock:end"); public static class MyBean { public boolean isGoldCustomer(@Header("level") String level) { return level.equals("gold"); } }

Using the Spring XML Extensions{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildSimpleRouteWithHeaderPredicate.xml}You can also use a method call expression (to call a method on a bean) in the Message Filter, as shown below:

xml<bean id="myBean" class="com.foo.MyBean"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:a"/> <filter> <method ref="myBean" method="isGoldCustomer"/> <to uri="direct:b"/> </filter> </route> </camelContext>Filtered Endpoint Required Inside </filter> Tag

Ensure you put the endpoint you want to filter <to uri="seda:b"/> before the closing </filter> tag or the filter will not be applied. From Camel 2.8: omitting this will result in an error.

For further examples of this pattern in use you could look at the junit test case

Using stop()

Stop is a bit different than a message filter as it will filter out all messages and end the route entirely (filter only applies to its child processor). Stop is convenient to use in a Content Based Router when you for example need to stop further processing in one of the predicates.

In the example below we do not want to route messages any further that has the word Bye in the message body. Notice how we prevent this in the when() predicate by using the .stop().{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java}

How To Determine If An  Exchange Was Filtered

Available as of Camel 2.5

The Message Filter EIP will add a property on the Exchange that states if it was filtered or not.

The property has the key Exchange.FILTER_MATCHED, which has the String value of CamelFilterMatched. Its value is a boolean indicating true or false. If the value is true then the Exchange was routed in the filter block. This property will be visible within the Message Filter block who's Predicate matches (value set to true), and to the steps immediately following the Message Filter with the value set based on the results of the last Message Filter Predicate evaluated.

Using This Pattern

Dynamic Router

The Dynamic Router from the EIP patterns allows you to route messages while avoiding the dependency of the router on all possible destinations while maintaining its efficiency.

In Camel 2.5 we introduced a dynamicRouter in the DSL which is like a dynamic Routing Slip which evaluates the slip on-the-fly.

Beware

You must ensure the expression used for the dynamicRouter such as a bean, will return null to indicate the end. Otherwise the dynamicRouter will keep repeating endlessly.

Options

confluenceTableSmall

Name

Default Value

Description

uriDelimiter

,

Delimiter used if the Expression returned multiple endpoints.

ignoreInvalidEndpoints

false

If an endpoint uri could not be resolved, should it be ignored. Otherwise Camel will thrown an exception stating the endpoint uri is not valid.

cacheSize

1000

Camel 2.13.1/2.12.4: Allows to configure the cache size for the ProducerCache which caches producers for reuse in the routing slip. Will by default use the default cache size which is 1000. Setting the value to -1 allows to turn off the cache all together.

Dynamic Router in Camel 2.5 onwards

From Camel 2.5 the Dynamic Router will set a property (Exchange.SLIP_ENDPOINT) on the Exchange which contains the current endpoint as it advanced though the slip. This allows you to know how far we have processed in the slip. (It's a slip because the Dynamic Router implementation is based on top of Routing Slip).

Java DSL

In Java DSL you can use the dynamicRouter as shown below:

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterTest.java}

Which will leverage a Bean to compute the slip on-the-fly, which could be implemented as follows:

{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterTest.java}

Mind that this example is only for show and tell. The current implementation is not thread safe. You would have to store the state on the Exchange, to ensure thread safety, as shown below:

{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterExchangePropertiesTest.java}

You could also store state as message headers, but they are not guaranteed to be preserved during routing, where as properties on the Exchange are. Although there was a bug in the method call expression, see the warning below.

Using beans to store state

Mind that in Camel 2.9.2 or older, when using a Bean the state is not propagated, so you will have to use a Processor instead. This is fixed in Camel 2.9.3 onwards.

Spring XML

The same example in Spring XML would be:

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringDynamicRouterTest.xml}

@DynamicRouter annotation

You can also use the @DynamicRouter annotation, for example the Camel 2.4 example below could be written as follows. The route method would then be invoked repeatedly as the message is processed dynamically. The idea is to return the next endpoint uri where to go. Return null to indicate the end. You can return multiple endpoints if you like, just as the Routing Slip, where each endpoint is separated by a delimiter.

javapublic class MyDynamicRouter { @Consume(uri = "activemq:foo") @DynamicRouter public String route(@XPath("/customer/id") String customerId, @Header("Location") String location, Document body) { // query a database to find the best match of the endpoint based on the input parameteres // return the next endpoint uri, where to go. Return null to indicate the end. } }

Dynamic Router in Camel 2.4 or older

The simplest way to implement this is to use the RecipientList Annotation on a Bean method to determine where to route the message.

javapublic class MyDynamicRouter { @Consume(uri = "activemq:foo") @RecipientList public List<String> route(@XPath("/customer/id") String customerId, @Header("Location") String location, Document body) { // query a database to find the best match of the endpoint based on the input parameteres ... } }

In the above we can use the Parameter Binding Annotations to bind different parts of the Message to method parameters or use an Expression such as using XPath or XQuery.

The method can be invoked in a number of ways as described in the Bean Integration such as

Using This Pattern

Recipient List

The Recipient List from the EIP patterns allows you to route messages to a number of dynamically specified recipients.

The recipients will receive a copy of the same Exchange, and Camel will execute them sequentially.

Options

confluenceTableSmall

Name

Default Value

Description

delimiter

,

Delimiter used if the Expression returned multiple endpoints (like "direct:foo,direct:bar"). From Camel 2.13 onwards this can be disabled by setting delimiter to "false".

strategyRef

 

An AggregationStrategy that will assemble the replies from recipients into a single outgoing message from the Recipient List. By default Camel will use the last reply as the outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see the Aggregator page for more details. If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.

strategyMethodName

 

Camel 2.12: This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy. See the Aggregator page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. See the Aggregator page for more details.

parallelProcessing

false

Camel 2.2: If enabled, messages are sent to the recipients concurrently. Note that the calling thread will still wait until all messages have been fully processed before it continues; it is the sending and processing of replies from recipients which happens in parallel.

 

parallelAggregate

false

Camel 2.14: If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.

executorServiceRef

 

Camel 2.2: A custom Thread Pool to use for parallel processing. Note that enabling this option implies parallel processing, so you need not enable that option as well.

stopOnException

false

Camel 2.2: Whether to immediately stop processing when an exception occurs. If disabled, Camel will send the message to all recipients regardless of any individual failures. You can process exceptions in an AggregationStrategy implementation, which supports full control of error handling.

ignoreInvalidEndpoints

false

Camel 2.3: Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

streaming

false

Camel 2.5: If enabled, Camel will process replies out-of-order - that is, in the order received in reply from each recipient. If disabled, Camel will process replies in the same order as specified by the Expression. So this specifies whether the response messages are aggregated as they come in, or in the exact order as the recipient list was evaluated. Only relevant if you enable parallelProcessing.

timeout

 

Camel 2.5: Specifies a processing timeout in milliseconds. If the Recipient List hasn't been able to send and process all replies within this timeframe, then the timeout triggers and the Recipient List breaks out, with message flow continuing to the next element. Note that if you provide a TimeoutAwareAggregationStrategy, its timeout method is invoked before breaking out. Beware: If the timeout is reached with running tasks still remaining, certain tasks (for which it is difficult for Camel to shut down in a graceful manner) may continue to run. So use this option with caution. We may be able to improve this functionality in future Camel releases.

onPrepareRef

 

Camel 2.8: A custom Processor to prepare the copy of the Exchange each recipient will receive. This allows you to perform arbitrary transformations, such as deep-cloning the message payload (or any other custom logic).

shareUnitOfWork

false

Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details.

cacheSize

1000

Camel 2.13.1/2.12.4: Allows to configure the cache size for the ProducerCache which caches producers for reuse in the recipient list. Will by default use the default cache size which is 1000. Setting the value to -1 allows to turn off the cache completely.

Static Recipient List

The following example shows how to route a request from an input queue:a endpoint to a static list of destinations

Using Annotations
You can use the RecipientList Annotation on a POJO to create a Dynamic Recipient List. For more details see the Bean Integration.

Using the Fluent Builders{snippet:id=multicast|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}Using the Spring XML Extensions{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildStaticRecipientList.xml}

Dynamic Recipient List

Usually one of the main reasons for using the Recipient List pattern is that the list of recipients is dynamic and calculated at runtime. The following example demonstrates how to create a dynamic recipient list using an Expression (which in this case extracts a named header value dynamically) to calculate the list of endpoints which are either of type Endpoint or are converted to a String and then resolved using the endpoint URIs.

Using the Fluent Builders{snippet:id=e9|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}The above assumes that the header contains a list of endpoint URIs. The following takes a single string header and tokenizes it{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListTest.java}

Iteratable value

The dynamic list of recipients that are defined in the header must be iterable such as:

  • java.util.Collection
  • java.util.Iterator
  • arrays
  • org.w3c.dom.NodeList
  • a single String with values separated by comma
  • any other type will be regarded as a single value

Using the Spring XML Extensions{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildDynamicRecipientList.xml}For further examples of this pattern in action you could take a look at one of the junit test cases.

Using delimiter in Spring XML

In Spring DSL you can set the delimiter attribute for setting a delimiter to be used if the header value is a single String with multiple separated endpoints. By default Camel uses comma as delimiter, but this option lets you specify a custom delimiter to use instead.{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/recipientListWithStringDelimitedHeader.xml}So if myHeader contains a String with the value "activemq:queue:foo, activemq:topic:hello , log:bar" then Camel will split the String using the delimiter given in the XML that was comma, resulting into 3 endpoints to send to. You can use spaces between the endpoints as Camel will trim the value when it lookup the endpoint to send to.

Note: In Java DSL you use the tokenizer to achieve the same. The route above in Java DSL:

from("direct:a").recipientList(header("myHeader").tokenize(","));

In Camel 2.1 its a bit easier as you can pass in the delimiter as 2nd parameter:

from("direct:a").recipientList(header("myHeader"), "#");

Sending to multiple recipients in parallel

Available as of Camel 2.2

The Recipient List now supports parallelProcessing that for example Splitter also supports. You can use it to use a thread pool to have concurrent tasks sending the Exchange to multiple recipients concurrently.

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

And in Spring XML it is an attribute on the recipient list tag.

<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>

Stop continuing in case one recipient failed

Available as of Camel 2.2

The Recipient List now supports stopOnException that for example Splitter also supports. You can use it to stop sending to any further recipients in case any recipient failed.

from("direct:a").recipientList(header("myHeader")).stopOnException();

And in Spring XML its an attribute on the recipient list tag.

<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>

Note: You can combine parallelProcessing and stopOnException and have them both true.

Ignore invalid endpoints

Available as of Camel 2.3

The Recipient List now supports ignoreInvalidEndpoints (like the Routing Slip). You can use it to skip endpoints which are invalid.

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

And in Spring XML it is an attribute on the recipient list tag.

<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>

Then let us say the myHeader contains the following two endpoints direct:foo,xxx:bar. The first endpoint is valid and works. However the second one is invalid and will just be ignored. Camel logs at INFO level about it, so you can see why the endpoint was invalid.

Using custom AggregationStrategy

Available as of Camel 2.2

You can now use your own AggregationStrategy with the Recipient List. However this is rarely needed. What it is good for is that in case you are using Request Reply messaging then the replies from the recipients can be aggregated. By default Camel uses UseLatestAggregationStrategy which just keeps that last received reply. If you must remember all the bodies that all the recipients sent back, then you can use your own custom aggregator that keeps those. It is the same principle as with the Aggregator EIP so check it out for details.

from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");

And in Spring XML it is again an attribute on the recipient list tag.

<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

Knowing which endpoint when using custom AggregationStrategy

Available as of Camel 2.12

When using a custom AggregationStrategy then the aggregate method is always invoked in sequential order (also if parallel processing is enabled) of the endpoints the Recipient List is using. However from Camel 2.12 onwards this is easier to know as the newExchange Exchange now has a property stored (key is Exchange.RECIPIENT_LIST_ENDPOINT with the uri of the Endpoint. So you know which endpoint you are aggregating from. The code block shows how to access this property in your Aggregator

@Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { String uri = newExchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT, String.class); ... }

Using custom thread pool

Available as of Camel 2.2

A thread pool is only used for parallelProcessing. You supply your own custom thread pool via the ExecutorServiceStrategy (see Camel's Threading Model), the same way you would do it for the aggregationStrategy. By default Camel uses a thread pool with 10 threads (subject to change in future versions).

Using method call as recipient list

You can use a Bean to provide the recipients, for example:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

And then MessageRouter:

public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }

When you use a Bean then do not use the @RecipientList annotation as this will in fact add yet another recipient list, so you end up having two. Do not do the following.

public class MessageRouter { @RecipientList public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }

You should only use the snippet above (using @RecipientList) if you just route to a Bean which you then want to act as a recipient list.
So the original route can be changed to:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

Which then would invoke the routeTo method and detect that it is annotated with @RecipientList and then act accordingly as if it was a recipient list EIP.

Using timeout

Available as of Camel 2.5

If you use parallelProcessing then you can configure a total timeout value in millis. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message consumer is slow. For example you can set a timeout value of 20 sec.

Tasks may keep running

If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. We may be able to improve this functionality in future Camel releases.

For example in the unit test below you can see that we multicast the message to 3 destinations. We have a timeout of 2 seconds, which means only the last two messages can be completed within the timeframe. This means we will only aggregate the last two which yields a result aggregation which outputs "BC".{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutTest.java}

Timeout in other EIPs

This timeout feature is also supported by Splitter and both multicast and recipientList.

By default if a timeout occurs the AggregationStrategy is not invoked. However you can implement a special version

javaTimeoutAwareAggregationStrategyTimeoutAwareAggregationStrategypublic interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);

This allows you to deal with the timeout in the AggregationStrategy if you really need to.

Timeout is total

The timeout is total, which means that after X time, Camel will aggregate the messages which have completed within the timeframe. The remainders will be cancelled. Camel will also only invoke the timeout method in the TimeoutAwareAggregationStrategy once, for the first index which caused the timeout.

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8

See details at Multicast

Using ExchangePattern in recipients

Available as of Camel 2.15

The recipient list will by default use the current Exchange Pattern. Though one can imagine use-cases where one wants to send a message to a recipient using a different exchange pattern. For example you may have a route that initiates as an InOnly route, but want to use InOut exchange pattern with a recipient list. To do this in earlier Camel releases, you would need to change the exchange pattern before the recipient list, or use onPrepare option to alter the pattern. From Camel 2.15 onwards, you can configure the exchange pattern directly in the recipient endpoints.

For example in the route below we pick up new files (which will be started as InOnly) and then route to a recipient list. As we want to use InOut with the ActiveMQ (JMS) endpoint we can now specify this using the exchangePattern=InOut option. Then the response from the JMS request/reply will then be continued routed, and thus the response is what will be stored in as a file in the outbox directory.

from("file:inbox") // the exchange pattern is InOnly initially when using a file route .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") .to("file:outbox");

The recipient list will not alter the original exchange pattern. So in the example above the exchange pattern will still be InOnly when the message is routed to the file:outbox endpoint.

If you want to alter the exchange pattern permanently then use the .setExchangePattern option. See more details at Request Reply and Event Message.

 

 

Using This Pattern

Splitter

The Splitter from the EIP patterns allows you split a message into a number of pieces and process them individually

You need to specify a Splitter as split(). In earlier versions of Camel, you need to use splitter().

Options

confluenceTableSmall

Name

Default Value

Description

strategyRef

 

Refers to an AggregationStrategy to be used to assemble the replies from the sub-messages, into a single outgoing message from the Splitter. See the defaults described below in What the Splitter returns. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see the Aggregate page for more details. If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.

strategyMethodName

 

Camel 2.12: This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used for the very first splitted message. If this option is true then null values is used as the oldExchange (for the very first message splitted), when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

parallelProcessing

false

If enabled then processing the sub-messages occurs concurrently. Note the caller thread will still wait until all sub-messages has been fully processed, before it continues.

 

parallelAggregate

false

Camel 2.14: If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to achieve higher performance when the AggregationStrategy is implemented as thread-safe.

executorServiceRef

 

Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatically implied, and you do not have to enable that option as well.

stopOnException

false

Camel 2.2: Whether or not to stop continue processing immediately when an exception occurred. If disable, then Camel continue splitting and process the sub-messages regardless if one of them failed. You can deal with exceptions in the AggregationStrategy class where you have full control how to handle that.

streaming

false

If enabled then Camel will split in a streaming fashion, which means it will split the input message in chunks. This reduces the memory overhead. For example if you split big messages its recommended to enable streaming. If streaming is enabled then the sub-message replies will be aggregated out-of-order, eg in the order they come back. If disabled, Camel will process sub-message replies in the same order as they where splitted.

timeout

 

Camel 2.5: Sets a total timeout specified in millis. If the Recipient List hasn't been able to split and process all replies within the given timeframe, then the timeout triggers and the Splitter breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. We may be able to improve this functionality in future Camel releases.

onPrepareRef

 

Camel 2.8: Refers to a custom Processor to prepare the sub-message of the Exchange, before its processed. This allows you to do any custom logic, such as deep-cloning the message payload if that's needed etc.

shareUnitOfWork

false

Camel 2.8: Whether the unit of work should be shared. See further below for more details.

Exchange properties

The following properties are set on each Exchange that are split:

property

type

description

CamelSplitIndex

int

A split counter that increases for each Exchange being split. The counter starts from 0.

CamelSplitSize

int

The total number of Exchanges that was splitted. This header is not applied for stream based splitting. From Camel 2.9 onwards this header is also set in stream based splitting, but only on the completed Exchange.

CamelSplitComplete

boolean

Camel 2.4: Whether or not this Exchange is the last.

Examples

The following example shows how to take a request from the direct:a endpoint the split it into pieces using an Expression, then forward each piece to direct:b

Using the Fluent Builders{snippet:id=splitter|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}The splitter can use any Expression language so you could use any of the Languages Supported such as XPath, XQuery, SQL or one of the Scripting Languages to perform the split. e.g.

from("activemq:my.queue").split(xpath("//foo/bar")).convertBodyTo(String.class).to("file://some/directory")

Using the Spring XML Extensions{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildSplitter.xml}For further examples of this pattern in use you could look at one of the junit test case

Splitting a Collection, Iterator or Array

A common use case is to split a Collection, Iterator or Array from the message. In the sample below we simply use an Expression to identify the value to split.

javafrom("direct:splitUsingBody").split(body()).to("mock:result"); from("direct:splitUsingHeader").split(header("foo")).to("mock:result"); 

In Spring XML you can use the Simple language to identify the value to split.

xml<split> <simple>${body}</simple> <to uri="mock:result"/> </split> <split> <simple>${header.foo}</simple> <to uri="mock:result"/> </split>  

Using Tokenizer from Spring XML Extensions*

You can use the tokenizer expression in the Spring DSL to split bodies or headers using a token. This is a common use-case, so we provided a special tokenizer tag for this.
In the sample below we split the body using a @ as separator. You can of course use comma or space or even a regex pattern, also set regex=true.{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterTokenizerTest.xml}

What the Splitter returns

Camel 2.2 or older:
The Splitter will by default return the last splitted message.

Camel 2.3 and newer
The Splitter will by default return the original input message.

For all versions
You can override this by suppling your own strategy as an AggregationStrategy. There is a sample on this page (Split aggregate request/reply sample). Notice its the same strategy as the Aggregator supports. This Splitter can be viewed as having a build in light weight Aggregator.

Parallel execution of distinct 'parts'

If you want to execute all parts in parallel you can use special notation of split() with two arguments, where the second one is a boolean flag if processing should be parallel. e.g.

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); from("activemq:my.queue").split(xPathBuilder, true).to("activemq:my.parts");

The boolean option has been refactored into a builder method parallelProcessing so its easier to understand what the route does when we use a method instead of true|false.

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts");

Stream based

Splitting big XML payloads

The XPath engine in Java and saxon will load the entire XML content into memory. And thus they are not well suited for very big XML payloads.
Instead you can use a custom Expression which will iterate the XML payload in a streamed fashion. From Camel 2.9 onwards you can use the Tokenizer language
which supports this when you supply the start and end tokens. From Camel 2.14, you can use the XMLTokenizer language which is specifically provided for tokenizing XML documents.

You can split streams by enabling the streaming mode using the streaming builder method.

from("direct:streaming").split(body().tokenize(",")).streaming().to("activemq:my.parts");

You can also supply your custom splitter to use with streaming like this:

import static org.apache.camel.builder.ExpressionBuilder.beanExpression; from("direct:streaming") .split(beanExpression(new MyCustomIteratorFactory(), "iterator")) .streaming().to("activemq:my.parts")

Streaming big XML payloads using Tokenizer language

There are two tokenizers that can be used to tokenize an XML payload. The first tokenizer uses the same principle as in the text tokenizer to scan the XML payload and extract a sequence of tokens.

Available as of Camel 2.9
If you have a big XML payload, from a file source, and want to split it in streaming mode, then you can use the Tokenizer language with start/end tokens to do this with low memory footprint.

StAX component

The Camel StAX component can also be used to split big XML files in a streaming mode. See more details at StAX.

For example you may have a XML payload structured as follows

xml<orders> <order> <!-- order stuff here --> </order> <order> <!-- order stuff here --> </order> ... <order> <!-- order stuff here --> </order> </orders>

Now to split this big file using XPath would cause the entire content to be loaded into memory. So instead we can use the Tokenizer language to do this as follows:

from("file:inbox") .split().tokenizeXML("order").streaming() .to("activemq:queue:order");

In XML DSL the route would be as follows:

xml<route> <from uri="file:inbox"/> <split streaming="true"> <tokenize token="order" xml="true"/> <to uri="activemq:queue:order"/> </split> </route>

Notice the tokenizeXML method which will split the file using the tag name of the child node (more precisely speaking, the local name of the element without its namespace prefix if any), which mean it will grab the content between the <order> and </order> tags (incl. the tokens). So for example a splitted message would be as follows:

xml <order> <!-- order stuff here --> </order>

If you want to inherit namespaces from a root/parent tag, then you can do this as well by providing the name of the root/parent tag:

xml<route> <from uri="file:inbox"/> <split streaming="true"> <tokenize token="order" inheritNamespaceTagName="orders" xml="true"/> <to uri="activemq:queue:order"/> </split> </route>

And in Java DSL its as follows:

from("file:inbox") .split().tokenizeXML("order", "orders").streaming() .to("activemq:queue:order");

Available as of Camel 2.13.1, you can set the above inheritNamsepaceTagName property to "*" to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). It is noted that each token must share the same ancestor elements in this case.

The above tokenizer works well on simple structures but has some inherent limitations in handling more complex XML structures.

Available as of Camel 2.14

The second tokenizer uses a StAX parser to overcome these limitations. This tokenizer recognizes XML namespaces and also handles simple and complex XML structures more naturally and efficiently. 

To split using this tokenizer at {urn:shop}order, we can write

Namespaces ns = new Namespaces("ns1", "urn:shop"); ... from("file:inbox") .split().xtokenize("//ns1:order", 'i', ns).streaming() .to("activemq:queue:order)

Two arguments control the behavior of the tokenizer. The first argument specifies the element using a path notation. This path notation uses a subset of xpath with wildcard support. The second argument represents the extraction mode. The available extraction modes are:

modedescription
iinjecting the contextual namespace bindings into the extracted token (default)
wwrapping the extracted token in its ancestor context
uunwrapping the extracted token to its child content
textracting the text content of the specified element

 Having an input XML

xml<m:orders xmlns:m="urn:shop" xmlns:cat="urn:shop:catalog"> <m:order><id>123</id><date>2014-02-25</date>...</m:order> ...

Each mode will result in the following tokens, 

i
<m:order xmlns:m="urn:shop" xmlns:cat="urn:shop:catalog"><id>123</id><date>2014-02-25</date>...</m:order>
w
<m:orders xmlns:m="urn:shop" xmlns:cat="urn:shop:catalog">
  <m:order><id>123</id><date>2014-02-25</date>...</m:order></m:orders>
u
<id>123</id><date>2014-02-25</date>...
t
1232014-02-25...

 In XML DSL, the equivalent route would be written as follows:

xml<camelContext xmlns:ns1="urn:shop"> <route> <from uri="file:inbox"/> <split streaming="true"> <xtokenize>//ns1:order</xtokenize> <to uri="activemq:queue:order"/> </split> </route> </camelContext>

 or setting the extraction mode explicitly as

xml ... <xtokenize mode="i">//ns1:order</xtokenize> ...

Note that this StAX based tokenizer's uses StAX Location API and requires a StAX Reader implementation (e.g., woodstox) that correctly returns the offset position pointing to the beginning of each event triggering segment (e.g., the offset position of '<' at each start and end element event). If you use a StAX Reader which does not implement that API correctly it results in invalid xml snippets after the split. For example the snippet could be wrong terminated:

<Start>...<</Start> .... <Start>...</</Start>

Splitting files by grouping N lines together

Available as of Camel 2.10

The Tokenizer language has a new option group that allows you to group N parts together, for example to split big files into chunks of 1000 lines.

from("file:inbox") .split().tokenize("\n", 1000).streaming() .to("activemq:queue:order");

And in XML DSL

xml<route> <from uri="file:inbox"/> <split streaming="true"> <tokenize token="\n" group="1000"/> <to uri="activemq:queue:order"/> </split> </route>

The group option is a number that must be a positive number that dictates how many groups to combine together. Each part will be combined using the token.
So in the example above the message being sent to the activemq order queue, will contain 1000 lines, and each line separated by the token (which is a new line token).
The output when using the group option is always a java.lang.String type.

Specifying a custom aggregation strategy

This is specified similar to the Aggregator.

Specifying a custom ThreadPoolExecutor

You can customize the underlying ThreadPoolExecutor used in the parallel splitter. In the Java DSL try something like this:

JavaXPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); ExecutorService pool = ... from("activemq:my.queue") .split(xPathBuilder).executorService(pool) .to("activemq:my.parts");

Using a Pojo to do the splitting

As the Splitter can use any Expression to do the actual splitting we leverage this fact and use a method expression to invoke a Bean to get the splitted parts.
The Bean should return a value that is iterable such as: java.util.Collection, java.util.Iterator or an array.
So the returned value, will then be used by Camel at runtime, to split the message.

Streaming mode and using pojo

When you have enabled the streaming mode, then you should return a Iterator to ensure streamish fashion. For example if the message is a big file, then by using an iterator, that returns a piece of the file in chunks, in the next method of the Iterator ensures low memory footprint. This avoids the need for reading the entire content into memory. For an example see the source code for the TokenizePair implementation.

In the route we define the Expression as a method call to invoke our Bean that we have registered with the id mySplitterBean in the Registry.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterPojoTest.java}And the logic for our Bean is as simple as. Notice we use Camel Bean Binding to pass in the message body as a String object.{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterPojoTest.java}

Split aggregate request/reply sample

This sample shows how you can split an Exchange, process each splitted message, aggregate and return a combined response to the original caller using request/reply.

The route below illustrates this and how the split supports a aggregationStrategy to hold the in progress processed messages:{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java}And the OrderService bean is as follows:{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java}And our custom aggregationStrategy that is responsible for holding the in progress aggregated message that after the splitter is ended will be sent to the buildCombinedResponse method for final processing before the combined response can be returned to the waiting caller.{snippet:id=e3|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java}So lets run the sample and see how it works.
We send an Exchange to the direct:start endpoint containing a IN body with the String value: A@B@C. The flow is:

HandleOrder: A HandleOrder: B Aggregate old orders: (id=1,item=A) Aggregate new order: (id=2,item=B) HandleOrder: C Aggregate old orders: (id=1,item=A);(id=2,item=B) Aggregate new order: (id=3,item=C) BuildCombinedResponse: (id=1,item=A);(id=2,item=B);(id=3,item=C) Response to caller: Response[(id=1,item=A);(id=2,item=B);(id=3,item=C)]

Stop processing in case of exception

Available as of Camel 2.1

The Splitter will by default continue to process the entire Exchange even in case of one of the splitted message will thrown an exception during routing.
For example if you have an Exchange with 1000 rows that you split and route each sub message. During processing of these sub messages an exception is thrown at the 17th. What Camel does by default is to process the remainder 983 messages. You have the chance to remedy or handle this in the AggregationStrategy.

But sometimes you just want Camel to stop and let the exception be propagated back, and let the Camel error handler handle it. You can do this in Camel 2.1 by specifying that it should stop in case of an exception occurred. This is done by the stopOnException option as shown below:

from("direct:start") .split(body().tokenize(",")).stopOnException() .process(new MyProcessor()) .to("mock:split");

And using XML DSL you specify it as follows:

xml <route> <from uri="direct:start"/> <split stopOnException="true"> <tokenize token=","/> <process ref="myProcessor"/> <to uri="mock:split"/> </split> </route>

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8

See details at Multicast

Sharing unit of work

Available as of Camel 2.8

The Splitter will by default not share unit of work between the parent exchange and each splitted exchange. This means each sub exchange has its own individual unit of work.

For example you may have an use case, where you want to split a big message. And you want to regard that process as an atomic isolated operation that either is a success or failure. In case of a failure you want that big message to be moved into a dead letter queue. To support this use case, you would have to share the unit of work on the Splitter.

Here is an example in Java DSL{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java}Now in this example what would happen is that in case there is a problem processing each sub message, the error handler will kick in (yes error handling still applies for the sub messages). But what doesn't happen is that if a sub message fails all redelivery attempts (its exhausted), then its not moved into that dead letter queue. The reason is that we have shared the unit of work, so the sub message will report the error on the shared unit of work. When the Splitter is done, it checks the state of the shared unit of work and checks if any errors occurred. And if an error occurred it will set the exception on the Exchange and mark it for rollback. The error handler will yet again kick in, as the Exchange has been marked as rollback and it had an exception as well. No redelivery attempts is performed (as it was marked for rollback) and the Exchange will be moved into the dead letter queue.

Using this from XML DSL is just as easy as you just have to set the shareUnitOfWork attribute to true:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringSplitSubUnitOfWorkTest.xml}

Implementation of shared unit of work

So in reality the unit of work is not shared as a single object instance. Instead SubUnitOfWork is attached to their parent, and issues callback to the parent about their status (commit or rollback). This may be refactored in Camel 3.0 where larger API changes can be done.

Using This Pattern

Aggregator

This applies for Camel version 2.3 or newer. If you use an older version then use this Aggregator link instead.

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

A correlation Expression is used to determine the messages which should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy is used to combine all the message exchanges for a single correlation key into a single message exchange.

Aggregator options

The aggregator supports the following options:

confluenceTableSmall

Option

Default

Description

correlationExpression

 

Mandatory Expression which evaluates the correlation key to use for aggregation. The Exchange which has the same correlation key is aggregated together. If the correlation key could not be evaluated an Exception is thrown. You can disable this by using the ignoreBadCorrelationKeys option.

aggregationStrategy

 

Mandatory AggregationStrategy which is used to merge the incoming Exchange with the existing already merged exchanges. At first call the oldExchange parameter is null. On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.

From Camel 2.9.2 onwards the strategy can also be a TimeoutAwareAggregationStrategy implementation, supporting the timeout callback, see further below for more details.

From Camel 2.16: the strategy can also be a PreCompletionAwareAggregationStrategy implementation which then runs the completion check in pre-completion mode. See further below for more details.

strategyRef

 

A reference to lookup the AggregationStrategy in the Registry. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see further below for details.

strategyMethodName

 

Camel 2.12: This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. See further below for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used for the very first aggregation. If this option is true then null values is used as the oldExchange (at the very first aggregation), when using POJOs as the AggregationStrategy. See further below for more details.

completionSize

 

Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.

completionTimeout

 

Time in millis that an aggregated exchange should be inactive before its complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a timeout dynamically - will use Long as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. You cannot use this option together with completionInterval, only one of the two can be used.

completionInterval

 

A repeating period in millis by which the aggregator will complete all current aggregated exchanges. Camel has a background task which is triggered every period. You cannot use this option together with completionTimeout, only one of them can be used.

completionPredicate

 

A Predicate to indicate when an aggregated exchange is complete.

From Camel 2.15: if this is not specified and the AggregationStrategy object implements Predicate, the aggregationStrategy object will be used as the completionPredicate.

completionFromBatchConsumer

false

This option is if the exchanges are coming from a Batch Consumer. Then when enabled the Aggregator2 will use the batch size determined by the Batch Consumer in the message header CamelBatchSize. See more details at Batch Consumer. This can be used to aggregate all files consumed from a File endpoint in that given poll.

forceCompletionOnStop

false

Camel 2.9 Indicates to complete all current aggregated exchanges when the context is stopped

completeAllOnStopfalseCamel 2.16: Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. This also means that we will wait for all pending exchanges which are stored in the aggregation repository to complete so the repository is empty before we can stop.  You may want to enable this when using the memory based aggregation repository that is memory based only, and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete all those exchanges before its stopped, when stopping CamelContext or the route using it.

eagerCheckCompletion

false

Whether or not to eager check for completion when a new incoming Exchange has been received. This option influences the behavior of the completionPredicate option as the Exchange being passed in changes accordingly. When false the Exchange passed in the Predicate is the aggregated Exchange which means any information you may store on the aggregated Exchange from the AggregationStrategy is available for the Predicate. When true the Exchange passed in the Predicate is the incoming Exchange, which means you can access data from the incoming Exchange.

groupExchanges

false

If enabled then Camel will group all aggregated Exchanges into a single combined org.apache.camel.impl.GroupedExchange holder class that holds all the aggregated Exchanges. And as a result only one Exchange is being sent out from the aggregator. Can be used to combine many incoming Exchanges into a single output Exchange without coding a custom AggregationStrategy yourself.

Note: this option does not support persistent repository with the aggregator. See further below for an example and more details.

ignoreInvalidCorrelationKeys

false

Whether or not to ignore correlation keys which could not be evaluated to a value. By default Camel will throw an Exception, but you can enable this option and ignore the situation instead.

closeCorrelationKeyOnCompletion

 

Whether or not too late Exchanges should be accepted or not. You can enable this to indicate that if a correlation key has already been completed, then any new exchanges with the same correlation key be denied. Camel will then throw a closedCorrelationKeyException exception. When using this option you pass in a integer which is a number for a LRUCache which keeps that last X number of closed correlation keys. You can pass in 0 or a negative value to indicate a unbounded cache. By passing in a number you are ensured that cache won't grow too big if you use a log of different correlation keys.

discardOnCompletionTimeout

false

Camel 2.5: Whether or not exchanges which complete due to a timeout should be discarded. If enabled then when a timeout occurs the aggregated message will not be sent out but dropped (discarded).

aggregationRepository

 

Allows you to plugin you own implementation of org.apache.camel.spi.AggregationRepository which keeps track of the current inflight aggregated exchanges. Camel uses by default a memory based implementation.

aggregationRepositoryRef

 

Reference to lookup a aggregationRepository in the Registry.

parallelProcessing

false

When aggregated are completed they are being send out of the aggregator. This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.

executorService

 

If using parallelProcessing you can specify a custom thread pool to be used. In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.

executorServiceRef

 

Reference to lookup a executorService in the Registry

timeoutCheckerExecutorService

 

Camel 2.9: If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a background thread is created to check for the completion for every aggregator. Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.

timeoutCheckerExecutorServiceRef

 

Camel 2.9: Reference to lookup a timeoutCheckerExecutorService in the Registry

optimisticLocking

false

Camel 2.11: Turns on using optimistic locking, which requires the aggregationRepository being used, is supporting this by implementing the org.apache.camel.spi.OptimisticLockingAggregationRepository interface.

optimisticLockRetryPolicy

 

Camel 2.11.1: Allows to configure retry settings when using optimistic locking.

Exchange Properties

The following properties are set on each aggregated Exchange:

confluenceTableSmall

Header

Type

Description

CamelAggregatedSize

int

The total number of Exchanges aggregated into this combined Exchange.

CamelAggregatedCompletedBy

String

Indicator how the aggregation was completed as a value of either: predicate, size, strategy, consumer, timeout, forceCompletion or interval.

About AggregationStrategy

The AggregationStrategy is used for aggregating the old (lookup by its correlation id) and the new exchanges together into a single exchange. Possible implementations include performing some kind of combining or delta processing, such as adding line items together into an invoice or just using the newest exchange and removing old exchanges such as for state tracking or market data prices; where old values are of little use.

Notice the aggregation strategy is a mandatory option and must be provided to the aggregator.

Here are a few example AggregationStrategy implementations that should help you create your own custom strategy.

class ArrayListAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(); ArrayList

Resequencer

The Resequencer from the EIP patterns allows you to reorganise messages based on some comparator. By default in Camel we use an Expression to create the comparator; so that you can compare by a message header or the body or a piece of a message etc.

Change in Camel 2.7

The <batch-config> and <stream-config> tags in XML DSL in the Resequencer EIP must now be configured in the top, and not in the bottom. So if you use those, then move them up just below the <resequence> EIP starts in the XML. If you are using Camel older than 2.7, then those configs should be at the bottom.

Camel supports two resequencing algorithms:

  • Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.
  • Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.

By default the Resequencer does not support duplicate messages and will only keep the last message, in case a message arrives with the same message expression. However in the batch mode you can enable it to allow duplicates.

Batch Resequencing

The following example shows how to use the batch-processing resequencer so that messages are sorted in order of the body() expression. That is messages are collected into a batch (either by a maximum number of messages per batch or using a timeout) then they are sorted in order and then sent out to their output.

Using the Fluent Builders{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java}This is equivalent to

from("direct:start") .resequence(body()).batch() .to("mock:result");

The batch-processing resequencer can be further configured via the size() and timeout() methods.

from("direct:start") .resequence(body()).batch().size(300).timeout(4000L) .to("mock:result")

This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

from("direct:start") .resequence(body()).batch(new BatchResequencerConfig(300, 4000L)) .to("mock:result")

So the above example will reorder messages from endpoint direct:a in order of their bodies, to the endpoint mock:result.
Typically you'd use a header rather than the body to order things; or maybe a part of the body. So you could replace this expression with

resequencer(header("mySeqNo"))

for example to reorder messages using a custom sequence number in the header mySeqNo.

You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.

Using the Spring XML Extensions

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <resequence> <simple>body</simple> <to uri="mock:result" /> <!-- batch-config can be ommitted for default (batch) resequencer settings --> <batch-config batchSize="300" batchTimeout="4000" /> </resequence> </route> </camelContext>

Allow Duplicates

Available as of Camel 2.4

In the batch mode, you can now allow duplicates. In Java DSL there is a allowDuplicates() method and in Spring XML there is an allowDuplicates=true attribute on the <batch-config/> you can use to enable it.

Reverse

Available as of Camel 2.4

In the batch mode, you can now reverse the expression ordering. By default the order is based on 0..9,A..Z, which would let messages with low numbers be ordered first, and thus also also outgoing first. In some cases you want to reverse order, which is now possible.

In Java DSL there is a reverse() method and in Spring XML there is an reverse=true attribute on the <batch-config/> you can use to enable it.

Resequence JMS messages based on JMSPriority

Available as of Camel 2.4

It's now much easier to use the Resequencer to resequence messages from JMS queues based on JMSPriority. For that to work you need to use the two new options allowDuplicates and reverse.{snippet:id=e1|lang=java|url=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsBatchResequencerJMSPriorityTest.java}Notice this is only possible in the batch mode of the Resequencer.

Ignore invalid exchanges

Available as of Camel 2.9

The Resequencer EIP will from Camel 2.9 onwards throw a CamelExchangeException if the incoming Exchange is not valid for the resequencer - ie. the expression cannot be evaluated, such as a missing header. You can use the option ignoreInvalidExchanges to ignore these exceptions which means the Resequencer will then skip the invalid Exchange.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java}This option is available for both batch and stream resequencer.

Reject Old Exchanges

Available as of Camel 2.11

This option can be used to prevent out of order messages from being sent regardless of the event that delivered messages downstream (capacity, timeout, etc). If enabled using rejectOld(), the Resequencer will throw a MessageRejectedException when an incoming Exchange is "older" (based on the Comparator) than the last delivered message. This provides an extra level of control with regards to delayed message ordering.

from("direct:start") .onException(MessageRejectedException.class).handled(true).to("mock:error").end() .resequence(header("seqno")).stream().timeout(1000).rejectOld() .to("mock:result");

This option is available for the stream resequencer only.

Stream Resequencing

The next example shows how to use the stream-processing resequencer. Messages are re-ordered based on their sequence numbers given by a seqnum header using gap detection and timeouts on the level of individual messages.

Using the Fluent Builders{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java}The stream-processing resequencer can be further configured via the capacity() and timeout() methods.

from("direct:start") .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L) .to("mock:result")

This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

from("direct:start") .resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L)) .to("mock:result")

The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).

If the maximum time difference between messages (with successor/predecessor relationship with respect to the sequence number) in a message stream is known, then the resequencer's timeout parameter should be set to this value. In this case it is guaranteed that all messages of a stream are delivered in correct order to the next processor. The lower the timeout value is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence messages delivered by this resequencer. Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the resequencer from running out of memory.

By default, the stream resequencer expects long sequence numbers but other sequence numbers types can be supported as well by providing a custom expression.{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/MyFileNameExpression.java}{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerFileNameTest.java}or custom comparator via the comparator() method

ExpressionResultComparator<Exchange> comparator = new MyComparator(); from("direct:start") .resequence(header("seqnum")).stream().comparator(comparator) .to("mock:result");

or via a StreamResequencerConfig object.

ExpressionResultComparator<Exchange> comparator = new MyComparator(); StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator); from("direct:start") .resequence(header("seqnum")).stream(config) .to("mock:result");

Using the Spring XML Extensions

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <resequence> <simple>in.header.seqnum</simple> <to uri="mock:result" /> <stream-config capacity="5000" timeout="4000"/> </resequence> </route> </camelContext>

Further Examples

For further examples of this pattern in use you could look at the batch-processing resequencer junit test case and the stream-processing resequencer junit test case

Using This Pattern

Composed Message Processor

The Composed Message Processor from the EIP patterns allows you to process a composite message by splitting it up, routing the sub-messages to appropriate destinations and the re-aggregating the responses back into a single message.

In Camel we provide two solutions

The difference is when using only a Splitter it aggregates back all the splitted messages into the same aggregation group, eg like a fork/join pattern.
Whereas using the Aggregator allows you group into multiple groups, a pattern which provides more options.

Using the splitter alone is often easier and possibly a better solution. So take a look at this first, before involving the aggregator.

Example using both Splitter and Aggregator

In this example we want to check that a multipart order can be filled. Each part of the order requires a check at a different inventory.

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using the Spring XML Extensions

<route>
  <from uri="direct:start"/>
  <split>
    <simple>body</simple>
    <choice>
      <when>
        <method bean="orderItemHelper" method="isWidget"/>
	<to uri="bean:widgetInventory"/>
      </when>
      <otherwise>
	<to uri="bean:gadgetInventory"/>
      </otherwise>
    </choice>
    <to uri="seda:aggregate"/>
  </split>
</route>

<route>
  <from uri="seda:aggregate"/>
  <aggregate strategyRef="myOrderAggregatorStrategy" completionTimeout="1000">
    <correlationExpression>
      <simple>header.orderId</simple>
    </correlationExpression>
    <to uri="mock:result"/>
  </aggregate>
</route>

To do this we split up the order using a Splitter. The Splitter then sends individual OrderItems to a Content Based Router which checks the item type. Widget items get sent for checking in the widgetInventory bean and gadgets get sent to the gadgetInventory bean. Once these OrderItems have been validated by the appropriate bean, they are sent on to the Aggregator which collects and re-assembles the validated OrderItems into an order again.

When an order is sent it contains a header with the order id. We use this fact when we aggregate, as we configure this .header("orderId") on the aggregate DSL to instruct Camel to use the header with the key orderId as correlation expression.

For full details, check the example source here:

camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java

Example using only Splitter

In this example we want to split an incoming order using the Splitter eip, transform each order line, and then combine the order lines into a new order message.

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using XML

If you use XML, then the <split> tag offers the strategyRef attribute to refer to your custom AggregationStrategy

The bean with the methods to transform the order line and process the order as well:

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

And the AggregationStrategy we use with the Splitter eip to combine the orders back again (eg fork/join):

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Scatter-Gather

The Scatter-Gather from the EIP patterns allows you to route messages to a number of dynamically specified recipients and re-aggregate the responses back into a single message.

Dynamic Scatter-Gather Example

In this example we want to get the best quote for beer from several different vendors. We use a dynamic Recipient List to get the request for a quote to all vendors and an Aggregator to pick the best quote out of all the responses. The routes for this are defined as:

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

So in the first route you see that the Recipient List is looking at the listOfVendors header for the list of recipients. So, we need to send a message like

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
Map<String, Object> headers = new HashMap<>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);

This message will be distributed to the following Endpoints: bean:vendor1, bean:vendor2, and bean:vendor3. These are all beans which look like

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
public class MyVendor {
    private int beerPrice;
    
    @Produce("seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;
            
    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }
        
    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}

and are loaded up in Spring like

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>1</value>
  </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>2</value>
  </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>3</value>
  </constructor-arg>
</bean>

Each bean is loaded with a different price for beer. When the message is sent to each bean endpoint, it will arrive at the MyVendor.getQuote method. This method does a simple check whether this quote request is for beer and then sets the price of beer on the exchange for retrieval at a later step. The message is forwarded on to the next step using POJO Producing (see the @Produce annotation).

At the next step we want to take the beer quotes from all vendors and find out which one was the best (i.e. the lowest!). To do this we use an Aggregator with a custom aggregation strategy. The Aggregator needs to be able to compare only the messages from this particular quote; this is easily done by specifying a correlationExpression equal to the value of the quoteRequestId header. As shown above in the message sending snippet, we set this header to quoteRequest-1. This correlation value should be unique or you may include responses that are not part of this quote. To pick the lowest quote out of the set, we use a custom aggregation strategy like

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

Finally, we expect to get the lowest quote of $1 out of $1, $2, and $3.

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
result.expectedBodiesReceived(1); // expect the lowest quote

You can find the full example source here:

camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/
camel-spring/src/test/resources/org/apache/camel/spring/processor/scattergather/scatter-gather.xml

Static Scatter-Gather Example

You can lock down which recipients are used in the Scatter-Gather by using a static Recipient List. It looks something like this

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");

from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")

A full example of the static Scatter-Gather configuration can be found in the Loan Broker Example.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Routing Slip

The Routing Slip from the EIP patterns allows you to route a message consecutively through a series of processing steps where the sequence of steps is not known at design time and can vary for each message.

Options

confluenceTableSmall

Name

Default Value

Description

uriDelimiter

,

Delimiter used if the Expression returned multiple endpoints.

ignoreInvalidEndpoints

false

If an endpoint URI could not be resolved, should it be ignored.

Otherwise, Camel will throw an exception stating the endpoint URI is not valid.

cacheSize

1000

Camel 2.13.1/2.12.4: Allows to configure the cache size for the ProducerCache which caches producers for reuse in the routing slip.

The default cache size is 1000.

A value of -1 disables the use of the cache.

Example

The following route will take any messages sent to the Apache ActiveMQ queue SomeQueue and pass them into the Routing Slip pattern.

javafrom("activemq:SomeQueue") .routingSlip("aRoutingSlipHeader");

Messages will be checked for the existence of the aRoutingSlipHeader header. The value of this header should be a comma-delimited list of endpoint URIs you wish the message to be routed to. The Message will be routed in a pipeline fashion, i.e., one after the other. From Camel 2.5 the Routing Slip will set a property, Exchange.SLIP_ENDPOINT, on the Exchange which contains the current endpoint as it advanced though the slip. This allows you to know how far we have processed in the slip.

The Routing Slip will compute the slip beforehand which means, the slip is only computed once. If you need to compute the slip on-the-fly then use the Dynamic Router pattern instead.

Configuration Options

Here we set the header name and the URI delimiter to something different.

Using the Fluent Builders{snippet:id=e3|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipTest.java}Using the Spring XML Extensions

<camelContext id="buildRoutingSlip" xmlns="http://activemq.apache.org/camel/schema/spring"> <route> <from uri="direct:c"/> <routingSlip uriDelimiter="#"> <header>aRoutingSlipHeader</header> </routingSlip> </route> </camelContext>

Ignore Invalid Endpoints

Available as of Camel 2.3

The Routing Slip now supports ignoreInvalidEndpoints which the Recipient List also supports. You can use it to skip endpoints which are invalid.

javafrom("direct:a") .routingSlip("myHeader") .ignoreInvalidEndpoints();

And in Spring XML its an attribute on the recipient list tag:

xml<route> <from uri="direct:a"/> <routingSlip ignoreInvalidEndpoints="true"/> <header>myHeader</header> </routingSlip> </route>

Then let's say the myHeader contains the following two endpoints direct:foo,xxx:bar. The first endpoint is valid and works. However the second endpoint is invalid and will just be ignored. Camel logs at INFO level, so you can see why the endpoint was invalid.

Expression Support

Available as of Camel 2.4

The Routing Slip now supports to take the expression parameter as the Recipient List does. You can tell Camel the expression that you want to use to get the routing slip.

javafrom("direct:a") .routingSlip(header("myHeader")) .ignoreInvalidEndpoints();

And in Spring XML its an attribute on the recipient list tag.

xml<route> <from uri="direct:a"/> <!--NOTE from Camel 2.4.0, you need to specify the expression element inside of the routingSlip element --> <routingSlip ignoreInvalidEndpoints="true"> <header>myHeader</header> </routingSlip> </route>

Further Examples

For further examples of this pattern in use you could look at the routing slip test cases.

Using This Pattern

Throttler

The Throttler Pattern allows you to ensure that a specific endpoint does not get overloaded, or that we don't exceed an agreed SLA with some external service.

Options

confluenceTableSmall

Name

Default Value

Description

maximumRequestsPerPeriod

 

Maximum number of requests per period to throttle. This option must be provided as a positive number. Notice, in the XML DSL, from Camel 2.8 onwards this option is configured using an Expression instead of an attribute.

timePeriodMillis

1000

The time period in milliseconds, in which the throttler will allow at most maximumRequestsPerPeriod number of messages.

asyncDelayed

false

Camel 2.4: If enabled then any messages which is delayed happens asynchronously using a scheduled thread pool.

executorServiceRef

 

Camel 2.4: Refers to a custom Thread Pool to be used if asyncDelay has been enabled.

callerRunsWhenRejected

true

Camel 2.4: Is used if asyncDelayed was enabled. This controls if the caller thread should execute the task if the thread pool rejected the task.

rejectExecution

false

Camel 2.14: If this option is true, throttler throws a ThrottlerRejectExecutionException when the request rate exceeds the limit.

Examples

Using the Fluent Builders

{snippet:id=ex|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java}

So the above example will throttle messages all messages received on seda:a before being sent to mock:result ensuring that a maximum of 3 messages are sent in any 10 second window.

Note that since timePeriodMillis defaults to 1000 milliseconds, just setting the maximumRequestsPerPeriod has the effect of setting the maximum number of requests per second. So to throttle requests at 100 requests per second between two endpoints, it would look more like this...

from("seda:a").throttle(100).to("seda:b");

For further examples of this pattern in use you could look at the junit test case

Using the Spring XML Extensions

Camel 2.7.x or older

{snippet:id=example|lang=xml|url=camel/tags/camel-2.7.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml}

Camel 2.8 onwards

In Camel 2.8 onwards you must set the maximum period as an Expression as shown below where we use a Constant expression:

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml}

Dynamically changing maximum requests per period

Available as of Camel 2.8
Since we use an Expression you can adjust this value at runtime, for example you can provide a header with the value. At runtime Camel evaluates the expression and converts the result to a java.lang.Long type. In the example below we use a header from the message to determine the maximum requests per period. If the header is absent, then the Throttler uses the old value. So that allows you to only provide a header if the value is to be changed:

{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml}

Asynchronous delaying

Available as of Camel 2.4

You can let the Throttler use non blocking asynchronous delaying, which means Camel will use a scheduler to schedule a task to be executed in the future. The task will then continue routing. This allows the caller thread to not block and be able to service other messages, etc.

from("seda:a").throttle(100).asyncDelayed().to("seda:b");

Using This Pattern

Sampling Throttler

Available as of Camel 2.1

A sampling throttler allows you to extract a sample of the exchanges from the traffic through a route.
It is configured with a sampling period during which only a single exchange is allowed to pass through. All other exchanges will be stopped.

Will by default use a sample period of 1 seconds.

Options

confluenceTableSmall

Name

Default Value

Description

messageFrequency

 

Samples the message every N'th message. You can only use either frequency or period.

samplePeriod

1

Samples the message every N'th period. You can only use either frequency or period.

units

SECOND

Time unit as an enum of java.util.concurrent.TimeUnit from the JDK.

Samples

You use this EIP with the sample DSL as show in these samples.

Using the Fluent Builders
These samples also show how you can use the different syntax to configure the sampling period:

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java}

Using the Spring XML Extensions
And the same example in Spring XML is:

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml}

And since it uses a default of 1 second you can omit this configuration in case you also want to use 1 second

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottlerWithDefault.xml} Using This Pattern

See Also

Delayer

The Delayer Pattern allows you to delay the delivery of messages to some destination.

Delayer

The expression is a value in millis to wait from the current time, so the expression should just be 3000.
However you can use a long value for a fixed value to indicate the delay in millis.
See the Spring DSL samples for Delayer.

Using Delayer in Java DSL

See this ticket: https://issues.apache.org/jira/browse/CAMEL-2654

Options

confluenceTableSmall

Name

Default Value

Description

asyncDelayed

false

Camel 2.4: If enabled then delayed messages happens asynchronously using a scheduled thread pool.

executorServiceRef

 

Camel 2.4: Refers to a custom Thread Pool to be used if asyncDelay has been enabled.

callerRunsWhenRejected

true

Camel 2.4: Is used if asyncDelayed was enabled. This controls if the caller thread should execute the task if the thread pool rejected the task.

Using the Fluent Builders

The example below will delay all messages received on seda:b 1 second before sending them to mock:result.

{snippet:id=ex2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java}

You can just delay things a fixed amount of time from the point at which the delayer receives the message. For example to delay things 2 seconds

delayer(2000)

The above assume that the delivery order is maintained and that the messages are delivered in delay order. If you want to reorder the messages based on delivery time, you can use the Resequencer with this pattern. For example

from("activemq:someQueue").resequencer(header("MyDeliveryTime")).delay("MyRedeliveryTime").to("activemq:aDelayedQueue");

You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages. For example to delay the message for the time period specified in the header, use the following syntax:

from("activemq:someQueue").delay(header("delayValue")).to("activemq:aDelayedQueue");

And to delay processing using the Simple language you can use the following DSL:

from("activemq:someQueue").delay(simple("${body.delayProperty}")).to("activemq:aDelayedQueue");

Spring DSL

The sample below demonstrates the delay in Spring DSL:

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/delayer.xml}

For further examples of this pattern in use you could look at the junit test case

Asynchronous delaying

Available as of Camel 2.4

You can let the Delayer use non blocking asynchronous delaying, which means Camel will use a scheduler to schedule a task to be executed in the future. The task will then continue routing. This allows the caller thread to not block and be able to service other messages etc.

From Java DSL

You use the asyncDelayed() to enable the async behavior.

from("activemq:queue:foo").delay(1000).asyncDelayed().to("activemq:aDelayedQueue");

From Spring XML

You use the asyncDelayed="true" attribute to enable the async behavior.

xml<route> <from uri="activemq:queue:foo"/> <delay asyncDelayed="true"> <constant>1000</constant> </delay> <to uri="activemq:aDealyedQueue"/> </route>

Creating a custom delay

You can use an expression to determine when to send a message using something like this

from("activemq:foo"). delay().method("someBean", "computeDelay"). to("activemq:bar");

then the bean would look like this...

public class SomeBean { public long computeDelay() { long delay = 0; // use java code to compute a delay value in millis return delay; } }

Using This Pattern

See Also

Load Balancer

The Load Balancer Pattern allows you to delegate to one of a number of endpoints using a variety of different load balancing policies.

Built-in load balancing policies

Camel provides the following policies out-of-the-box:

Policy

Description

Round Robin

The exchanges are selected from in a round robin fashion. This is a well known and classic policy, which spreads the load evenly.

Random

A random endpoint is selected for each exchange.

Sticky

Sticky load balancing using an Expression to calculate a correlation key to perform the sticky load balancing; rather like jsessionid in the web or JMSXGroupID in JMS.

Topic

Topic which sends to all destinations (rather like JMS Topics)

Failover

In case of failures the exchange will be tried on the next endpoint.

Weighted Round-Robin

Camel 2.5: The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to the others. In addition to the weight, endpoint selection is then further refined using round-robin distribution based on weight.

Weighted Random

Camel 2.5: The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to others.In addition to the weight, endpoint selection is then further refined using random distribution based on weight.

Custom

Camel 2.8: From Camel 2.8 onwards the preferred way of using a custom Load Balancer is to use this policy, instead of using the @deprecated ref attribute.

Circuit Breaker

Camel 2.14: Implements the Circuit Breaker pattern as described in "Release it!" book.

Load balancing HTTP endpoints

If you are proxying and load balancing HTTP, then see this page for more details.

Round Robin

The round robin load balancer is not meant to work with failover, for that you should use the dedicated failover load balancer. The round robin load balancer will only change to next endpoint per message.

The round robin load balancer is stateful as it keeps state of which endpoint to use next time.

Using the Fluent Builders{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoundRobinLoadBalanceTest.java}Using the Spring configuration

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <loadBalance> <roundRobin/> <to uri="mock:x"/> <to uri="mock:y"/> <to uri="mock:z"/> </loadBalance> </route> </camelContext>

The above example loads balance requests from direct:start to one of the available mock endpoint instances, in this case using a round robin policy.
For further examples of this pattern look at this junit test case

Failover

The failover load balancer is capable of trying the next processor in case an Exchange failed with an exception during processing.
You can constrain the failover to activate only when one exception of a list you specify occurs. If you do not specify a list any exception will cause fail over to occur. This balancer uses the same strategy for matching exceptions as the Exception Clause does for the onException.

Enable stream caching if using streams

If you use streaming then you should enable Stream caching when using the failover load balancer. This is needed so the stream can be re-read after failing over to the next processor.

Failover offers the following options:

Option

Type

Default

Description

inheritErrorHandler

boolean

true

Camel 2.3: Whether or not the Error Handler configured on the route should be used. Disable this if you want failover to transfer immediately to the next endpoint. On the other hand, if you have this option enabled, then Camel will first let the Error Handler try to process the message. The Error Handler may have been configured to redeliver and use delays between attempts. If you have enabled a number of redeliveries then Camel will try to redeliver to the same endpoint, and only fail over to the next endpoint, when the Error Handler is exhausted.

maximumFailoverAttempts

int

-1

Camel 2.3: A value to indicate after X failover attempts we should exhaust (give up). Use -1 to indicate never give up and continuously try to failover. Use 0 to never failover. And use e.g. 3 to failover at most 3 times before giving up. This option can be used whether or not roundRobin is enabled or not.

roundRobin

boolean

false

Camel 2.3: Whether or not the failover load balancer should operate in round robin mode or not. If not, then it will always start from the first endpoint when a new message is to be processed. In other words it restart from the top for every message. If round robin is enabled, then it keeps state and will continue with the next endpoint in a round robin fashion. When using round robin it will not stick to last known good endpoint, it will always pick the next endpoint to use. You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint to use when starting the load balancing (instead of using the next when starting).

stickybooleanfalseCamel 2.16: Whether or not the failover load balancer should operate in sticky mode or not. If not, then it will always start from the first endpoint when a new message is to be processed. In other words it restart from the top for every message. If sticky is enabled, then it keeps state and will continue with the last known good endpoint. You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint to use when starting the load balancing (instead of using the next when starting).

Camel 2.2 or older behavior
The current implementation of failover load balancer uses simple logic which always tries the first endpoint, and in case of an exception being thrown it tries the next in the list, and so forth. It has no state, and the next message will thus always start with the first endpoint.

Camel 2.3 onwards behavior
The failover load balancer now supports round robin mode, which allows you to failover in a round robin fashion. See the roundRobin option.

Redelivery must be enabled

In Camel 2.2 or older the failover load balancer requires you have enabled Camel Error Handler to use redelivery. In Camel 2.3 onwards this is not required as such, as you can mix and match. See the inheritErrorHandler option.

Here is a sample to failover only if a IOException related exception was thrown:{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverNotCatchedExceptionTest.java}You can specify multiple exceptions to failover as the option is varargs, for instance:

java// enable redelivery so failover can react errorHandler(defaultErrorHandler().maximumRedeliveries(5)); from("direct:foo"). loadBalance().failover(IOException.class, MyOtherException.class) .to("direct:a", "direct:b");

Using failover in Spring DSL

Failover can also be used from Spring DSL and you configure it as:

xml <route errorHandlerRef="myErrorHandler"> <from uri="direct:foo"/> <loadBalance> <failover> <exception>java.io.IOException</exception> <exception>com.mycompany.MyOtherException</exception> </failover> <to uri="direct:a"/> <to uri="direct:b"/> </loadBalance> </route>

Using failover in round robin mode

An example using Java DSL:{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverRoundRobinTest.java}And the same example using Spring XML:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/FailoverRoundRobinTest.xml}

Disabled inheritErrorHandler

You can configure inheritErrorHandler=false if you want to failover to the next endpoint as fast as possible. By disabling the Error Handler you ensure it does not intervene which allows the failover load balancer to handle failover asap. By also enabling roundRobin mode, then it will keep retrying until it success. You can then configure the maximumFailoverAttempts option to a high value to let it eventually exhaust (give up) and fail.

Weighted Round-Robin and Random Load Balancing

Available as of Camel 2.5

In many enterprise environments where server nodes of unequal processing power & performance characteristics are utilized to host services and processing endpoints, it is frequently necessary to distribute processing load based on their individual server capabilities so that some endpoints are not unfairly burdened with requests. Obviously simple round-robin or random load balancing do not alleviate problems of this nature. A Weighted Round-Robin and/or Weighted Random load balancer can be used to address this problem.

The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to others. You can specify this as a positive processing weight for each server. A larger number indicates that the server can handle a larger load. The weight is utilized to determine the payload distribution ratio to different processing endpoints with respect to others.

Disabled inheritErrorHandler

As of Camel 2.6, the Weighted Load balancer usage has been further simplified, there is no need to send in distributionRatio as a List<Integer>. It can be simply sent as a delimited String of integer weights separated by a delimiter of choice.

The parameters that can be used are

In Camel 2.5

Option

Type

Default

Description

roundRobin

boolean

false

The default value for round-robin is false. In the absence of this setting or parameter the load balancing algorithm used is random.

distributionRatio

List<Integer>

none

The distributionRatio is a list consisting on integer weights passed in as a parameter. The distributionRatio must match the number of endpoints and/or processors specified in the load balancer list. In Camel 2.5 if endpoints do not match ratios, then a best effort distribution is attempted.

Available In Camel 2.6

Option

Type

Default

Description

roundRobin

boolean

false

The default value for round-robin is false. In the absence of this setting or parameter the load balancing algorithm used is random.

distributionRatio

String

none

The distributionRatio is a delimited String consisting on integer weights separated by delimiters for example "2,3,5". The distributionRatio must match the number of endpoints and/or processors specified in the load balancer list.

distributionRatioDelimiter

String

,

The distributionRatioDelimiter is the delimiter used to specify the distributionRatio. If this attribute is not specified a default delimiter "," is expected as the delimiter used for specifying the distributionRatio.

Using Weighted round-robin & random load balancing

In Camel 2.5

An example using Java DSL:

javaArrayList<integer> distributionRatio = new ArrayList<integer>(); distributionRatio.add(4); distributionRatio.add(2); distributionRatio.add(1); // round-robin from("direct:start") .loadBalance().weighted(true, distributionRatio) .to("mock:x", "mock:y", "mock:z"); //random from("direct:start") .loadBalance().weighted(false, distributionRatio) .to("mock:x", "mock:y", "mock:z");

And the same example using Spring XML:

xml <route> <from uri="direct:start"/> <loadBalance> <weighted roundRobin="false" distributionRatio="4 2 1"/> <to uri="mock:x"/> <to uri="mock:y"/> <to uri="mock:z"/> </loadBalance> </route>

Available In Camel 2.6

An example using Java DSL:

java// round-robin from("direct:start") .loadBalance().weighted(true, "4:2:1" distributionRatioDelimiter=":") .to("mock:x", "mock:y", "mock:z"); //random from("direct:start") .loadBalance().weighted(false, "4,2,1") .to("mock:x", "mock:y", "mock:z");

And the same example using Spring XML:

xml <route> <from uri="direct:start"/> <loadBalance> <weighted roundRobin="false" distributionRatio="4-2-1" distributionRatioDelimiter="-" /> <to uri="mock:x"/> <to uri="mock:y"/> <to uri="mock:z"/> </loadBalance> </route>

Custom Load Balancer

You can use a custom load balancer (eg your own implementation) also.

An example using Java DSL:{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java}And the same example using XML DSL:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringCustomRefLoadBalanceTest.xml}Notice in the XML DSL above we use <custom> which is only available in Camel 2.8 onwards. In older releases you would have to do as follows instead:

xml <loadBalance ref="myBalancer"> <!-- these are the endpoints to balancer --> <to uri="mock:x"/> <to uri="mock:y"/> <to uri="mock:z"/> </loadBalance>

To implement a custom load balancer you can extend some support classes such as LoadBalancerSupport and SimpleLoadBalancerSupport. The former supports the asynchronous routing engine, and the latter does not. Here is an example:{snippet:id=e2|title=Custom load balancer implementation|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java}

Circuit Breaker

The Circuit Breaker load balancer is a stateful pattern that monitors all calls for certain exceptions. Initially the Circuit Breaker is in closed state and passes all messages. If there are failures and the threshold is reached, it moves to open state and rejects all calls until halfOpenAfter timeout is reached. After this timeout is reached, if there is a new call, it will pass and if the result is success the Circuit Breaker will move to closed state, or to open state if there was an error.

When the circuit breaker is closed, it will throw a java.util.concurrent.RejectedExecutionException. This can then be caught to provide an alternate path for processing exchanges.

An example using Java DSL:

javafrom("direct:start") .onException(RejectedExecutionException.class) .handled(true) .to("mock:serviceUnavailable") .end()  .loadBalance() .circuitBreaker(2, 1000L, MyCustomException.class) .to("mock:service") .end();

And the same example using Spring XML:

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <onException> <exception>java.util.concurrent.RejectedExecutionException</exception> <handled><constant>true</constant></handled> <to uri="mock:serviceUnavailable"/> </onException>  <loadBalance> <circuitBreaker threshold="2" halfOpenAfter="1000"> <exception>MyCustomException</exception> </circuitBreaker> <to uri="mock:service"/> </loadBalance> </route> </camelContext>

Using This Pattern

Multicast

The Multicast allows to route the same message to a number of endpoints and process them in a different way. The main difference between the Multicast and Splitter is that Splitter will split the message into several pieces but the Multicast will not modify the request message.

Options

confluenceTableSmall

Name

Default Value

Description

strategyRef

 

Refers to an AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. By default Camel will use the last reply as the outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see the Aggregate page for more details. If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.

strategyMethodName

 

Camel 2.12: This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

parallelProcessing

false

If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the multicasts which happens concurrently.

 

parallelAggregate

false

Camel 2.14: If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.

executorServiceRef

 

Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.

stopOnException

false

Camel 2.2: Whether or not to stop continue processing immediately when an exception occurred. If disable, then Camel will send the message to all multicasts regardless if one of them failed. You can deal with exceptions in the AggregationStrategy class where you have full control how to handle that.

streaming

false

If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as multicasted.

timeout

 

Camel 2.5: Sets a total timeout specified in millis. If the Multicast hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Multicast breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. We may be able to improve this functionality in future Camel releases.

onPrepareRef

 

Camel 2.8: Refers to a custom Processor to prepare the copy of the Exchange each multicast will receive. This allows you to do any custom logic, such as deep-cloning the message payload if that's needed etc.

shareUnitOfWork

false

Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details.

Example

The following example shows how to take a request from the direct:a endpoint , then multicast these request to direct:x, direct:y, direct:z.

Using the Fluent Builders{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastTest.java}By default Multicast invokes each endpoint sequentially. If parallel processing is desired, simply use

from("direct:a").multicast().parallelProcessing().to("direct:x", "direct:y", "direct:z");

In case of using InOut MEP, an AggregationStrategy is used for aggregating all reply messages. The default is to only use the latest reply message and discard any earlier replies. The aggregation strategy is configurable:

from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing().timeout(500).to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");

Stop processing in case of exception

Available as of Camel 2.1

The Multicast will by default continue to process the entire Exchange even in case one of the multicasted messages will thrown an exception during routing.
For example if you want to multicast to 3 destinations and the 2nd destination fails by an exception. What Camel does by default is to process the remainder destinations. You have the chance to remedy or handle this in the AggregationStrategy.

But sometimes you just want Camel to stop and let the exception be propagated back, and let the Camel error handler handle it. You can do this in Camel 2.1 by specifying that it should stop in case of an exception occurred. This is done by the stopOnException option as shown below:

from("direct:start") .multicast() .stopOnException().to("direct:foo", "direct:bar", "direct:baz") .end() .to("mock:result"); from("direct:foo").to("mock:foo"); from("direct:bar").process(new MyProcessor()).to("mock:bar"); from("direct:baz").to("mock:baz");

And using XML DSL you specify it as follows:

xml <route> <from uri="direct:start"/> <multicast stopOnException="true"> <to uri="direct:foo"/> <to uri="direct:bar"/> <to uri="direct:baz"/> </multicast> <to uri="mock:result"/> </route> <route> <from uri="direct:foo"/> <to uri="mock:foo"/> </route> <route> <from uri="direct:bar"/> <process ref="myProcessor"/> <to uri="mock:bar"/> </route> <route> <from uri="direct:baz"/> <to uri="mock:baz"/> </route>

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8

The Multicast will copy the source Exchange and multicast each copy. However the copy is a shallow copy, so in case you have mutateable message bodies, then any changes will be visible by the other copied messages. If you want to use a deep clone copy then you need to use a custom onPrepare which allows you to do this using the Processor interface.

Notice the onPrepare can be used for any kind of custom logic which you would like to execute before the Exchange is being multicasted.

Design for immutable

Its best practice to design for immutable objects.

For example if you have a mutable message body as this Animal class:{snippet:id=e1|lang=java|title=Animal|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Animal.java}Then we can create a deep clone processor which clones the message body:{snippet:id=e1|lang=java|title=AnimalDeepClonePrepare|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AnimalDeepClonePrepare.java}Then we can use the AnimalDeepClonePrepare class in the Multicast route using the onPrepare option as shown:{snippet:id=e1|lang=java|title=Multicast using onPrepare|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastOnPrepareTest.java}And the same example in XML DSL{snippet:id=e1|lang=xml|title=Multicast using onPrepare|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/MulticastOnPrepareTest.xml}Notice the onPrepare option is also available on other EIPs such as Splitter, Recipient List, and Wire Tap.

Using This Pattern

Loop

The Loop allows for processing a message a number of times, possibly in a different way for each iteration. Useful mostly during testing.

Default mode

Notice by default the loop uses the same exchange throughout the looping. So the result from the previous iteration will be used for the next (eg Pipes and Filters). From Camel 2.8 onwards you can enable copy mode instead. See the options table for more details.

Options

confluenceTableSmall 

Name

Default Value

Description

copy

false

Camel 2.8: Whether or not copy mode is used. If false then the same Exchange will be used for each iteration. So the result from the previous iteration will be visible for the next iteration. Instead you can enable copy mode, and then each iteration restarts with a fresh copy of the input Exchange.

doWhile Camel 2.17: Enables the while loop that loops until the predicate evaluates to false or null.

Exchange properties

For each iteration two properties are set on the Exchange. Processors can rely on these properties to process the Message in different ways.

Property

Description

CamelLoopSize

Total number of loops. This is not available if running the loop in while loop mode.

CamelLoopIndex

Index of the current iteration (0 based)

Examples

The following example shows how to take a request from the direct:x endpoint, then send the message repetitively to mock:result. The number of times the message is sent is either passed as an argument to loop(), or determined at runtime by evaluating an expression. The expression must evaluate to an int, otherwise a RuntimeCamelException is thrown.

Using the Fluent Builders

Pass loop count as an argument{snippet:id=ex1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/LoopTest.java}Use expression to determine loop count{snippet:id=ex2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/LoopTest.java}Use expression to determine loop count{snippet:id=ex3|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/LoopTest.java}Using the Spring XML Extensions

Pass loop count as an argument{snippet:id=ex1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/loop.xml}Use expression to determine loop count{snippet:id=ex2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/loop.xml}For further examples of this pattern in use you could look at one of the junit test case

Using copy mode

Available as of Camel 2.8

Now suppose we send a message to "direct:start" endpoint containing the letter A.
The output of processing this route will be that, each "mock:loop" endpoint will receive "AB" as message.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/LoopCopyTest.java}However if we do not enable copy mode then "mock:loop" will receive "AB", "ABB", "ABBB", etc. messages.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/LoopNoCopyTest.java}The equivalent example in XML DSL in copy mode is as follows:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringLoopCopyTest.xml}

Using while mode

Available as of Camel 2.17

The loop can act like a while loop that loops until the expression evaluates to false or null.

For example the route below loops while the length of the message body is 5 or less characters. Notice that the DSL uses loopDoWhile.

from("direct:start") .loopDoWhile(simple("${body.length} <= 5")) .to("mock:loop") .transform(body().append("A")) .end() .to("mock:result");

And the same example in XML:

xml <route> <from uri="direct:start"/> <loop doWhile="true"> <simple>${body.length} &lt;= 5</simple> <to uri="mock:loop"/> <transform> <simple>A${body}</simple> </transform> </loop> <to uri="mock:result"/> </route>

Notice in XML that the while loop is turned on using the doWhile attribute.

 

Using This Pattern

Message Transformation

Content Enricher

Camel supports the Content Enricher from the EIP patterns using a Message Translator, an arbitrary Processor in the routing logic, or using the enrich DSL element to enrich the message.

Content enrichment using a Message Translator or a Processor

Using the Fluent Builders

You can use Templating to consume a message from one destination, transform it with something like Velocity or XQuery, and then send it on to another destination. For example using InOnly (one way messaging)

from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm"). to("activemq:Another.Queue");

If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ with a template generated response, then sending responses back to the JMSReplyTo Destination you could use this:

from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm");

Here is a simple example using the DSL directly to transform the message body{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformViaDSLTest.java}In this example we add our own Processor using explicit Java code{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformTest.java}Finally we can use Bean Integration to use any Java method on any bean to act as the transformer

from("activemq:My.Queue"). beanRef("myBeanName", "myMethodName"). to("activemq:Another.Queue");

For further examples of this pattern in use you could look at one of the JUnit tests

Using Spring XML

<route> <from uri="activemq:Input"/> <bean ref="myBeanName" method="doTransform"/> <to uri="activemq:Output"/> </route>

enrich-dsl

Content enrichment using the enrich DSL element

Camel comes with two flavors of content enricher in the DSL

  • enrich
  • pollEnrich

enrich uses a Producer to obtain the additional data. It is usually used for Request Reply messaging, for instance to invoke an external web service.
pollEnrich on the other hand uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance to read a file or download a FTP file.

Camel 2.15 or older - Data from current Exchange not used

pollEnrich or enrich does not access any data from the current Exchange which means when polling it cannot use any of the existing headers you may have set on the Exchange. For example you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. For that you must set the filename in the endpoint URI.

Instead of using enrich you can use Recipient List and have dynamic endpoints and define an AggregationStrategy on the Recipient List which then would work as a enrich would do.

pollEnrich only accept one message as response. That means that if you target to enrich your original message with the enricher collecting messages from a seda, ... components using an aggregation strategy. Only one response message will be aggregated with the original message.

From Camel 2.16 onwards both enrich and pollEnrich supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. In other words all what is told above no longer apply and it just works.

Enrich Options

confluenceTableSmall

Name

Default Value

Description

uri

 

The endpoint uri for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current Exchange.

ref

 

Refers to the endpoint for the external service to enrich from. You must use either uri or ref.  Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current  Exchange.

expression Camel 2.16: Mandatory. The Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current  Exchange.

strategyRef

 

Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see the Aggregate page for more details.

strategyMethodName

 

Camel 2.12: This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

aggregateOnExceptionfalseCamel 2.14: If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc.
shareUnitOfWorkfalseCamel 2.16:  Shares the unit of work with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individual unit of work. See Splitter for more information and example.
cacheSize Camel 2.16: Allows to configure the cache size for the ProducerCache which caches producers for reuse in the enrich. Will by default use the default cache size which is 1000. Setting the value to -1 allows to turn off the cache all together.
ignoreInvalidEndpointfalseCamel 2.16: Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

Using the Fluent Builders

AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...

The content enricher (enrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). An aggregation strategy is used to combine the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the the original exchange, the second parameter the resource exchange. The results from the resource endpoint are stored in the resource exchange's out-message. Here's an example template for implementing an aggregation strategy:

javapublic class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getIn().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }

Using this template the original exchange can be of any pattern. The resource exchange created by the enricher is always an in-out exchange.

Using Spring XML

The same example in the Spring DSL (Camel 2.15 or older)

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich uri="direct:resource" strategyRef="aggregationStrategy"/> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />

The same example in the Spring DSL (Camel 2.16 or newer)

xml<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich strategyRef="aggregationStrategy"> <constant>direct:resource</constant> </enrich> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />

 

Aggregation strategy is optional

The aggregation strategy is optional. If you do not provide it Camel will by default just use the body obtained from the resource.

from("direct:start") .enrich("direct:resource") .to("direct:result");

In the route above the message sent to the direct:result endpoint will contain the output from the direct:resource as we do not use any custom aggregation.

And for Spring DSL (Camel 2.15 or older) just omit the strategyRef attribute:

xml <route> <from uri="direct:start"/> <enrich uri="direct:resource"/> <to uri="direct:result"/> </route>

And for Spring DSL (Camel 2.16 or newer) just omit the strategyRef attribute:

xml <route> <from uri="direct:start"/> <enrich> <constant>direct:resource</constant> </enrich> <to uri="direct:result"/> </route>

Using dynamic uris

Available as of Camel 2.16

From Camel 2.16 onwards enrich and pollEnrich supports using dynamic uris computed based on information from the current Exchange. For example to enrich from a HTTP endpoint where the header with key orderId is used as part of the content-path of the HTTP url:

from("direct:start") .enrich().simple("http:myserver/${header.orderId}/order") .to("direct:result");

And in XML DSL

xml <route> <from uri="direct:start"/> <enrich> <simple>http:myserver/${header.orderId}/order</simple> </enrich> <to uri="direct:result"/> </route>

Content enrichment using pollEnrich

The pollEnrich works just as the enrich however as it uses a Polling Consumer we have 3 methods when polling

  • receive
  • receiveNoWait
  • receive(timeout)

PollEnrich Options

confluenceTableSmall

Name

Default Value

Description

uri

 

The endpoint uri for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current  Exchange.

ref

 

Refers to the endpoint for the external service to enrich from. You must use either uri or ref. Important: From Camel 2.16 onwards, this option is removed, and you use an Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current  Exchange.

expression Camel 2.16: Mandatory. The Expression to configure the uri, such as Simple or Constant or any other dynamic language that can compute the uri dynamically using values from the current  Exchange.

strategyRef

 

Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. From Camel 2.12 onwards you can also use a POJO as the AggregationStrategy, see the Aggregate page for more details.

strategyMethodName

 

Camel 2.12: This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull

false

Camel 2.12: If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. See the Aggregate page for more details.

timeout

-1

Timeout in millis when polling from the external service. See below for important details about the timeout.

aggregateOnExceptionfalseCamel 2.14: If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc.
cacheSize Camel 2.16: Allows to configure the cache size for the ConsumerCache which caches consumers for reuse in the pollEnrich. Will by default use the default cache size which is 1000. Setting the value to -1 allows to turn off the cache all together.
ignoreInvalidEndpointfalseCamel 2.16: Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.
Good practice to use timeout value

By default Camel will use the receive. Which may block until there is a message available. It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message, until the timeout is hit.

If there is no data then the newExchange in the aggregation strategy is null.

You can pass in a timeout value that determines which method to use

  • if timeout is -1 or other negative number then receive is selected (Important: the receive method may block if there is no message)
  • if timeout is 0 then receiveNoWait is selected
  • otherwise receive(timeout) is selected

The timeout values is in millis.

Camel 2.15 or older - Data from current Exchange not used

pollEnrich does not access any data from the current Exchange which means when polling it cannot use any of the existing headers you may have set on the Exchange. For example you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. For that you must set the filename in the endpoint URI.

From Camel 2.16 onwards both enrich and pollEnrich supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. In other words all what is told above no longer apply and it just works.

Example

In this example we enrich the message by loading the content from the file named inbox/data.txt.

from("direct:start") .pollEnrich("file:inbox?fileName=data.txt") .to("direct:result");

And in XML DSL (Camel 2.15 or older) you do:

xml <route> <from uri="direct:start"/> <pollEnrich uri="file:inbox?fileName=data.txt"/> <to uri="direct:result"/> </route>

And in XML DSL (Camel 2.16 or newer) you do:

xml <route> <from uri="direct:start"/> <pollEnrich> <constant>file:inbox?fileName=data.txt</constant> </pollEnrich> <to uri="direct:result"/> </route>

 

If there is no file then the message is empty. We can use a timeout to either wait (potentially forever) until a file exists, or use a timeout to wait a certain period.

For example to wait up to 5 seconds you can do (Camel 2.15 or older):

xml <route> <from uri="direct:start"/> <pollEnrich uri="file:inbox?fileName=data.txt" timeout="5000"/> <to uri="direct:result"/> </route>

For example to wait up to 5 seconds you can do (Camel 2.16 or newer):

xml <route> <from uri="direct:start"/> <pollEnrich timeout="5000"> <constant>file:inbox?fileName=data.txt</constant> </pollEnrich> <to uri="direct:result"/> </route>

Using dynamic uris

Available as of Camel 2.16

From Camel 2.16 onwards enrich and pollEnrich supports using dynamic uris computed based on information from the current Exchange. For example to pollEnrich from an endpoint that uses a header to indicate a SEDA queue name:

from("direct:start") .pollEnrich().simple("seda:${header.name}") .to("direct:result");

And in XML DSL

xml <route> <from uri="direct:start"/> <pollEnrich> <simple>seda:${header.name}</simple> </pollEnrich> <to uri="direct:result"/> </route>

Using This Pattern

Content Filter

Camel supports the Content Filter from the EIP patterns using one of the following mechanisms in the routing logic to transform content from the inbound message.

A common way to filter messages is to use an Expression in the DSL like XQuery, SQL or one of the supported Scripting Languages.

Using the Fluent Builders

Here is a simple example using the DSL directly

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

In this example we add our own Processor

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

For further examples of this pattern in use you could look at one of the JUnit tests

Using Spring XML

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>

You can also use XPath to filter out part of the message you are interested in:

<route>
  <from uri="activemq:Input"/>
  <setBody><xpath resultType="org.w3c.dom.Document">//foo:bar</xpath></setBody>
  <to uri="activemq:Output"/>
</route> 

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Claim Check

The Claim Check from the EIP patterns allows you to replace message content with a claim check (a unique key), which can be used to retrieve the message content at a later time. The message content is stored temporarily in a persistent store like a database or file system. This pattern is very useful when message content is very large (thus it would be expensive to send around) and not all components require all information.

It can also be useful in situations where you cannot trust the information with an outside party; in this case, you can use the Claim Check to hide the sensitive portions of data.

Example

In this example we want to replace a message body with a claim check, and restore the body at a later step.

Using the Fluent Builders

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckTest.java}

Using the Spring XML Extensions

xml <route> <from uri="direct:start"/> <pipeline> <to uri="bean:checkLuggage"/> <to uri="mock:testCheckpoint"/> <to uri="bean:dataEnricher"/> <to uri="mock:result"/> </pipeline> </route>

The example route is pretty simple - its just a Pipeline. In a real application you would have some other steps where the mock:testCheckpoint endpoint is in the example.

The message is first sent to the checkLuggage bean which looks like

{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckTest.java}

This bean stores the message body into the data store, using the custId as the claim check. In this example, we're just using a HashMap to store the message body; in a real application you would use a database or file system, etc. Next the claim check is added as a message header for use later. Finally we remove the body from the message and pass it down the pipeline.

The next step in the pipeline is the mock:testCheckpoint endpoint which is just used to check that the message body is removed, claim check added, etc.

To add the message body back into the message, we use the dataEnricher bean which looks like

{snippet:id=e3|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckTest.java}

This bean queries the data store using the claim check as the key and then adds the data back into the message. The message body is then removed from the data store and finally the claim check is removed. Now the message is back to what we started with!

For full details, check the example source here:

camel-core/src/test/java/org/apache/camel/processor/ClaimCheckTest.java

Using This Pattern

Normalizer

Camel supports the Normalizer from the EIP patterns by using a Message Router in front of a number of Message Translator instances.

Example

This example shows a Message Normalizer that converts two types of XML messages into a common format. Messages in this common format are then filtered.

Using the Fluent Builders

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java}

In this case we're using a Java bean as the normalizer. The class looks like this

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyNormalizer.java}

Using the Spring XML Extensions

The same example in the Spring DSL

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/normalizer.xml}

See Also

Using This Pattern

Sort

Sort can be used to sort a message. Imagine you consume text files and before processing each file you want to be sure the content is sorted.

Sort will by default sort the body using a default comparator that handles numeric values or uses the string representation. You can provide your own comparator, and even an expression to return the value to be sorted. Sort requires the value returned from the expression evaluation is convertible to java.util.List as this is required by the JDK sort operation.

Options

Name

Default Value

Description

comparatorRef

 

Refers to a custom java.util.Comparator to use for sorting the message body. Camel will by default use a comparator which does a A..Z sorting.

Using from Java DSL

In the route below it will read the file content and tokenize by line breaks so each line can be sorted.

from("file://inbox").sort(body().tokenize("\n")).to("bean:MyServiceBean.processLine");

You can pass in your own comparator as a 2nd argument:

from("file://inbox").sort(body().tokenize("\n"), new MyReverseComparator()).to("bean:MyServiceBean.processLine");

Using from Spring DSL

In the route below it will read the file content and tokenize by line breaks so each line can be sorted.

Camel 2.7 or better
<route>
  <from uri="file://inbox"/>
  <sort>
    <simple>body</simple>
  </sort>
  <beanRef ref="myServiceBean" method="processLine"/>
</route>
Camel 2.6 or older
<route>
  <from uri="file://inbox"/>
  <sort>
    <expression>
      <simple>body</simple>
    </expression>
  </sort>
  <beanRef ref="myServiceBean" method="processLine"/>
</route>

And to use our own comparator we can refer to it as a spring bean:

Camel 2.7 or better
<route>
  <from uri="file://inbox"/>
  <sort comparatorRef="myReverseComparator">
    <simple>body</simple>
  </sort>
  <beanRef ref="MyServiceBean" method="processLine"/>
</route>

<bean id="myReverseComparator" class="com.mycompany.MyReverseComparator"/>
Camel 2.6 or older
<route>
  <from uri="file://inbox"/>
  <sort comparatorRef="myReverseComparator">
    <expression>
      <simple>body</simple>
    </expression>
  </sort>
  <beanRef ref="MyServiceBean" method="processLine"/>
</route>

<bean id="myReverseComparator" class="com.mycompany.MyReverseComparator"/>

Besides <simple>, you can supply an expression using any language you like, so long as it returns a list.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Messaging Endpoints

Messaging Mapper

Camel supports the Messaging Mapper from the EIP patterns by using either Message Translator pattern or the Type Converter module.

Example

The following example demonstrates the use of a Bean component to map between two messaging system

Using the Fluent Builders

from("activemq:foo")
	.beanRef("transformerBean", "transform")
	.to("jms:bar");

 

Using the Spring XML Extensions

<route>
	<from uri="activemq:foo"/>
	<bean ref="transformerBean" method="transform" />
	<to uri="jms:bar"/>
</route>

See also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Event Driven Consumer

Camel supports the Event Driven Consumer from the EIP patterns. The default consumer model is event based (i.e. asynchronous) as this means that the Camel container can then manage pooling, threading and concurrency for you in a declarative manner.

The Event Driven Consumer is implemented by consumers implementing the Processor interface which is invoked by the Message Endpoint when a Message is available for processing.

Example

The following demonstrates a Processor defined in the Camel  Registry which is invoked when an event occurs from a JMS queue


Using the Fluent Builders

from("jms:queue:foo")
	.processRef("processor");

 

Using the Spring XML Extensions

<route>
	<from uri="jms:queue:foo"/>
	<to uri="processor"/>
</route>

 

For more details see

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Polling Consumer

Camel supports implementing the Polling Consumer from the EIP patterns using the PollingConsumer interface which can be created via the Endpoint.createPollingConsumer() method.

In Java:

javaEndpoint endpoint = context.getEndpoint("activemq:my.queue"); PollingConsumer consumer = endpoint.createPollingConsumer(); Exchange exchange = consumer.receive();

The ConsumerTemplate (discussed below) is also available.

There are three main polling methods on PollingConsumer

Method name

Description

receive()

Waits until a message is available and then returns it; potentially blocking forever

receive(long)

Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available

receiveNoWait()

Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet

EventDrivenPollingConsumer Options

The EventDrivePollingConsumer (the default implementation) supports the following options:

confluenceTableSmall

Option

Default

Description

pollingConsumerQueueSize

1000

Camel 2.14/2.13.1/2.12.4: The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue.

pollingConsumerBlockWhenFull

true

Camel 2.14/2.13.1/2.12/4: Whether to block any producer if the internal queue is full.

pollingConsumerBlockTimeout0Camel 2.16: To use a timeout (in milliseconds) when the producer is blocked if the internal queue is full. If the value is 0 or negative then no timeout is in use. If a timeout is triggered then a ExchangeTimedOutException is thrown.

Notice that some Camel Components has their own implementation of PollingConsumer and therefore do not support the options above.

You can configure these options in endpoints URIs, such as shown below:

javaEndpoint endpoint = context.getEndpoint("file:inbox?pollingConsumerQueueSize=50"); PollingConsumer consumer = endpoint.createPollingConsumer(); Exchange exchange = consumer.receive(5000);

ConsumerTemplate

The ConsumerTemplate is a template much like Spring's JmsTemplate or JdbcTemplate supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint. The template supports the three operations listed above. However, it also includes convenient methods for returning the body, etc consumeBody.

Example:

Exchange exchange = consumerTemplate.receive("activemq:my.queue");

Or to extract and get the body you can do:

Object body = consumerTemplate.receiveBody("activemq:my.queue");

And you can provide the body type as a parameter and have it returned as the type:

String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);

You get hold of a ConsumerTemplate from the CamelContext with the createConsumerTemplate operation:

ConsumerTemplate consumer = context.createConsumerTemplate();

Using ConsumerTemplate with Spring DSL

With the Spring DSL we can declare the consumer in the CamelContext with the consumerTemplate tag, just like the ProducerTemplate. The example below illustrates this:{snippet:id=e1|lang=xml|url=camel/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringConsumerTemplateTest-context.xml}Then we can get leverage Spring to inject the ConsumerTemplate in our java class. The code below is part of an unit test but it shows how the consumer and producer can work together.{snippet:id=e1|lang=java|url=camel/components/camel-spring/src/test/java/org/apache/camel/spring/SpringConsumerTemplateTest.java}

Timer Based Polling Consumer

In this sample we use a Timer to schedule a route to be started every 5th second and invoke our bean MyCoolBean where we implement the business logic for the Polling Consumer. Here we want to consume all messages from a JMS queue, process the message and send them to the next queue.

First we setup our route as:{snippet:id=e1|lang=java|url=camel/tags/camel-2.6.0/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTimerBasedPollingConsumerTest.java}And then we have out logic in our bean:{snippet:id=e2|lang=java|url=camel/tags/camel-2.6.0/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTimerBasedPollingConsumerTest.java}

Scheduled Poll Components

Quite a few inbound Camel endpoints use a scheduled poll pattern to receive messages and push them through the Camel processing routes. That is to say externally from the client the endpoint appears to use an Event Driven Consumer but internally a scheduled poll is used to monitor some kind of state or resource and then fire message exchanges.

Since this a such a common pattern, polling components can extend the ScheduledPollConsumer base class which makes it simpler to implement this pattern. There is also the Quartz Component which provides scheduled delivery of messages using the Quartz enterprise scheduler.

For more details see:

ScheduledPollConsumer Options

The ScheduledPollConsumer supports the following options:

confluenceTableSmall

Option

Default

Description

backoffErrorThreshold

0

Camel 2.12: The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.

backoffIdleThreshold

0

Camel 2.12: The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.

backoffMultiplier

0

Camel 2.12: To let the scheduled polling consumer back-off if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.

delay

500

Milliseconds before the next poll.

greedy

false

Camel 2.10.6/2.11.1: If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.

initialDelay

1000

Milliseconds before the first poll starts.

pollStrategy

 

A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange has been created and routed in Camel. In other words the error occurred while the polling was gathering information, for instance access to a file network failed so Camel cannot access it to scan for files.

The default implementation will log the caused exception at WARN level and ignore it.

runLoggingLevel

TRACE

Camel 2.8: The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.

scheduledExecutorService

null

Camel 2.10: Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. This option allows you to share a thread pool among multiple consumers.

scheduler

null

Camel 2.12: Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for firing when the polling consumer runs. The default implementation uses the ScheduledExecutorService and there is a Quartz2, and Spring based which supports CRON expressions. Notice: If using a custom scheduler then the options for initialDelay, useFixedDelay, timeUnit and scheduledExecutorService may not be in use. Use the text quartz2 to refer to use the Quartz2 scheduler; and use the text spring to use the Spring based; and use the text #myScheduler to refer to a custom scheduler by its id in the Registry.

See Quartz2 page for an example.

scheduler.xxx

null

Camel 2.12: To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.

sendEmptyMessageWhenIdle

false

Camel 2.9: If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.

startScheduler

true

Whether the scheduler should be auto started.

timeUnit

TimeUnit.MILLISECONDS

Time unit for initialDelay and delay options.

useFixedDelay

 

Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. In Camel 2.7.x or older the default value is false.

From Camel 2.8: the default value is true.

Using backoff to Let the Scheduler be Less Aggressive

Available as of Camel 2.12

The scheduled Polling Consumer is by default static by using the same poll frequency whether or not there is messages to pickup or not.

From Camel 2.12: you can configure the scheduled Polling Consumer to be more dynamic by using backoff. This allows the scheduler to skip N number of polls when it becomes idle, or there has been X number of errors in a row. See more details in the table above for the backoffXXX options.

For example to let a FTP consumer back-off if its becoming idle for a while you can do:

javafrom("ftp://myserver?username=foo&passowrd=secret?delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5") .to("bean:processFile");

In this example, the FTP consumer will poll for new FTP files every 5th second. But if it has been idle for 5 attempts in a row, then it will back-off using a multiplier of 6, which means it will now poll every 5 x 6 = 30th second instead. When the consumer eventually pickup a file, then the back-off will reset, and the consumer will go back and poll every 5th second again.

Camel will log at DEBUG level using org.apache.camel.impl.ScheduledPollConsumer when back-off is kicking-in.

About Error Handling and Scheduled Polling Consumers

ScheduledPollConsumer is scheduled based and its run method is invoked periodically based on schedule settings. But errors can also occur when a poll is being executed. For instance if Camel should poll a file network, and this network resource is not available then a java.io.IOException could occur. As this error happens before any Exchange has been created and prepared for routing, then the regular Error handling in Camel does not apply. So what does the consumer do then? Well the exception is propagated back to the run method where its handled. Camel will by default log the exception at WARN level and then ignore it. At next schedule the error could have been resolved and thus being able to poll the endpoint successfully.

Using a Custom Scheduler

Available as of Camel 2.12:

The SPI interface org.apache.camel.spi.ScheduledPollConsumerScheduler allows to implement a custom scheduler to control when the Polling Consumer runs. The default implementation is based on the JDKs ScheduledExecutorService with a single thread in the thread pool. There is a CRON based implementation in the Quartz2, and Spring components.

For an example of developing and using a custom scheduler, see the unit test org.apache.camel.component.file.FileConsumerCustomSchedulerTest from the source code in camel-core.

Error Handling When Using PollingConsumerPollStrategy

org.apache.camel.PollingConsumerPollStrategy is a pluggable strategy that you can configure on the ScheduledPollConsumer. The default implementation org.apache.camel.impl.DefaultPollingConsumerPollStrategy will log the caused exception at WARN level and then ignore this issue.

The strategy interface provides the following three methods:

  • begin
    • void begin(Consumer consumer, Endpoint endpoint)
  • begin (Camel 2.3)
    • boolean begin(Consumer consumer, Endpoint endpoint)
  • commit
    • void commit(Consumer consumer, Endpoint endpoint)
  • commit (Camel 2.6)
    • void commit(Consumer consumer, Endpoint endpoint, int polledMessages)
  • rollback
    • boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception

In Camel 2.3: the begin method returns a boolean which indicates whether or not to skipping polling. So you can implement your custom logic and return false if you do not want to poll this time.

In Camel 2.6: the commit method has an additional parameter containing the number of message that was actually polled. For example if there was no messages polled, the value would be zero, and you can react accordingly.

The most interesting is the rollback as it allows you do handle the caused exception and decide what to do.

For instance if we want to provide a retry feature to a scheduled consumer we can implement the PollingConsumerPollStrategy method and put the retry logic in the rollback method. Lets just retry up till three times:

javapublic boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { if (retryCounter < 3) { // return true to tell Camel that it should retry the poll immediately return true; } // okay we give up do not retry anymore return false; }

Notice that we are given the Consumer as a parameter. We could use this to restart the consumer as we can invoke stop and start:

java// error occurred lets restart the consumer, that could maybe resolve the issue consumer.stop(); consumer.start();

Note: if you implement the begin operation make sure to avoid throwing exceptions as in such a case the poll operation is not invoked and Camel will invoke the rollback directly.

Configuring an Endpoint to Use PollingConsumerPollStrategy

To configure an Endpoint to use a custom PollingConsumerPollStrategy you use the option pollStrategy. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll:

from("file://inbox/?pollStrategy=#myPoll") .to("activemq:queue:inbox")

Using This Pattern

See Also

Competing Consumers

Camel supports the Competing Consumers from the EIP patterns using a few different components.

You can use the following components to implement competing consumers:-

  • Seda for SEDA based concurrent processing using a thread pool
  • JMS for distributed SEDA based concurrent processing with queues which support reliable load balancing, failover and clustering.

Enabling Competing Consumers with JMS

To enable Competing Consumers you just need to set the concurrentConsumers property on the JMS endpoint.

For example

from("jms:MyQueue?concurrentConsumers=5").bean(SomeBean.class);

or in Spring DSL

<route>
  <from uri="jms:MyQueue?concurrentConsumers=5"/>
  <to uri="bean:someBean"/>
</route>

Or just run multiple JVMs of any ActiveMQ or JMS route (smile)

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Dispatcher

Camel supports the Message Dispatcher from the EIP patterns using various approaches.

You can use a component like JMS with selectors to implement a Selective Consumer as the Message Dispatcher implementation. Or you can use an Endpoint as the Message Dispatcher itself and then use a Content Based Router as the Message Dispatcher.

 

Example

The following example demonstrates Message Dispatcher pattern using the Competing Consumers functionality of the JMS component to offload messages to a Content Based Router and custom Processors registered in the Camel Registry running in separate threads from originating consumer.

 

Using the Fluent Builders

from("jms:queue:foo?concurrentConsumers=5")
	.threads(5)
	.choice()
		.when(header("type").isEqualTo("A")) 
			.processRef("messageDispatchProcessorA")
		.when(header("type").isEqualTo("B"))
			.processRef("messageDispatchProcessorB")
		.when(header("type").isEqualTo("C"))
			.processRef("messageDispatchProcessorC")		
		.otherwise()
			.to("jms:queue:invalidMessageType");

 

Using the Spring XML Extensions

<route>
	<from uri="jms:queue:foo?concurrentConsumers=5"/>
	<threads poolSize="5">
		<choice>
			<when>
				<simple>${in.header.type} == 'A'</simple>
				<to ref="messageDispatchProcessorA"/>
			</when>
			<when>
				<simple>${in.header.type} == 'B'</simple>
				<to ref="messageDispatchProcessorB"/>
			</when>
			<when>
				<simple>${in.header.type} == 'C'</simple>
				<to ref="messageDispatchProcessorC"/>
			</when>
			<otherwise>
				<to uri="jms:queue:invalidMessageType"/>
		</choice>
	</threads>
</route>

See Also

 

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Selective Consumer

The Selective Consumer from the EIP patterns can be implemented in two ways

The first solution is to provide a Message Selector to the underlying URIs when creating your consumer. For example when using JMS you can specify a selector parameter so that the message broker will only deliver messages matching your criteria.

The other approach is to use a Message Filter which is applied; then if the filter matches the message your consumer is invoked as shown in the following example

Using the Fluent Builders

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using the Spring XML Extensions

Error rendering macro 'code': Invalid value specified for parameter 'java.lang.NullPointerException'
<bean id="myProcessor" class="org.apache.camel.builder.MyProcessor"/>

<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:a"/>
        <filter>
            <xpath>$foo = 'bar'</xpath>
            <process ref="myProcessor"/>
        </filter>
    </route>
</camelContext>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Durable Subscriber

Camel supports the Durable Subscriber from the EIP patterns using the JMS component which supports publish & subscribe using Topics with support for non-durable and durable subscribers.

Another alternative is to combine the Message Dispatcher or Content Based Router with File or JPA components for durable subscribers then something like Seda for non-durable.

Here is a simple example of creating durable subscribers to a JMS topic

Using the Fluent Builders

from("direct:start").to("activemq:topic:foo");

from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1");

from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");

Using the Spring XML Extensions

<route>
    <from uri="direct:start"/>
    <to uri="activemq:topic:foo"/>
</route>

<route>
    <from uri="activemq:topic:foo?clientId=1&durableSubscriptionName=bar1"/>
    <to uri="mock:result1"/>
</route>

<route>
    <from uri="activemq:topic:foo?clientId=2&durableSubscriptionName=bar2"/>
    <to uri="mock:result2"/>
</route>

Here is another example of JMS durable subscribers, but this time using virtual topics (recommended by AMQ over durable subscriptions)

Using the Fluent Builders

from("direct:start").to("activemq:topic:VirtualTopic.foo");

from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1");

from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");

Using the Spring XML Extensions

<route>
    <from uri="direct:start"/>
    <to uri="activemq:topic:VirtualTopic.foo"/>
</route>

<route>
    <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/>
    <to uri="mock:result1"/>
</route>

<route>
    <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/>
    <to uri="mock:result2"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Idempotent Consumer

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.

This pattern is implemented using the IdempotentConsumer class. This uses an Expression to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the IdempotentRepository to see if it has been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.

The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.

Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.
On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.

Camel provides the following Idempotent Consumer implementations:

Options

The Idempotent Consumer has the following options:

Option

Default

Description

eager

true

Eager controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled before then Camel will be able to detect duplicate messages even when messages are currently in progress. By disabling Camel will only detect duplicates when a message has successfully been processed.

messageIdRepositoryRef

null

A reference to a IdempotentRepository to lookup in the registry. This option is mandatory when using XML DSL.

skipDuplicate

true

Camel 2.8: Sets whether to skip duplicate messages. If set to false then the message will be continued. However the Exchange has been marked as a duplicate by having the Exchange.DUPLICATE_MESSAG exchange property set to a Boolean.TRUE value.

removeOnFailure

true

Camel 2.9: Sets whether to remove the id of an Exchange that failed.

completionEagerfalse

Camel 2.16: Sets whether to complete the idempotent consumer eager or when the exchange is done.

If this option is true to complete eager, then the idempotent consumer will trigger its completion when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange is continued routed after the block ends, then whatever happens there does not affect the state.

If this option is false (default) to not complete eager, then the idempotent consumer will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends, then whatever happens there also affect the state. For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.

Using the Fluent Builders

The following example will use the header myMessageId to filter out duplicates{snippet:id=idempotent|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}The above example will use an in-memory based MessageIdRepository which can easily run out of memory and doesn't work in a clustered environment. So you might prefer to use the JPA based implementation which uses a database to store the message IDs which have been processed{snippet:id=idempotent|lang=java|url=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java}In the above example we are using the header messageId to filter out duplicates and using the collection myProcessorName to indicate the Message ID Repository to use. This name is important as you could process the same message by many different processors; so each may require its own logical Message ID Repository.

For further examples of this pattern in use you could look at the junit test case

Spring XML example

The following example will use the header myMessageId to filter out duplicates{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml}

How to handle duplicate messages in the route

Available as of Camel 2.8

You can now set the skipDuplicate option to false which instructs the idempotent consumer to route duplicate messages as well. However the duplicate message has been marked as duplicate by having a property on the Exchange set to true. We can leverage this fact by using a Content Based Router or Message Filter to detect this and handle duplicate messages.

For example in the following example we use the Message Filter to send the message to a duplicate endpoint, and then stop continue routing that message.{snippet:id=e1|lang=java|title=Filter duplicate messages|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java}The sample example in XML DSL would be:{snippet:id=e1|lang=xml|title=Filter duplicate messages|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml}

How to handle duplicate message in a clustered environment with a data grid

Available as of Camel 2.8

If you have running Camel in a clustered environment, a in memory idempotent repository doesn't work (see above). You can setup either a central database or use the idempotent consumer implementation based on the Hazelcast data grid. Hazelcast finds the nodes over multicast (which is default - configure Hazelcast for tcp-ip) and creates automatically a map based repository:

java HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo"); from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

You have to define how long the repository should hold each message id (default is to delete it never). To avoid that you run out of memory you should create an eviction strategy based on the Hazelcast configuration. For additional information see camel-hazelcast.

See this little tutorial, how setup such an idempotent repository on two cluster nodes using Apache Karaf.

Available as of Camel 2.13.0

Another option for using Idempotent Consumer in a clustered environment is Infinispan. Infinispan is a data grid with replication and distribution clustering support. For additional information see camel-infinispan.

Using This Pattern

Transactional Client

Camel recommends supporting the Transactional Client from the EIP patterns using spring transactions.

Transaction Oriented Endpoints like JMS support using a transaction for both inbound and outbound message exchanges. Endpoints that support transactions will participate in the current transaction context that they are called from.

Configuration of Redelivery

The redelivery in transacted mode is not handled by Camel but by the backing system (the transaction manager). In such cases you should resort to the backing system how to configure the redelivery.

You should use the SpringRouteBuilder to setup the routes since you will need to setup the spring context with the TransactionTemplates that will define the transaction manager configuration and policies.

For inbound endpoint to be transacted, they normally need to be configured to use a Spring PlatformTransactionManager. In the case of the JMS component, this can be done by looking it up in the spring context.

You first define needed object in the spring configuration.

xml <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean>

Then you look them up and use them to create the JmsComponent.

java PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager"); ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory"); JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager); component.getConfiguration().setConcurrentConsumers(1); ctx.addComponent("activemq", component);

Transaction Policies

Outbound endpoints will automatically enlist in the current transaction context. But what if you do not want your outbound endpoint to enlist in the same transaction as your inbound endpoint? The solution is to add a Transaction Policy to the processing route. You first have to define transaction policies that you will be using. The policies use a spring TransactionTemplate under the covers for declaring the transaction demarcation to use. So you will need to add something like the following to your spring xml:

xml <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/> </bean>

Then in your SpringRouteBuilder, you just need to create new SpringTransactionPolicy objects for each of the templates.

javapublic void configure() { ... Policy requried = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED")); Policy requirenew = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRES_NEW")); ... }

Once created, you can use the Policy objects in your processing routes:

java // Send to bar in a new transaction from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar"); // Send to bar without a transaction. from("activemq:queue:foo").policy(notsupported ).to("activemq:queue:bar");

OSGi Blueprint

If you are using OSGi Blueprint then you most likely have to explicit declare a policy and refer to the policy from the transacted in the route.

xml <bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/> </bean>

And then refer to "required" from the route:

xml<route> <from uri="activemq:queue:foo"/> <transacted ref="required"/> <to uri="activemq:queue:bar"/> </route>

Database Sample

In this sample we want to ensure that two endpoints is under transaction control. These two endpoints inserts data into a database.
The sample is in its full as a unit test.

First of all we setup the usual spring stuff in its configuration file. Here we have defined a DataSource to the HSQLDB and a most importantly the Spring DataSource TransactionManager that is doing the heavy lifting of ensuring our transactional policies. You are of course free to use any of the Spring based TransactionManager, eg. if you are in a full blown J2EE container you could use JTA or the WebLogic or WebSphere specific managers.

As we use the new convention over configuration we do not need to configure a transaction policy bean, so we do not have any PROPAGATION_REQUIRED beans. All the beans needed to be configured is standard Spring beans only, eg. there are no Camel specific configuration at all.{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml}Then we are ready to define our Camel routes. We have two routes: 1 for success conditions, and 1 for a forced rollback condition.
This is after all based on a unit test. Notice that we mark each route as transacted using the transacted tag.{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml}That is all that is needed to configure a Camel route as being transacted. Just remember to use the transacted DSL. The rest is standard Spring XML to setup the transaction manager.

JMS Sample

In this sample we want to listen for messages on a queue and process the messages with our business logic java code and send them along. Since its based on a unit test the destination is a mock endpoint.

First we configure the standard Spring XML to declare a JMS connection factory, a JMS transaction manager and our ActiveMQ component that we use in our routing.{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml}And then we configure our routes. Notice that all we have to do is mark the route as transacted using the transacted tag.{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml}

Transaction error handler

When a route is marked as transacted using transacted Camel will automatic use the TransactionErrorHandler as Error Handler. It supports basically the same feature set as the DefaultErrorHandler, so you can for instance use Exception Clause as well.

Integration Testing with Spring

An Integration Test here means a test runner class annotated @RunWith(SpringJUnit4ClassRunner.class).

When following the Spring Transactions documentation it is tempting to annotate your integration test with @Transactional then seed your database before firing up the route to be tested and sending a message in. This is incorrect as Spring will have an in-progress transaction, and Camel will wait on this before proceeding, leading to the route timing out.

Instead, remove the @Transactional annotation from the test method and seed the test data within a TransactionTemplate execution which will ensure the data is committed to the database before Camel attempts to pick up and use the transaction manager. A simple example can be found on GitHub.

Spring's transactional model ensures each transaction is bound to one thread. A Camel route may invoke additional threads which is where the blockage may occur. This is not a fault of Camel but as the programmer you must be aware of the consequences of beginning a transaction in a test thread and expecting a separate thread created by your Camel route to be participate, which it cannot. You can, in your test, mock the parts that cause separate threads to avoid this issue.

Using multiple routes with different propagation behaviors

Available as of Camel 2.2
Suppose you want to route a message through two routes and by which the 2nd route should run in its own transaction. How do you do that? You use propagation behaviors for that where you configure it as follows:

  • The first route use PROPAGATION_REQUIRED
  • The second route use PROPAGATION_REQUIRES_NEW

This is configured in the Spring XML file:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.xml}Then in the routes you use transacted DSL to indicate which of these two propagations it uses.{snippet:id=e1|lang=java|url=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.java}Notice how we have configured the onException in the 2nd route to indicate in case of any exceptions we should handle it and just rollback this transaction. This is done using the markRollbackOnlyLast which tells Camel to only do it for the current transaction and not globally.

See Also

Using This Pattern

Messaging Gateway

Camel has several endpoint components that support the Messaging Gateway from the EIP patterns.

Components like Bean and CXF provide a a way to bind a Java interface to the message exchange.

However you may want to read the Using CamelProxy documentation as a true Messaging Gateway EIP solution.
Another approach is to use @Produce which you can read about in POJO Producing which also can be used as a Messaging Gateway EIP solution.

 

Example

The following example how the CXF and Bean components can be used to abstract the developer from the underlying messaging system API


Using the Fluent Builders

from("cxf:bean:soapMessageEndpoint")
	.to("bean:testBean?method=processSOAP");

 

Using the Spring XML Extensions

<route>
	<from uri="cxf:bean:soapMessageEndpoint"/>
	<to uri="bean:testBean?method=processSOAP"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Service Activator

Camel has several endpoint components that support the Service Activator from the EIP patterns.

Components like Bean, CXF and Pojo provide a a way to bind the message exchange to a Java interface/service where the route defines the endpoints and wires it up to the bean.

In addition you can use the Bean Integration to wire messages to a bean using annotation.

Here is a simple example of using a Direct endpoint to create a messaging interface to a Pojo Bean service.

Using the Fluent Builders

from("direct:invokeMyService").to("bean:myService");

Using the Spring XML Extensions

<route>
    <from uri="direct:invokeMyService"/>
    <to uri="bean:myService"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

System Management

Detour

The Detour from the EIP patterns allows you to send messages through additional steps if a control condition is met. It can be useful for turning on extra validation, testing, debugging code when needed.

Example

In this example we essentially have a route like from("direct:start").to("mock:result") with a conditional detour to the mock:detour endpoint in the middle of the route..

Error formatting macro: snippet: java.lang.IndexOutOfBoundsException: Index: 20, Size: 20

Using the Spring XML Extensions

<route>
  <from uri="direct:start"/>
    <choice>
      <when>
        <method bean="controlBean" method="isDetour"/>
	<to uri="mock:detour"/>
      </when>
    </choice>
    <to uri="mock:result"/>
</route>

whether the detour is turned on or off is decided by the ControlBean. So, when the detour is on the message is routed to mock:detour and then mock:result. When the detour is off, the message is routed to mock:result.

For full details, check the example source here:

camel-core/src/test/java/org/apache/camel/processor/DetourTest.java

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Wire Tap

Wire Tap (from the EIP patterns) allows you to route messages to a separate location while they are being forwarded to the ultimate destination.

Streams

If you Wire Tap a stream message body then you should consider enabling Stream caching to ensure the message body can be read at each endpoint. See more details at Stream caching.

Options

Name

Default

Description

uri

 

Mandatory: The URI of the endpoint to which the wire-tapped message should be sent.

From Camel 2.16: support for dynamic to URIs is as documented in Message Endpoint.

executorServiceRef

 

Reference ID of a custom Thread Pool to use when processing the wire-tapped messages.

When not set, Camel will use an instance of the default thread pool.

processorRef

 

Reference ID of a custom Processor to use for creating a new message.

See "Sending a New Exchange" below.

copy

true

Camel 2.3: Whether to copy the Exchange before wire-tapping the message.

onPrepareRef

 

Camel 2.8: Reference identifier of a custom Processor to prepare the copy of the Exchange to be wire-tapped. This allows you to do any custom logic, such as deep-cloning the message payload.

cacheSize

 

Camel 2.16: Allows to configure the cache size for the ProducerCache which caches producers for reuse. Will by default use the default cache size which is 1000.

Setting the value to -1 allows to turn off the cache all together.

ignoreInvalidEndpoint

false

Camel 2.16: Whether to ignore an endpoint URI that could not be resolved.

When false, Camel will throw an exception when it identifies an invalid endpoint URI.

WireTap Threadpool

The Wire Tap uses a thread pool to process the tapped messages. This thread pool will by default use the settings detailed at Threading Model. In particular, when the pool is exhausted (with all threads utilized), further wiretaps will be executed synchronously by the calling thread. To remedy this, you can configure an explicit thread pool on the Wire Tap having either a different rejection policy, a larger worker queue, or more worker threads.

WireTap Node

Camel's Wire Tap node supports two flavors when tapping an Exchange:

  • With the traditional Wire Tap, Camel will copy the original Exchange and set its Exchange Pattern to InOnly, as we want the tapped Exchange to be sent in a fire and forget style. The tapped Exchange is then sent in a separate thread so it can run in parallel with the original. Beware that only the Exchange is copied - Wire Tap won't do a deep clone (unless you specify a custom processor via onPrepareRef which does that). So all copies could share objects from the original Exchange.
  • Camel also provides an option of sending a new Exchange allowing you to populate it with new values.

Sending a Copy (traditional wiretap)

Using the Fluent Builders{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java}Using the Spring XML Extensions{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml}

Sending a New Exchange

Using the Fluent Builders
Camel supports either a processor or an Expression to populate the new Exchange. Using a processor gives you full power over how the Exchange is populated as you can set properties, headers, etc. An Expression can only be used to set the IN body.

From Camel 2.3: the Expression or Processor is pre-populated with a copy of the original Exchange, which allows you to access the original message when you prepare a new Exchange to be sent. You can use the copy option (enabled by default) to indicate whether you want this. If you set copy=false, then it works as in Camel 2.2 or older where the Exchange will be empty.

Below is the processor variation. This example is from Camel 2.3, where we disable copy by passing in false to create a new, empty Exchange.{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java}Here is the Expression variation. In the following example we disable copying by setting copy=false which results in the creation of a new, empty Exchange.{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java}Using the Spring XML Extensions
The processor variation, which uses a processorRef attribute to refer to a Spring bean by ID:{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml}Here is the Expression variation, where the expression is defined in the body tag:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml}This variation accesses the body of the original message and creates a new Exchange based on the Expression. It will create a new Exchange and have the body contain "Bye ORIGINAL BODY MESSAGE HERE"{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml}

Further Example

For another example of this pattern, refer to the wire tap test case.

Using Dynamic URIs

Available as of Camel 2.16:

For example to wire tap to a dynamic URI, then it supports the same dynamic URIs as documented in Message Endpoint. For example to wire tap to a JMS queue where the header ID is part of the queue name:

from("direct:start") .wireTap("jms:queue:backup-${header.id}") .to("bean:doSomething");

 

Sending a New Exchange and Set Headers in DSL

Available as of Camel 2.8

If you send a new message using Wire Tap, then you could only set the message body using an Expression from the DSL. If you also need to set headers, you would have to use a Processor. From Camel 2.8: it's possible to set headers as well using the DSL.

The following example sends a new message which has

  • Bye World as message body.
  • A header with key id with the value 123.
  • A header with key date which has current date as value.

Java DSL

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java}

XML DSL

The XML DSL is slightly different than Java DSL in how you configure the message body and headers using <body> and <setHeader>:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml}

Using onPrepare to Execute Custom Logic when Preparing Messages

Available as of Camel 2.8

See details at Multicast

Using This Pattern

Log

How can I log the processing of a Message?

Camel provides many ways to log the fact that you are processing a message. Here are just a few examples:

  • You can use the Log component which logs the Message content.
  • You can use the Tracer which trace logs message flow.
  • You can also use a Processor or Bean and log from Java code.
  • You can use the log DSL.

Using log DSL

In Camel 2.2 you can use the log DSL which allows you to use Simple language to construct a dynamic message which gets logged.
For example you can do

from("direct:start").log("Processing ${id}").to("bean:foo");

Which will construct a String message at runtime using the Simple language. The log message will by logged at INFO level using the route id as the log name. By default a route is named route-1, route-2 etc. But you can use the routeId("myCoolRoute") to set a route name of choice.

Difference between log in the DSL and [Log] component

The log DSL is much lighter and meant for logging human logs such as Starting to do ... etc. It can only log a message based on the Simple language. On the other hand Log component is a full fledged component which involves using endpoints and etc. The Log component is meant for logging the Message itself and you have many URI options to control what you would like to be logged.

Using Logger instance from the the Registry

As of Camel 2.12.4/2.13.1, if no logger name or logger instance is passed to log DSL, there is a Registry lookup performed to find single instance of org.slf4j.Logger. If such an instance is found, it is used instead of creating a new logger instance. If more instances are found, the behavior defaults to creating a new instance of logger.

Logging message body with streamed messages

If the message body is stream based, then logging the message body, may cause the message body to be empty afterwards. See this FAQ. For streamed messages you can use Stream caching to allow logging the message body and be able to read the message body afterwards again.

The log DSL have overloaded methods to set the logging level and/or name as well.

from("direct:start").log(LoggingLevel.DEBUG, "Processing ${id}").to("bean:foo");

and to set a logger name

from("direct:start").log(LoggingLevel.DEBUG, "com.mycompany.MyCoolRoute", "Processing ${id}").to("bean:foo");

Since Camel 2.12.4/2.13.1 the logger instance may be used as well:

from("direct:start").log(LoggingLeven.DEBUG, org.slf4j.LoggerFactory.getLogger("com.mycompany.mylogger"), "Processing ${id}").to("bean:foo");

For example you can use this to log the file name being processed if you consume files.

from("file://target/files").log(LoggingLevel.DEBUG, "Processing file ${file:name}").to("bean:foo");

Using log DSL from Spring

In Spring DSL it is also easy to use log DSL as shown below:

        <route id="foo">
            <from uri="direct:foo"/>
            <log message="Got ${body}"/>
            <to uri="mock:foo"/>
        </route>

The log tag has attributes to set the message, loggingLevel and logName. For example:

        <route id="baz">
            <from uri="direct:baz"/>
            <log message="Me Got ${body}" loggingLevel="FATAL" logName="com.mycompany.MyCoolRoute"/>
            <to uri="mock:baz"/>
        </route>

Since Camel 2.12.4/2.13.1 it is possible to reference logger instance. For example:

        <bean id="myLogger" class="org.slf4j.LoggerFactory" factory-method="getLogger" xmlns="http://www.springframework.org/schema/beans">
            <constructor-arg value="com.mycompany.mylogger" />
        </bean>

        <route id="moo" xmlns="http://camel.apache.org/schema/spring">
            <from uri="direct:moo"/>
            <log message="Me Got ${body}" loggingLevel="INFO" loggerRef="myLogger"/>
            <to uri="mock:baz"/>
        </route>

Configuring log name globally

Available as of Camel 2.17

By default the log name is the route id. If you want to use a different log name, you would need to configure the logName option. However if you have many logs and you want all of them to use the same log name, then you would need to set that logName option on all of them.

With Camel 2.17 onwards you can configure a global log name that is used instead of the route id, eg

CamelContext context = ...
context.getProperties().put(Exchange.LOG_EIP_NAME, "com.foo.myapp");

And in XML

<camelContext ...>
  <properties>
    <property key="CamelLogEipName" value="com.foo.myapp"/>
  </properties>

 

Using slf4j Marker

Available as of Camel 2.9

You can specify a marker name in the DSL

        <route id="baz">
            <from uri="direct:baz"/>
            <log message="Me Got ${body}" loggingLevel="FATAL" logName="com.mycompany.MyCoolRoute" marker="myMarker"/>
            <to uri="mock:baz"/>
        </route>

Using log DSL in OSGi

Improvement as of Camel 2.12.4/2.13.1

When using log DSL inside OSGi (e.g., in Karaf), the underlying logging mechanisms are provided by PAX logging. It searches for a bundle which invokes org.slf4j.LoggerFactory.getLogger() method and associates the bundle with the logger instance. Passing only logger name to log DSL results in associating camel-core bundle with the logger instance created.

In some scenarios it is required that the bundle associated with logger should be the bundle which contains route definition. This is possible using provided logger instance both for Java DSL and Spring DSL (see the examples above).

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

© 2004-2015 The Apache Software Foundation.
Apache Camel, Camel, Apache, the Apache feather logo, and the Apache Camel project logo are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Graphic Design By Hiram