ActiveMQ Component
The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ. This component is based on JMS Component and uses Spring's JMS support for declarative transactions, using Spring's JmsTemplate
for sending and a MessageListenerContainer
for consuming. All the options from the JMS component also applies for this component.
To use this component make sure you have the activemq.jar
or activemq-core.jar
on your classpath along with any Camel dependencies such as camel-core.jar
, camel-spring.jar
and camel-jms.jar
.
Transacted and cachingSee section Transactions and Cache Levels below on JMS page if you are using transactions with JMS as it can impact performance.
activemq:[queue:|topic:]destinationName
Where destinationName
is an ActiveMQ queue or topic name. By default, the destinationName
is interpreted as a queue name. For example, to connect to the queue, FOO.BAR
, use:
activemq:FOO.BAR
You can include the optional queue:
prefix, if you prefer:
activemq:queue:FOO.BAR
To connect to a topic, you must include the topic:
prefix. For example, to connect to the topic, Stocks.Prices
, use:
activemq:topic:Stocks.Prices
Options
See Options on the JMS component as all these options also apply for this component.
Configuring the Connection Factory
This test case shows how to add an ActiveMQComponent
to the CamelContext using the activeMQComponent()
method while specifying the brokerURL used to connect to ActiveMQ.
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Configuring the Connection Factory using Spring XML
You can configure the ActiveMQ broker URL on the ActiveMQComponent
as follows
xml<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext xmlns="http://camel.apache.org/schema/spring">
</camelContext>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://somehost:61616"/>
</bean>
</beans>
Using Connection Pooling
When sending to an ActiveMQ broker using Camel it's recommended to use a pooled connection factory to efficiently handle pooling of JMS connections, sessions and producers. This is documented on the ActiveMQ Spring Support page.
You can grab ActiveMQ's org.apache.activemq.pool.PooledConnectionFactory
with Maven:
xml<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.6.0</version>
</dependency>
And then setup the activemq
Camel component as follows:
xml<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
<property name="maxConnections" value="8"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="concurrentConsumers" value="10"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig"/>
<!-- If transacted=true then enable CACHE_CONSUMER (if not using XA) to run faster.
See more details at: http://camel.apache.org/jms -->
<!--
<property name="transacted" value="true"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
-->
</bean>
Notice the init
and destroy
methods on the pooled connection factory. This is important to ensure the connection pool is properly started and shutdown.
Important information about when using transactionsIf you are using transactions then see more details at JMS. And remember to set cacheLevelName
to CACHE_CONSUMER
if you are not using XA transactions. This can dramatically improve performance.
The PooledConnectionFactory
will then create a connection pool with up to 8 connections in use at the same time. Each connection can be shared by many sessions. There is an option named maximumActive
you can use to configure the maximum number of sessions per connection; the default value is 500
. From ActiveMQ 5.7: the option has been renamed to better reflect its purpose, being named as maximumActiveSessionPerConnection
. Notice the concurrentConsumers
is set to a higher value than maxConnections
is. This is okay, as each consumer is using a session, and as a session can share the same connection, we are in the safe. In this example we can have 8 * 500 = 4000
active sessions at the same time.
Invoking MessageListener POJOs in a Camel route
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener
to a Processor. This means that the Bean component is capable of invoking any JMS MessageListener
bean directly inside any route.
So for example you can create a MessageListener
in JMS like this:
public class MyListener implements MessageListener {
public void onMessage(Message jmsMessage) {
// ...
}
}
Then use it in your Camel route as follows
from("file://foo/bar")
.bean(MyListener.class);
That is, you can reuse any of the Camel Components and easily integrate them into your JMS MessageListener
POJO!
Using ActiveMQ Destination Options
Available as of ActiveMQ 5.6
You can configure the Destination Options in the endpoint URI, using the destination.
prefix. For example to mark a consumer as exclusive, and set its prefetch size to 50
, you can do as follows:
<from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/>Consuming Advisory Messages
ActiveMQ can generate Advisory messages which are put in topics that you can consume. Such messages can help you send alerts in case you detect slow consumers or to build statistics (number of messages/produced per day, etc.) The following Spring DSL example shows you how to read messages from a topic.
The below route starts by reading the topic ActiveMQ.Advisory.Connection. To watch another topic, simply change the name according to the name provided in ActiveMQ Advisory Messages documentation. The parameter mapJmsMessage=false
allows for converting the org.apache.activemq.command.ActiveMqMessage
object from the JMS queue. Next, the body received is converted into a String for the purposes of this example and a carriage return is added. Finally, the string is added to a file
<route>
<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false"/>
<convertBodyTo type="java.lang.String"/>
<transform>
<simple>${in.body} </simple>
</transform>
<to uri="file://data/activemq/?fileExist=Append&fileName=advisoryConnection-${date:now:yyyyMMdd}.txt"/>
</route>
If you consume a message on a queue, you should see the following files under the data/activemq
folder :
advisoryConnection-20100312.txt
advisoryProducer-20100312.txt
containing the following string:
ActiveMQMessage {
commandId = 0,
responseRequired = false,
messageId = ID:dell-charles-3258-1268399815140-1:0:0:0:221,
originalDestination = null,
originalTransactionId = null,
producerId = ID:dell-charles-3258-1268399815140-1:0:0:0,
destination = topic://ActiveMQ.Advisory.Connection,
transactionId = null,
expiration = 0,
timestamp = 0,
arrival = 0,
brokerInTime = 1268403383468,
brokerOutTime = 1268403383468,
correlationId = null,
replyTo = null,
persistent = false,
type = Advisory,
priority = 0,
groupID = null,
groupSequence = 0,
targetConsumerId = null,
compressed = false,
userID = null,
content = null,
marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705,
dataStructure = ConnectionInfo {
commandId = 1,
responseRequired = true,
connectionId = ID:dell-charles-3258-1268399815140-2:50,
clientId = ID:dell-charles-3258-1268399815140-14:0,
userName = ,
password = *****,
brokerPath = null,
brokerMasterConnector = false,
manageable = true,
clientMaster = true
},
redeliveryCounter = 0,
size = 0,
properties = { originBrokerName=master, originBrokerId=ID:dell-charles-3258-1268399815140-0:0, originBrokerURL=vm://master },
readOnlyProperties = true,
readOnlyBody = true,
droppable = false
}
Getting Component JAR
You will need this dependency
ActiveMQ is an extension of the JMS component released with the ActiveMQ project.
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
<version>5.6.0</version>
</dependency>
Endpoint See Also