Apache Qpid - AMQP Messaging for Java JMS, C++, Python, Ruby, and .NET | Apache Qpid Documentation
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>
#include <string>

using namespace qpid::messaging;

int main(int argc, char** argv) {
    // Broker URL and address can be overridden on the command line
    std::string broker = argc > 1 ? argv[1] : "localhost:5672";
    std::string address = argc > 2 ? argv[2] : "amq.topic";
    Connection connection(broker);
    try {
        connection.open();
        Session session = connection.createSession();

        // ### Your Code Here ###

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cerr << error.what() << std::endl;
        connection.close();
        return 1;
    }
}
Sender sender = session.createSender(address);
sender.send(Message("Hello world!"));

Message message;
message.setContent("Hello world!");

// In some applications, you should also set the content type,
// which is a MIME type
message.setContentType("text/plain");

Receiver receiver = session.createReceiver(address);
Message message = receiver.fetch(Duration::SECOND * 1); // timeout is optional
session.acknowledge(); // acknowledge message receipt
std::cout << message.getContent() << std::endl;
To receive messages from multiple sources, create a receiver for each source, and use session.nextReceiver().fetch() to fetch messages. session.nextReceiver() is guaranteed to return the receiver responsible for the first available message on the session.
Receiver receiver1 = session.createReceiver(address1);
Receiver receiver2 = session.createReceiver(address2);

Message message = session.nextReceiver().fetch();
session.acknowledge(); // acknowledge message receipt
std::cout << message.getContent() << std::endl;
// Server creates a service queue and waits for messages
// If it gets a request, it sends a response to the reply to address

Receiver receiver = session.createReceiver("service_queue; {create: always}");
Message request = receiver.fetch();
const Address& address = request.getReplyTo(); // Get "reply-to" from request ...
if (address) {
    Sender sender = session.createSender(address); // ... send response to "reply-to"
    Message response("pong!");
    sender.send(response);
    session.acknowledge();
}

// Client creates a private response queue - the # gets converted
// to a unique string for the response queue name. Client uses the
// name of this queue as its reply-to.

Sender sender = session.createSender("service_queue");
Address responseQueue("#response-queue; {create:always, delete:always}");
Receiver receiver = session.createReceiver(responseQueue);

Message request;
request.setReplyTo(responseQueue);
request.setContent("ping");
sender.send(request);
Message response = receiver.fetch();
std::cout << request.getContent() << " -> " << response.getContent() << std::endl;
This shows some of the most commonly used message properties; the list is not complete.
Message message("Hello world!");
message.setContentType("text/plain");
message.setSubject("greeting");
message.setReplyTo("response-queue");
message.setTtl(Duration(100)); // milliseconds
message.setDurable(true);

std::cout << "Content: " << message.getContent() << std::endl
          << "Content Type: " << message.getContentType() << std::endl
          << "Subject: " << message.getSubject() << std::endl
          << "ReplyTo: " << message.getReplyTo() << std::endl
          << "Time To Live (in milliseconds): " << message.getTtl().getMilliseconds() << std::endl
          << "Durability: " << message.getDurable() << std::endl;

// Setting an application-defined property
std::string name = "weekday";
std::string value = "Thursday";
message.getProperties()[name] = value;

// Reading it back
std::string s = message.getProperties()["weekday"];
If a connection is opened with the reconnect option set, it transparently reconnects when the connection is lost.
Connection connection(broker);
connection.setOption("reconnect", true);
try {
    connection.open();
    ....
Maps provide a simple way to exchange binary data portably, across languages and platforms. Maps can contain simple types, lists, or maps.
// Sender
Variant::Map content;
content["id"] = 987654321;
content["name"] = "Widget";
content["probability"] = 0.43;
Variant::List colours;
colours.push_back(Variant("red"));
colours.push_back(Variant("green"));
colours.push_back(Variant("white"));
content["colours"] = colours;
content["uuid"] = Uuid(true);

Message message;
encode(content, message);

sender.send(message);
// Receiver
Variant::Map content;
decode(receiver.fetch(), content);
A durable queue survives a messaging broker crash, as do any durable messages that have been placed on it. These messages are delivered when the messaging broker is restarted. Delivery is not guaranteed unless both the message and the queue are durable.
Sender sender = session.createSender("durable-queue");

Message message("Hello world!");
message.setDurable(true);

sender.send(message); // send the durable message, not a fresh non-durable one
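The durability rule above can be illustrated with a small stand-alone model. This is only a sketch: `ToyBroker`, `ToyQueue`, and `ToyMessage` are hypothetical names invented for illustration and are not part of the Qpid API. The model assumes that a restart drops every non-durable queue and, inside the durable queues, every non-durable message.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy types for illustration; not part of qpid::messaging.
struct ToyMessage {
    std::string content;
    bool durable;
};

struct ToyQueue {
    bool durable;
    std::vector<ToyMessage> messages;
};

class ToyBroker {
public:
    void declareQueue(const std::string& name, bool durable) {
        queues_[name] = ToyQueue{durable, {}};
    }

    // Throws std::out_of_range if the queue has not been declared.
    void enqueue(const std::string& name, const ToyMessage& message) {
        queues_.at(name).messages.push_back(message);
    }

    // Model of a crash and restart: only durable queues survive, and
    // within them only durable messages survive.
    void restart() {
        for (auto it = queues_.begin(); it != queues_.end();) {
            if (!it->second.durable) {
                it = queues_.erase(it);
                continue;
            }
            std::vector<ToyMessage> kept;
            for (const ToyMessage& m : it->second.messages) {
                if (m.durable) kept.push_back(m);
            }
            it->second.messages.swap(kept);
            ++it;
        }
    }

    bool hasQueue(const std::string& name) const {
        return queues_.count(name) != 0;
    }

    // Number of messages on the queue, or 0 if the queue does not exist.
    std::size_t depth(const std::string& name) const {
        auto it = queues_.find(name);
        return it == queues_.end() ? 0 : it->second.messages.size();
    }

private:
    std::map<std::string, ToyQueue> queues_;
};
```

In this model, a durable message placed on a non-durable queue is lost on restart together with its queue, which is why both the message and the queue need to be durable.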
Transactions cover enqueues and dequeues.
When sending messages, a transaction tracks enqueues without actually delivering the messages; a commit places the messages on their queues, and a rollback discards the enqueues.
When receiving messages, a transaction tracks dequeues without actually removing acknowledged messages; a commit removes all acknowledged messages, and a rollback discards the acknowledgements. A rollback does not release the message; it must be explicitly released to return it to the queue.
Connection connection(broker);
Session session = connection.createTransactionalSession();
...
if (looksOk)
    session.commit();
else
    session.rollback();
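The commit and rollback semantics described above can be sketched as a small stand-alone model. `ToyTxSession` is a hypothetical class written for illustration only; it is not the Qpid `Session`, and it simplifies matters by working against a single in-memory queue. It assumes that acknowledging covers every message fetched so far.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical toy model of a transactional session; not the Qpid API.
class ToyTxSession {
public:
    explicit ToyTxSession(std::deque<std::string>& queue) : queue_(queue) {}

    // Track an enqueue; nothing reaches the queue until commit().
    void send(const std::string& message) { pendingEnqueues_.push_back(message); }

    // Hand the next undelivered message to the caller. The message stays
    // on the queue. Returns false if nothing is available.
    bool fetch(std::string& out) {
        if (delivered_ >= queue_.size()) return false;
        out = queue_[delivered_++];
        return true;
    }

    // Track a dequeue for every message fetched so far.
    void acknowledge() { acked_ = delivered_; }

    // Commit: remove acknowledged messages, then place pending enqueues.
    void commit() {
        assert(acked_ <= delivered_ && delivered_ <= queue_.size());
        queue_.erase(queue_.begin(),
                     queue_.begin() + static_cast<std::ptrdiff_t>(acked_));
        delivered_ -= acked_;
        acked_ = 0;
        for (const std::string& m : pendingEnqueues_) queue_.push_back(m);
        pendingEnqueues_.clear();
    }

    // Rollback: discard pending enqueues and acknowledgements.
    // Fetched messages are NOT released; they stay marked as delivered.
    void rollback() {
        pendingEnqueues_.clear();
        acked_ = 0;
    }

    // Explicitly return fetched messages to the queue for redelivery.
    void release() {
        delivered_ = 0;
        acked_ = 0;
    }

private:
    std::deque<std::string>& queue_;
    std::vector<std::string> pendingEnqueues_;
    std::size_t delivered_ = 0; // messages handed out but still on the queue
    std::size_t acked_ = 0;     // how many of those are acknowledged
};
```

In this model, a message that was fetched and acknowledged before a rollback is still on the queue afterwards, but `fetch()` does not return it again until `release()` is called.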
The Qpidd broker and C++ clients can both use environment variables to enable logging. Use QPID_LOG_ENABLE to set the level of logging you are interested in (trace, debug, info, notice, warning, error, or critical):
export QPID_LOG_ENABLE="warning+"
Use QPID_LOG_OUTPUT to determine where logging output should be sent. This is either a file name or the special values stderr, stdout, or syslog:
export QPID_LOG_TO_FILE="/tmp/myclient.out"
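To make the "warning+" value above concrete, here is a minimal sketch of how such a level selector could be interpreted. It assumes that a trailing "+" means "this level and every more severe level" and that a bare level name matches that level only. `levelIndex` and `selectorEnables` are hypothetical helpers written for this illustration; they are not Qpid functions and do not reproduce Qpid's own parsing.

```cpp
#include <array>
#include <cstddef>
#include <string>

// Levels in increasing severity, in the order listed in the text above.
static const std::array<const char*, 7> kLevels = {
    "trace", "debug", "info", "notice", "warning", "error", "critical"};

// Index of a level name in kLevels, or -1 if the name is unknown.
int levelIndex(const std::string& name) {
    for (std::size_t i = 0; i < kLevels.size(); ++i) {
        if (name == kLevels[i]) return static_cast<int>(i);
    }
    return -1;
}

// Hypothetical helper: does `selector` (e.g. "warning+" or "error")
// enable messages logged at `level`?
// Assumption: "+" means "this level and above"; no "+" means exact match.
bool selectorEnables(const std::string& selector, const std::string& level) {
    const bool andAbove = !selector.empty() && selector.back() == '+';
    const std::string base =
        andAbove ? selector.substr(0, selector.size() - 1) : selector;
    const int want = levelIndex(base);
    const int have = levelIndex(level);
    if (want < 0 || have < 0) return false; // unknown level name
    return andAbove ? have >= want : have == want;
}
```

Under these assumptions, `selectorEnables("warning+", "error")` is true and `selectorEnables("warning+", "info")` is false.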