Beanstalk component

Available in Camel 2.15

The camel-beanstalk project provides a Camel component for retrieving and post-processing Beanstalk jobs. A detailed explanation of the Beanstalk job lifecycle is given in the Beanstalk protocol.

Dependencies

Maven users need to add the following dependency to their pom.xml:

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-beanstalk</artifactId>
      <version>${camel-version}</version>
    </dependency>

where ${camel-version} must be replaced by the actual Camel version (2.15 or later).

URI format

    beanstalk://[host[:port]][/tube][?options]

As the brackets indicate, you may omit either the port or both the host and port, and the tube name is optional as well.

When listening, you may want to watch for jobs from several tubes. Separate the tube names with a plus sign, e.g.

    beanstalk://localhost:11300/tube1+tube2

Tube names are URL-decoded, so if your tube names include special characters such as + or ?, you need to URL-encode them appropriately or use the RAW syntax.

Note that you cannot specify several tubes when writing jobs into Beanstalk.

Common URI options
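The tube-name rules from the URI format section can be sketched in plain Java. The helper below is hypothetical and not part of camel-beanstalk: `BeanstalkUris`, `encodeTube`, and `build` are names invented for illustration. It assumes that percent-encoding each tube name with the JDK's `URLEncoder` is an acceptable way to satisfy the "URL-encode them appropriately" requirement, and it joins the encoded names with the plus separator.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of camel-beanstalk): builds a consumer endpoint URI.
final class BeanstalkUris {
    private BeanstalkUris() {}

    // Percent-encode one tube name so that a '+' or '?' inside the name is not
    // mistaken for the tube separator or the start of the options.
    static String encodeTube(String tube) {
        // URLEncoder writes a space as '+'; swap it for %20 so the only literal
        // '+' characters left in the URI are the separators added in build().
        return URLEncoder.encode(tube, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Join the encoded tube names with '+', following the multi-tube syntax above.
    static String build(String host, int port, List<String> tubes) {
        String path = tubes.stream()
                .map(BeanstalkUris::encodeTube)
                .collect(Collectors.joining("+"));
        return "beanstalk://" + host + ":" + port + "/" + path;
    }
}
```

Under these assumptions, `BeanstalkUris.build("localhost", 11300, List.of("tube1", "tube2"))` yields the same URI as the example above, while a tube literally named `a+b` is written as `a%2Bb`. A URI with several tubes is only meaningful for a consumer endpoint, since a producer cannot write to several tubes.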
Producer URI options

Producer behavior is affected by the
Consumer URI options

The consumer may delete the job immediately after reserving it or wait until the Camel routes have processed it. The first scenario resembles a “message queue”, while the second is closer to a “job queue”. This behavior is controlled by

When synchronous, the consumer calls

There is a boolean parameter

Be careful when specifying
The Beanstalk consumer is a Scheduled Polling Consumer, which means there are more options you can configure, such as how frequently the consumer should poll. For more details see Polling Consumer.

Consumer Headers

The consumer stores a number of job headers in the Exchange message:
Examples

This Camel component lets you both request jobs for processing and supply them to the Beanstalkd daemon. Simple demo routes may look like this:

    // Inside RouteBuilder.configure()
    from("beanstalk:testTube")
        .log("Processing job #${property.beanstalk.jobId} with body ${in.body}")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) {
                // Try to parse an integer value out of the body
                exchange.getIn().setBody(
                    Integer.valueOf(exchange.getIn().getBody(String.class)));
            }
        })
        .log("Parsed job #${property.beanstalk.jobId} to body ${in.body}");

    from("timer:dig?period=30seconds")
        .setBody(constant(10))
        .log("Kick ${in.body} buried/delayed tasks")
        .to("beanstalk:testTube?command=kick");

The first route listens for new jobs in the tube “testTube”. When a job arrives, the route tries to parse an integer value from the message body. If parsing succeeds, the result is logged, and the successful exchange completion makes the Camel component delete the job from Beanstalk automatically. Conversely, when the job data cannot be parsed, the exchange fails and the Camel component buries the job by default, so that it can be processed later or inspected manually. The second route therefore periodically asks Beanstalk to kick 10 jobs out of the buried and/or delayed state back to the normal queue.
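The delete-on-success, bury-on-failure, and kick behavior described above can be illustrated with a toy in-memory model. This is only a sketch: `TubeModel` and its methods are hypothetical names, it uses neither the Camel nor the Beanstalk client API, and it ignores delayed jobs, priorities, and timeouts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy in-memory model of a single tube; hypothetical, not the Camel or Beanstalk API.
final class TubeModel {
    final Deque<String> ready = new ArrayDeque<>();
    final Deque<String> buried = new ArrayDeque<>();

    // Add a job body to the ready queue.
    void put(String body) {
        ready.addLast(body);
    }

    // Take the next ready job and try to parse its body as an integer.
    // Success removes the job from the model (the "delete" case); a parse
    // failure moves it to the buried queue (the "bury" case).
    // Returns the parsed value, or null if the tube was empty or the job was buried.
    Integer processNext() {
        String body = ready.pollFirst();
        if (body == null) {
            return null;
        }
        try {
            return Integer.valueOf(body);
        } catch (NumberFormatException e) {
            buried.addLast(body);
            return null;
        }
    }

    // Move up to `bound` buried jobs back to the ready queue; returns how many moved.
    int kick(int bound) {
        int kicked = 0;
        while (kicked < bound && !buried.isEmpty()) {
            ready.addLast(buried.pollFirst());
            kicked++;
        }
        return kicked;
    }
}
```

In this model, putting the bodies `"42"` and `"abc"` and processing both leaves one buried job; a subsequent `kick(10)` moves it back to the ready queue, which mirrors what the timer route asks Beanstalk to do every 30 seconds.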