Routing/Mediation service

The routing/mediation between services/bundles will be created using the Camel Spring DSL. We will describe its creation step by step.

First, create the file

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:camel="http://camel.apache.org/schema/spring"
           xmlns:osgi="http://www.springframework.org/schema/osgi"
           xmlns:cxf="http://camel.apache.org/schema/cxf"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
             http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
             http://camel.apache.org/schema/osgi http://camel.apache.org/schema/osgi/camel-osgi.xsd
             http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
             http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd">
    </beans>

Compared to a simple Camel project, the Spring beans tag has been enriched with new namespaces :
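As a rough illustration of what these extra namespaces make available, the sketch below shows one representative element per prefix. The elements are the ones used later on this page; the xsi:schemaLocation attribute is left out for brevity, so treat this as an orientation aid rather than a complete file.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: one element per namespace prefix, schemaLocation omitted -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:cxf="http://camel.apache.org/schema/cxf">

  <!-- osgi: look up a service published by another bundle -->
  <osgi:reference id="queuingservice" interface="org.apache.camel.Component"/>

  <!-- cxf: declare a webservice endpoint that Camel can consume from -->
  <cxf:cxfEndpoint id="reportIncident"
                   address="/camel-example/incident"
                   serviceClass="org.apache.camel.example.reportincident.ReportIncidentEndpoint"/>

  <!-- camel: define the routes that connect the endpoints -->
  <camel:camelContext>
    <camel:route>
      <camel:from uri="cxf:bean:reportIncident"/>
      <camel:to uri="queuingservice:queue:in"/>
    </camel:route>
  </camel:camelContext>

</beans>
```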
Now that the schemas/namespaces are declared, we can start to add the additional elements, such as resource imports and bean references, that our routing engine will use.

Step 1 : Webservice infrastructure : CXF

We will use the CXF framework to deploy the reportincident webservice and run it on the OSGi platform. To work with it, the resources (= Spring beans) declared in the CXF project must be imported using the statements

    ...
    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-http.xml" />
    <import resource="classpath:META-INF/cxf/osgi/cxf-extension-osgi.xml" />
    ...

Spring uses these imports at bundle startup to instantiate the beans defined in these files. These beans are responsible for deploying the CXF bus on top of the OSGi server and for providing a servlet that we will use to communicate with the CXF webservice engine.

Remark : for the purpose of this tutorial, we have packaged this configuration into the camel-spring file, but it could be defined in a separate XML file together with, for example, the component bean that Camel will use to communicate with the CXF bus. This allows you to separate the routing from the parameters used to configure endpoints.

The Camel CXF endpoint is configured like this :

    ...
    <!-- webservice endpoint -->
    <cxf:cxfEndpoint id="reportIncident"
        address="/camel-example/incident" (1)
        serviceClass="org.apache.camel.example.reportincident.ReportIncidentEndpoint" (2)
        xmlns:s="http://reportincident.example.camel.apache.org"> (3)
    </cxf:cxfEndpoint>
    ...

Remarks :

Step 2 : Queuing engine

Whether the incidents come from a webservice or from files, we will first put them in a queue before processing them and saving them in the database.

a) ActiveMQ

Like CXF, ActiveMQ can be installed in the infrastructure using a spring.xml configuration file.
So, create the file

    <beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:osgi="http://www.springframework.org/schema/osgi"
        xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
          http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
          http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">

        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="default" dataDirectory="${servicemix.base}/data/activemq/default" useShutdownHook="false">

            <!-- Destination specific policies using destination names or wildcards -->
            <destinationPolicy>
                <policyMap>
                    <policyEntries>
                        <policyEntry queue=">" memoryLimit="5mb"/>
                        <policyEntry topic=">" memoryLimit="5mb">
                            <subscriptionRecoveryPolicy>
                                <lastImageSubscriptionRecoveryPolicy/>
                            </subscriptionRecoveryPolicy>
                        </policyEntry>
                    </policyEntries>
                </policyMap>
            </destinationPolicy>

            <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>

            <!-- The store and forward broker networks ActiveMQ will listen to -->
            <networkConnectors>
                <!-- by default just auto discover the other brokers -->
                <networkConnector name="default-nc" uri="multicast://default"/>
                <!-- Example of a static configuration:
                <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
                -->
            </networkConnectors>

            <persistenceAdapter>
                <amqPersistenceAdapter syncOnWrite="false" directory="${servicemix.base}/data/activemq/default" maxFileLength="20 mb"/>
            </persistenceAdapter>

            <!-- Use the following if you wish to configure the journal with JDBC -->
            <!--
            <persistenceAdapter>
                <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/>
            </persistenceAdapter>
            -->

            <!-- Or if you want to use pure JDBC without a journal -->
            <!--
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
            </persistenceAdapter>
            -->

            <!-- The maximum amount of space the broker will use before slowing down producers -->
            <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage limit="20 mb"/>
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="1 gb" name="foo"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="100 mb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>

            <!-- The transport connectors ActiveMQ will listen to -->
            <transportConnectors>
                <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
                <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            </transportConnectors>

        </broker>

        <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>

        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
            <property name="maxConnections" value="8" />
            <property name="maximumActive" value="500" />
            <property name="transactionManager" ref="transactionManager" />
            <property name="connectionFactory" ref="activemqConnectionFactory" />
            <property name="resourceName" value="activemq.default" />
        </bean>

        <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
            <property name="transactionManager" ref="transactionManager" />
            <property name="connectionFactory" ref="activemqConnectionFactory" />
            <property name="resourceName" value="activemq.default" />
        </bean>

        <osgi:reference id="transactionManager" interface="javax.transaction.TransactionManager" />

        <osgi:service ref="pooledConnectionFactory">
            <osgi:interfaces>
                <value>javax.jms.ConnectionFactory</value>
            </osgi:interfaces>
            <osgi:service-properties>
                <entry key="name" value="default"/>
            </osgi:service-properties>
        </osgi:service>

    </beans>

At bundle startup, Spring will instantiate the declared beans and consequently start the queuing engine. We have not changed the content of this file compared to what is proposed in the ServiceMix distribution, but you can use the same technique described for the Datasource and add properties that you configure through, for example, a configuration file :

    ...
    <transportConnectors>
        <transportConnector name="${name}" uri="${uri}" discoveryUri="${discoveryUri}"/>
    </transportConnectors>

    <!-- here is the list of values defined as default but can be overridden in the file org.apache.activemq.config.etc -->
    <osgix:cm-properties id="confs" persistent-id="org.apache.activemq.config.etc">
        <prop key="name">openwire</prop>
        <prop key="uri">tcp://localhost:61616</prop>
        <prop key="discoveryUri">multicast://default</prop>
    </osgix:cm-properties>
    ...

The ActiveMQ broker can also be integrated differently, because the ServiceMix 4 distribution (and not ServiceMix Kernel) provides it as standard, with additional commands that you can use from the console to :

Available commands in activemq:

The pom.xml file must be modified to add the properties required by Spring blueprint. So add the following lines :

    ...
    <instructions>
        <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
        <DynamicImport-Package>*</DynamicImport-Package>
        <Include-Resource>src/main/resources</Include-Resource> <!-- (1) -->
        <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context> <!-- (2) -->
        <Private-Package></Private-Package>
        <Import-Package> <!-- (3) -->
            javax.transaction,
            org.apache.activemq,
            org.apache.activemq.pool,
            org.springframework.beans.factory.config,
            *
        </Import-Package>
    </instructions>
    ...
Remarks :

b) Camel ActiveMQ component

To make Camel independent of the JMS queue engine deployed in the OSGi server, we will implement a proxy using a blueprint service between the Camel component and the queuing engine used (Apache ActiveMQ, IBM WebSphere MQ, Oracle Advanced Queuing, TIBCO, ...).

First, create a Spring DSL file

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:osgi="http://www.springframework.org/schema/osgi"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
             http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

        <bean id="active-mq" class="org.apache.activemq.camel.component.ActiveMQComponent" /> <!-- (1) -->

        <osgi:service id="osgiqueuingservice" ref="active-mq" interface="org.apache.camel.Component"/> <!-- (2) -->

    </beans>

Remarks:

(2) Our Camel component will be exposed in the OSGi registry as an org.apache.camel.Component service.

Next, adapt the pom.xml file like this to instruct the Felix plugin how to generate the MANIFEST.MF file

    ...
    <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>${felix-version}</version>
        <configuration>
            <manifestLocation>META-INF</manifestLocation>
            <instructions>
                <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
                <Import-Package>
                    org.apache.activemq.camel.component,
                    org.apache.camel,
                    *</Import-Package>
                <Include-Resource>src/main/resources</Include-Resource>
                <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context>
                <Private-Package></Private-Package>
            </instructions>
        </configuration>
    </plugin>
    ...
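Because the routing bundle only looks up an org.apache.camel.Component service, another JMS provider can in principle be plugged in by publishing a different component under the same interface. The following is a minimal sketch and not part of the original tutorial: it assumes Camel's generic org.apache.camel.component.jms.JmsComponent and a javax.jms.ConnectionFactory that some bundle already publishes as an OSGi service (as the ActiveMQ configuration does). The bean ids generic-jms and jmsConnectionFactory are hypothetical.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd">

  <!-- Assumption: a JMS ConnectionFactory is available in the OSGi registry -->
  <osgi:reference id="jmsConnectionFactory" interface="javax.jms.ConnectionFactory"/>

  <!-- Generic Camel JMS component wired to that connection factory (hypothetical bean id) -->
  <bean id="generic-jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <!-- Published under the same interface, so routes using queuingservice:queue:in stay unchanged -->
  <osgi:service id="osgiqueuingservice" ref="generic-jms" interface="org.apache.camel.Component"/>

</beans>
```

With such a variant, the Import-Package instruction of this bundle would presumably list org.apache.camel.component.jms and javax.jms in place of org.apache.activemq.camel.component.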
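Note that the Import-Package instruction of the ActiveMQ component bundle imports the org.apache.activemq.camel.component and org.apache.camel packages.

All the infrastructure is in place, so we can start to describe the beans that we will use.

Step 3 : Beans references

Five beans will be used by our application :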
So, adapt the file :

    <bean id="bindyDataformat" class="org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat">
        <constructor-arg type="java.lang.String" value="org.apache.camel.example.reportincident.model" /> <!-- (1) -->
    </bean>

    <bean id="incidentSaver" class="org.apache.camel.example.reportincident.internal.IncidentSaver">
        <property name="incidentService">
            <osgi:reference interface="org.apache.camel.example.reportincident.service.IncidentService"/> <!-- (2) -->
        </property>
    </bean>

    <bean id="webservice" class="org.apache.camel.example.reportincident.internal.WebService" />

    <bean id="feedback" class="org.apache.camel.example.reportincident.internal.Feedback" />

    <osgi:reference id="queuingservice" interface="org.apache.camel.Component" /> <!-- (3) -->

Remarks :

(1) - The name of the package containing the model classes must be provided as a parameter.

We will quickly review the three classes that we will create for our project.

a) IncidentSaver

    package org.apache.camel.example.reportincident.internal;

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    import org.apache.camel.Exchange;
    import org.apache.camel.example.reportincident.model.Incident;
    import org.apache.camel.example.reportincident.service.IncidentService;

    public class IncidentSaver {

        private static final transient Log LOG = LogFactory.getLog(IncidentSaver.class);

        private IncidentService incidentService = null;

        public void process(Exchange exchange) throws ParseException {

            int count = 0;

            List<Map<String, Object>> models = new ArrayList<Map<String, Object>>(); // (1)
            Map<String, Object> model = new HashMap<String, Object>();

            // Get models from message
            models = (List<Map<String, Object>>) exchange.getIn().getBody(); // (2)

            // Get Header origin from message
            String origin = (String) exchange.getIn().getHeader("origin"); // (3)
            LOG.debug("Header origin : " + origin);

            Iterator<Map<String, Object>> it = models.iterator();

            // Specify current Date
            DateFormat format = new SimpleDateFormat( "dd/MM/yyyy HH:mm:ss" );
            String currentDate = format.format( new Date() );
            Date creationDate = format.parse( currentDate );

            while (it.hasNext()) {

                model = it.next();
                LOG.debug("Model retrieved");

                for (String key : model.keySet()) {

                    LOG.debug("Object retrieved : " + model.get(key).toString());

                    // Retrieve incident from model
                    Incident incident = (Incident) model.get(key); // (4)
                    incident.setCreationDate(creationDate);
                    incident.setCreationUser(origin);

                    LOG.debug("Count : " + count + ", " + incident.toString());

                    // Save Incident
                    incidentService.saveIncident(incident); // (5)
                    LOG.debug("Incident saved");
                }

                count++;
            }

            LOG.debug("Number of CSV records received by the csv bean : " + count);
        }

        // Property used to inject service implementation (6)
        public void setIncidentService(IncidentService incidentService) {
            this.incidentService = incidentService;
        }
    }

Remarks :

b) WebService

    package org.apache.camel.example.reportincident.internal;

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.camel.Exchange;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    import org.apache.camel.example.reportincident.InputReportIncident;
    import org.apache.camel.example.reportincident.model.Incident;

    public class WebService {

        private static final transient Log LOG = LogFactory.getLog(WebService.class);

        public void process(Exchange exchange) throws ParseException {

            InputReportIncident webincident = (InputReportIncident)exchange.getIn().getBody(); // (1)

            LOG.debug("Incident received from : " + webincident.getFamilyName() + ", " + webincident.getGivenName());
            LOG.debug("Incident info : " + webincident.getIncidentId() + ", at : " + webincident.getIncidentDate());
            LOG.debug("Incident details : " + webincident.getDetails() + ", summary : " + webincident.getSummary());

            List<Map<String, Incident>> models = new ArrayList<Map<String, Incident>>();
            Map<String, Incident> model = new HashMap<String, Incident>();

            // Convert the InputReportIncident into an Incident
            Incident incident = new Incident(); // (2)

            // "MM" is the month; lowercase "mm" would be parsed as minutes
            DateFormat format = new SimpleDateFormat( "dd-MM-yyyy" );
            incident.setIncidentDate(format.parse(webincident.getIncidentDate()));

            incident.setDetails(webincident.getDetails());
            incident.setEmail(webincident.getEmail());
            incident.setFamilyName(webincident.getFamilyName());
            incident.setGivenName(webincident.getGivenName());
            incident.setIncidentRef(webincident.getIncidentId());
            incident.setPhone(webincident.getPhone());
            incident.setSummary(webincident.getSummary());

            // Get Header origin from message
            String origin = (String) exchange.getIn().getHeader("origin"); // (3)

            // Specify current Date
            format = new SimpleDateFormat( "dd/MM/yyyy HH:mm:ss" );
            String currentDate = format.format( new Date() );
            Date creationDate = format.parse( currentDate );

            incident.setCreationDate(creationDate);
            incident.setCreationUser(origin);

            LOG.debug("Incident created from web service : " + incident.toString());

            // and place it in a model (cfr camel-bindy)
            model.put(Incident.class.getName(), incident); // (4)
            models.add(model);

            // replace with our input
            exchange.getOut().setBody(models); // (5)

            // propagate the header
            exchange.getOut().setHeader("origin", origin); // (6)
        }
    }

Remarks :

c) Feedback

    package org.apache.camel.example.reportincident.internal;

    import org.apache.camel.example.reportincident.OutputReportIncident;

    public class Feedback {

        public OutputReportIncident setOk() { // (3)
            OutputReportIncident outputReportIncident = new OutputReportIncident(); // (1)
            outputReportIncident.setCode("0"); // (2)
            return outputReportIncident;
        }
    }

Remarks :

Step 4 : Routing

Now that everything is in place, we can create the
three routes that we need to implement the architecture presented in the introduction of this tutorial.

From File(s) to queue

This first route will be used by Apache Camel to read files deposited in the directory. Here is the definition of the route :

    <camel:camelContext trace="true" xmlns="http://camel.apache.org/schema/osgi"> <!-- (1) -->

        <camel:route>
            <camel:from uri="file://d:/temp/data/reportincident/?move=d:/temp/backup/${date:now:yyyyMMdd}/${file:name.noext}.bak"/> <!-- (2) -->
            <camel:setHeader headerName="origin"> <!-- (3) -->
                <camel:constant>file</camel:constant>
            </camel:setHeader>
            <camel:unmarshal ref="bindyDataformat" /> <!-- (4) -->
            <camel:to uri="queuingservice:queue:in" /> <!-- (5) -->
        </camel:route>
        ...

Remarks :

From Webservices to queue

The second route will read incoming web service calls, extract the XML message from each call, transform it into an InputReportIncident, send the object to a bean that will convert it into an Incident object, and put the result in queue:in.

        ...
        <camel:route>
            <camel:from uri="cxf:bean:reportIncident" /> <!-- (1) -->
            <camel:setHeader headerName="origin"> <!-- (2) -->
                <camel:constant>webservice</camel:constant>
            </camel:setHeader>
            <camel:convertBodyTo type="org.apache.camel.example.reportincident.InputReportIncident" /> <!-- (3) -->
            <camel:to uri="bean:webservice" /> <!-- (4) -->
            <camel:inOnly uri="queuingservice:queue:in" /> <!-- (5) -->
            <camel:transform> <!-- (6) -->
                <camel:method bean="feedback" method="setOk" />
            </camel:transform>
        </camel:route>
        ...

Remarks :

From queue to DB

The last route will read the messages from the queue and send them to the incidentSaver bean.

        ...
        <camel:route>
            <camel:from uri="queuingservice:queue:in" />
            <camel:to uri="bean:incidentSaver?method=process" />
        </camel:route>

    </camel:camelContext>

Add instructions to generate MANIFEST.MF file

Now that the reportincident.routing project is ready, we will modify the pom.xml file to add the instructions required to generate the jar and the MANIFEST.MF file of the bundle :

    ...
    <instructions>
        <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
        <Import-Package> <!-- (1) -->
            META-INF.cxf,
            META-INF.cxf.osgi,
            META-INF.wsdl,
            org.apache.commons.logging,
            org.apache.camel;version="[2.0,2.2)",
            org.apache.camel.component;version="[2.0,2.2)",
            org.apache.camel.component.cxf;version="[2.0,2.2)",
            org.apache.camel.component.cxf.converter;version="[2.0,2.2)",
            org.apache.camel.component.jms;version="[2.0,2.2)",
            org.apache.camel.converter;version="[2.0,2.2)",
            org.apache.camel.converter.jaxp;version="[2.0,2.2)",
            org.apache.camel.converter.stream;version="[2.0,2.2)",
            org.apache.camel.dataformat.bindy;version="[2.0,2.2)",
            org.apache.camel.dataformat.bindy.csv;version="[2.0,2.2)",
            org.apache.camel.example.reportincident,
            org.apache.camel.example.reportincident.model,
            org.apache.camel.example.reportincident.service,
            org.apache.camel.processor;version="[2.0,2.2)",
            org.apache.activemq.camel.component;${activemq.osgi.version},
            org.apache.activemq.camel.converter;${activemq.osgi.version},
            org.apache.activemq.pool,
            org.apache.cxf,
            org.apache.cxf.binding,
            org.apache.cxf.binding.corba,
            org.apache.cxf.binding.soap,
            org.apache.cxf.binding.soap.spring,
            org.apache.cxf.bus,
            org.apache.cxf.bus.resource,
            org.apache.cxf.bus.spring,
            org.apache.cxf.buslifecycle,
            org.apache.cxf.catalog,
            org.apache.cxf.configuration,
            org.apache.cxf.configuration.spring,
            org.apache.cxf.endpoint,
            org.apache.cxf.headers,
            org.apache.cxf.management,
            org.apache.cxf.management.jmx,
            org.apache.cxf.phase,
            org.apache.cxf.resource,
            org.apache.cxf.transport,
            org.apache.cxf.transport.http,
            org.apache.cxf.transport.http.policy,
            org.apache.cxf.transport.http_jetty,
            org.apache.cxf.transport.http_osgi,
            org.apache.cxf.transport.jms,
            org.apache.cxf.transports.http,
            org.apache.cxf.workqueue,
            org.apache.cxf.wsdl,
            org.apache.cxf.wsdl11,
            org.springframework.beans.factory.config,
            *
        </Import-Package>
        <Private-Package>org.apache.camel.example.reportincident.internal</Private-Package> <!-- (2) -->
    </instructions>
    ...
Remarks :

Conclusion

In this section of the tutorial, we have discussed how to design the routing between the endpoints/components of our application using the Camel Spring DSL. We have also investigated how to set up the infrastructure required to work with CXF and with ActiveMQ or any other queuing engine. In the next chapter, we will see how to create the web application, package the solution and deploy it on ServiceMix.