Since we're on a major migration process of this website, some component documents here are out of sync right now. In the meantime you may want to look at the early version of the new website
https://camel.apache.org/staging/
We would very much like to receive any feedback on the new site, please join the discussion on the Camel user mailing list.
SJMS ComponentAvailable as of Camel 2.11 The Simple JMS Component, or SJMS, is a JMS client for use with Camel that uses well known best practices when it comes to JMS client creation and configuration. SJMS contains a brand new JMS client API written explicitly for Camel eliminating third party messaging implementations keeping it light and resilient. The following features is included:
Additional key features include:
Why the S in SJMS S stands for Simple and Standard and Springless. Also camel-jms was already taken.
Maven users will need to add the following dependency to their <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-sjms</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> URI formatsjms:[queue:|topic:]destinationName[?options] Where sjms:FOO.BAR You can include the optional sjms:queue:FOO.BAR To connect to a topic, you must include the sjms:topic:Stocks.Prices You append query options to the URI using the following format, Component Options and ConfigurationsThe SJMS Component supports the following configuration options:
Below is an example of how to configure the SjmsComponent with its required ConnectionFactory provider. It will create a single connection by default and store it using the components internal pooling APIs to ensure that it is able to service Session creation requests in a thread safe manner. SjmsComponent component = new SjmsComponent(); component.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616")); getContext().addComponent("sjms", component); For a SjmsComponent that is required to support a durable subscription, you can override the default ConnectionFactoryResource instance and set the clientId property. ConnectionFactoryResource connectionResource = new ConnectionFactoryResource(); connectionResource.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616")); connectionResource.setClientId("myclient-id"); SjmsComponent component = new SjmsComponent(); component.setConnectionResource(connectionResource); component.setMaxConnections(1); Producer Configuration OptionsThe SjmsProducer Endpoint supports the following properties:
Producer UsageInOnly Producer - (Default)The InOnly Producer is the default behavior of the SJMS Producer Endpoint. from("direct:start") .to("sjms:queue:bar"); InOut ProducerTo enable InOut behavior append the from("direct:start") .to("sjms:queue:bar?exchangePattern=InOut"); You can specify a from("direct:start") .to("sjms:queue:bar?exchangePattern=InOut&namedReplyTo=my.reply.to.queue"); Consumers Configuration OptionsThe SjmsConsumer Endpoint supports the following properties:
Consumer UsageInOnly Consumer - (Default)The InOnly Consumer is the default Exchange behavior of the SJMS Consumer Endpoint. from("sjms:queue:bar") .to("mock:result"); InOut ConsumerTo enable InOut behavior append the from("sjms:queue:in.out.test?exchangePattern=InOut") .transform(constant("Bye Camel")); Advanced Usage NotesPlugable Connection Resource ManagementSJMS provides JMS Connection resource management through built-in connection pooling. This eliminates the need to depend on third party API pooling logic. However there may be times that you are required to use an external Connection resource manager such as those provided by J2EE or OSGi containers. For this SJMS provides an interface that can be used to override the internal SJMS Connection pooling capabilities. This is accomplished through the ConnectionResource interface. The ConnectionResource provides methods for borrowing and returning Connections as needed is the contract used to provide Connection pools to the SJMS component. A user should use when it is necessary to integrate SJMS with an external connection pooling manager. It is recommended though that for standard ConnectionFactory providers you use the ConnectionFactoryResource implementation that is provided with SJMS as-is or extend as it is optimized for this component. Below is an example of using the pluggable ConnectionResource with the ActiveMQ PooledConnectionFactory: public class AMQConnectionResource implements ConnectionResource { private PooledConnectionFactory pcf; public AMQConnectionResource(String connectString, int maxConnections) { super(); pcf = new PooledConnectionFactory(connectString); pcf.setMaxConnections(maxConnections); pcf.start(); } public void stop() { pcf.stop(); } @Override public Connection borrowConnection() throws Exception { Connection answer = pcf.createConnection(); answer.start(); return answer; } @Override public Connection borrowConnection(long timeout) throws Exception { // SNIPPED... } @Override public void returnConnection(Connection connection) throws Exception { // Do nothing since there isn't a way to return a Connection // to the instance of PooledConnectionFactory log.info("Connection returned"); } } Then pass in the ConnectionResource to the SjmsComponent: CamelContext camelContext = new DefaultCamelContext(); AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1); SjmsComponent component = new SjmsComponent(); component.setConnectionResource(pool); camelContext.addComponent("sjms", component); To see the full example of its usage please refer to the ConnectionResourceIT. Session, Consumer, & Producer Pooling & Caching ManagementComing soon ... Batch Message SupportThe SjmsProducer supports publishing a collection of messages by creating an Exchange that encapsulates a List. This SjmsProducer will take then iterate through the contents of the List and publish each message individually. If when producing a batch of messages there is the need to set headers that are unique to each message you can use the SJMS BatchMessage class. When the SjmsProducer encounters a BatchMessage List it will iterate each BatchMessage and publish the included payload and headers. Below is an example of using the BatchMessage class. First we create a List of BatchMessages: List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); } Then publish the List: template.sendBody("sjms:queue:batch.queue", messages); Customizable Transaction Commit Strategies (Local JMS Transactions only)SJMS provides a developer the means to create a custom and plugable transaction strategy through the use of the TransactionCommitStrategy interface. This allows a user to define a unique set of circumstances that the SessionTransactionSynchronization will use to determine when to commit the Session. An example of its use is the BatchTransactionCommitStrategy which is detailed further in the next section. Transacted Batch Consumers & ProducersThe SjmsComponent has been designed to support the batching of local JMS transactions on both the Producer and Consumer endpoints. How they are handled on each is very different though. The SjmsConsumer endpoint is a straitforward implementation that will process X messages before committing them with the associated Session. To enable batched transaction on the consumer first enable transactions by setting the sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10 If an exception occurs during the processing of a batch on the consumer endpoint, the Session rollback is invoked causing the messages to be redelivered to the next available consumer. The counter is also reset to 0 for the BatchTransactionCommitStrategy for the associated Session as well. It is the responsibility of the user to ensure they put hooks in their processors of batch messages to watch for messages with the JMSRedelivered header set to true. This is the indicator that messages were rolled back at some point and that a verification of a successful processing should occur. A transacted batch consumer also carries with it an instance of an internal timer that waits a default amount of time (5000ms) between messages before committing the open transactions on the Session. The default value of 5000ms (minimum of 1000ms) should be adequate for most use-cases but if further tuning is necessary simply set the sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000 The minimal value that will be accepted is 1000ms as the amount of context switching may cause unnecessary performance impacts without gaining benefit. The producer endpoint is handled much differently though. With the producer after each message is delivered to its destination the Exchange is closed and there is no longer a reference to that message. To make a available all the messages available for redelivery you simply enable transactions on a Producer Endpoint that is publishing BatchMessages. The transaction will commit at the conclusion of the exchange which includes all messages in the batch list. Nothing additional need be configured. For example: List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); } Now publish the List with transactions enabled: template.sendBody("sjms:queue:batch.queue?transacted=true", messages); Additional NotesMessage Header FormatThe SJMS Component uses the same header format strategy that is used in the Camel JMS Component. This plugable strategy ensures that messages sent over the wire conform to the JMS Message spec. For the exchange.in.header the following rules apply for the header keys: Keys starting with JMS or JMSX are reserved.
For the exchange.in.header, the following rules apply for the header values: Message ContentTo deliver content over the wire we must ensure that the body of the message that is being delivered adheres to the JMS Message Specification. Therefore, all that are produced must either be primitives or their counter objects (such as Integer, Long, Character). The types, String, CharSequence, Date, BigDecimal and BigInteger are all converted to their toString() representation. All other types are dropped. ClusteringWhen using InOut with SJMS in a clustered environment you must either use TemporaryQueue destinations or use a unique named reply to destination per InOut producer endpoint. Message correlation is handled by the endpoint, not with message selectors at the broker. The InOut Producer Endpoint uses Java Concurrency Exchangers cached by the Message JMSCorrelationID. This provides a nice performance increase while reducing the overhead on the broker since all the messages are consumed from the destination in the order they are produced by the interested consumer. Currently the only correlation strategy is to use the JMSCorrelationId. The InOut Consumer uses this strategy as well ensuring that all responses messages to the included JMSReplyTo destination also have the JMSCorrelationId copied from the request as well. Transaction SupportSJMS currently only supports the use of internal JMS Transactions. There is no support for the Camel Transaction Processor or the Java Transaction API (JTA). Does Springless Mean I Can't Use Spring?Not at all. Below is an example of the SJMS component using the Spring DSL: <route id="inout.named.reply.to.producer.route"> <from uri="direct:invoke.named.reply.to.queue" /> <to uri="sjms:queue:named.reply.to.queue?namedReplyTo=my.response.queue&exchangePattern=InOut" /> </route> Springless refers to moving away from the dependency on the Spring JMS API. A new JMS client API is being developed from the ground up to power SJMS. |