Since we're on a major migration process of this website, some component documents here are out of sync right now. In the meantime you may want to look at the early version of the new website
https://camel.apache.org/staging/
We would very much like to receive any feedback on the new site, please join the discussion on the Camel user mailing list.
Camel MongoDB componentAvailable as of Camel 2.10 According to Wikipedia: "NoSQL is a movement promoting a loosely defined class of non-relational data stores that break with a long history of relational databases and ACID guarantees." NoSQL solutions have grown in popularity in the last few years, and major extremely-used sites and services such as Facebook, LinkedIn, Twitter, etc. are known to use them extensively to achieve scalability and agility. Basically, NoSQL solutions differ from traditional RDBMS (Relational Database Management Systems) in that they don't use SQL as their query language and generally don't offer ACID-like transactional behaviour nor relational data. Instead, they are designed around the concept of flexible data structures and schemas (meaning that the traditional concept of a database table with a fixed schema is dropped), extreme scalability on commodity hardware and blazing-fast processing. MongoDB is a very popular NoSQL solution and the camel-mongodb component integrates Camel with MongoDB allowing you to interact with MongoDB collections both as a producer (performing operations on the collection) and as a consumer (consuming documents from a MongoDB collection). MongoDB revolves around the concepts of documents (not as is office documents, but rather hierarchical data defined in JSON/BSON) and collections. This component page will assume you are familiar with them. Otherwise, visit http://www.mongodb.org/. Maven users will need to add the following dependency to their <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-mongodb</artifactId> <version>x.y.z</version> <!-- use the same version as your Camel core version --> </dependency> URI formatmongodb:connectionBean?database=databaseName&collection=collectionName&operation=operationName[&moreOptions...] Endpoint optionsMongoDB endpoints support the following options, depending on whether they are acting like a Producer or as a Consumer (options vary based on the consumer type too).
Configuration of database in Spring XMLThe following Spring XML creates a bean defining the connection to a MongoDB instance. <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="mongoBean" class="com.mongodb.Mongo"> <constructor-arg name="host" value="${mongodb.host}" /> <constructor-arg name="port" value="${mongodb.port}" /> </bean> </beans> In case you are using a 3.x MongoDB instance you have to use the following bean <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="mongoBean" class="com.mongodb.MongoClient"> <constructor-arg name="host" value="${mongodb.host}" /> <constructor-arg name="port" value="${mongodb.port}" /> </bean> </beans>
Sample routeThe following route defined in Spring XML executes the operation dbStats on a collection. Get DB stats for specified collection <route> <from uri="direct:start" /> <!-- using bean 'mongoBean' defined above --> <to uri="mongodb:mongoBean?database=${mongodb.database}&collection=${mongodb.collection}&operation=getDbStats" /> <to uri="direct:result" /> </route>
MongoDB operations - producer endpointsQuery operationsfindByIdThis operation retrieves only one element from the collection whose _id field matches the content of the IN message body. The incoming object can be anything that has an equivalent to a BSON type. See http://bsonspec.org/#/specification and http://www.mongodb.org/display/DOCS/Java+Types. from("direct:findById") .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") .to("mock:resultFindById"); Supports fields filter This operation supports specifying a fields filter. See Specifying a fields filter. findOneByQueryUse this operation to retrieve just one element from the collection that matches a MongoDB query. The query object is extracted from the IN message body, i.e. it should be of type Example with no query (returns any object of the collection): from("direct:findOneByQuery") .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery"); Example with a query (returns one matching result): from("direct:findOneByQuery") .setBody().constant("{ \"name\": \"Raul Kripalani\" }") .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery"); Supports fields filter This operation supports specifying a fields filter. See Specifying a fields filter. findAllThe Example with no query (returns all documents in the collection): from("direct:findAll") .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll"); Example with a query (returns all matching documents): from("direct:findAll") .setBody().constant("{ \"name\": \"Raul Kripalani\" }") .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll"); Paging and efficient retrieval is supported via the following headers:
You can also "stream" the documents returned from the server into your route by including Additionally, you can set a sortBy criteria by putting the relevant The
Supports fields filter This operation supports specifying a fields filter. See Specifying a fields filter.
countReturns the total number of objects in a collection, returning a Long as the OUT message body. // from("direct:count").to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true"); Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName"); assertTrue("Result is not of type Long", result instanceof Long); From Camel 2.14 onwards you can provide a
DBObject query = ... Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName"); Specifying a fields filterQuery operations will, by default, return the matching objects in their entirety (with all their fields). If your documents are large and you only require retrieving a subset of their fields, you can specify a field filter in all query operations, simply by setting the relevant Here is an example that uses MongoDB's BasicDBObjectBuilder to simplify the creation of DBObjects. It retrieves all fields except // route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") DBObject fieldFilter = BasicDBObjectBuilder.start().add("_id", 0).add("boringField", 0).get(); Object result = template.requestBodyAndHeader("direct:findAll", (Object) null, MongoDbConstants.FIELDS_FILTER, fieldFilter); Create/update operationsinsertInserts an new object into the MongoDB collection, taken from the IN message body. Type conversion is attempted to turn it into Example: from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); The operation will return a WriteResult, and depending on the Note that the new object's This is not a limitation of the component, but it is how things work in MongoDB for higher throughput. If you are using a custom Since Camel 2.15: OID(s) of the inserted record(s) is stored in the message header under saveThe save operation is equivalent to an upsert (UPdate, inSERT) operation, where the record will be updated, and if it doesn't exist, it will be inserted, all in one atomic operation. MongoDB will perform the matching based on the _id field. Beware that in case of an update, the object is replaced entirely and the usage of MongoDB's $modifiers is not permitted. Therefore, if you want to manipulate the object if it already exists, you have two options:
For example: from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=save"); updateUpdate one or multiple records on the collection. Requires a List<DBObject> as the IN message body containing exactly 2 elements:
Multiupdates By default, MongoDB will only update 1 object even if multiple objects match the filter query. To instruct MongoDB to update all matching records, set the A header with key Supports the following IN message headers:
For example, the following will update all records whose filterField field equals true by setting the value of the "scientist" field to "Darwin": // route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); DBObject filterField = new BasicDBObject("filterField", true); DBObject updateObj = new BasicDBObject("$set", new BasicDBObject("scientist", "Darwin")); Object result = template.requestBodyAndHeader("direct:update", new Object[] {filterField, updateObj}, MongoDbConstants.MULTIUPDATE, true); Delete operationsremoveRemove matching records from the collection. The IN message body will act as the removal filter query, and is expected to be of type // route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove"); DBObject conditionField = new BasicDBObject("conditionField", true); Object result = template.requestBody("direct:remove", conditionField); A header with key Other operationsaggregateAvailable as of Camel 2.14 Perform a aggregation with the given pipeline contained in the body. Aggregations could be long and heavy operations. Use with care.
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate"); from("direct:aggregate") .setBody().constant("[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]") .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate") .to("mock:resultAggregate"); getDbStatsEquivalent of running the > db.stats(); { "db" : "test", "collections" : 7, "objects" : 719, "avgObjSize" : 59.73296244784423, "dataSize" : 42948, "storageSize" : 1000058880, "numExtents" : 9, "indexes" : 4, "indexSize" : 32704, "fileSize" : 1275068416, "nsSizeMB" : 16, "ok" : 1 } Usage example: // from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats"); Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); assertTrue("Result is not of type DBObject", result instanceof DBObject); The operation will return a data structure similar to the one displayed in the shell, in the form of a getColStatsEquivalent of running the > db.camelTest.stats(); { "ns" : "test.camelTest", "count" : 100, "size" : 5792, "avgObjSize" : 57.92, "storageSize" : 20480, "numExtents" : 2, "nindexes" : 1, "lastExtentSize" : 16384, "paddingFactor" : 1, "flags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1 } Usage example: // from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats"); Object result = template.requestBody("direct:getColStats", "irrelevantBody"); assertTrue("Result is not of type DBObject", result instanceof DBObject); The operation will return a data structure similar to the one displayed in the shell, in the form of a commandAvailable as of Camel 2.15 Run the body as a command on database. Usefull for admin operation as getting host informations, replication or sharding status. Collection parameter is not use for this operation. // route: from("command").to("mongodb:myDb?database=science&operation=command"); DBObject commandBody = new BasicDBObject("hostInfo", "1"); Object result = template.requestBody("direct:command", commandBody); Dynamic operationsAn Exchange can override the endpoint's fixed operation by setting the For example: // from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count"); assertTrue("Result is not of type Long", result instanceof Long); Tailable Cursor ConsumerMongoDB offers a mechanism to instantaneously consume ongoing data from a collection, by keeping the cursor open just like the There is only one requisite to use tailable cursors: the collection must be a "capped collection", meaning that it will only hold N objects, and when the limit is reached, MongoDB flushes old objects in the same order they were originally inserted. For more information, please refer to: http://www.mongodb.org/display/DOCS/Tailable+Cursors. The Camel MongoDB component implements a tailable cursor consumer, making this feature available for you to use in your Camel routes. As new objects are inserted, MongoDB will push them as DBObjects in natural order to your tailable cursor consumer, who will transform them to an Exchange and will trigger your route logic. How the tailable cursor consumer worksTo turn a cursor into a tailable cursor, a few special flags are to be signalled to MongoDB when first generating the cursor. Once created, the cursor will then stay open and will block upon calling the The Camel MongoDB tailable cursor consumer takes care of all these tasks for you. You will just need to provide the key to some field in your data of increasing nature, which will act as a marker to position your cursor every time it is regenerated, e.g. a timestamp, a sequential ID, etc. It can be of any datatype supported by MongoDB. Date, Strings and Integers are found to work well. We call this mechanism "tail tracking" in the context of this component. The consumer will remember the last value of this field and whenever the cursor is to be regenerated, it will run the query with a filter like: Setting the increasing field: Set the key of the increasing field on the endpoint URI Cursor regeneration delay: One thing to note is that if new data is not already available upon initialisation, MongoDB will kill the cursor instantly. Since we don't want to overwhelm the server in this case, a An example: from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test"); The above route will consume from the "flights.cancellations" capped collection, using "departureTime" as the increasing field, with a default regeneration cursor delay of 1000ms. Persistent tail trackingStandard tail tracking is volatile and the last value is only kept in memory. However, in practice you will need to restart your Camel container every now and then, but your last value would then be lost and your tailable cursor consumer would start consuming from the top again, very likely sending duplicate records into your route. To overcome this situation, you can enable the persistent tail tracking feature to keep track of the last consumed increasing value in a special collection inside your MongoDB database too. When the consumer initialises again, it will restore the last tracked value and continue as if nothing happened. The last read value is persisted on two occasions: every time the cursor is regenerated and when the consumer shuts down. We may consider persisting at regular intervals too in the future (flush every 5 seconds) for added robustness if the demand is there. To request this feature, please open a ticket in the Camel JIRA. Enabling persistent tail trackingTo enable this function, set at least the following options on the endpoint URI:
Additionally, you can set the For example, the following route will consume from the "flights.cancellations" capped collection, using "departureTime" as the increasing field, with a default regeneration cursor delay of 1000ms, with persistent tail tracking turned on, and persisting under the "cancellationsTracker" id on the "flights.camelTailTracking", storing the last processed value under the "lastTrackingValue" field ( from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test"); Below is another example identical to the one above, but where the persistent tail tracking runtime information will be stored under the "trackers.camelTrackers" collection, in the "lastProcessedDepartureTime" field: from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test"); Type conversionsThe
This type converter is auto-discovered, so you don't need to configure anything manually. See also
https://github.com/apache/camel/blob/master/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc |