Hello World!¶
Tradition dictates that we start with hello world! However rather than simply striving for the shortest program possible, we’ll aim for a more illustrative example while still restricting the functionality to sending and receiving a single message.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | from proton import Message
from proton_events import EventLoop, IncomingMessageHandler
class HelloWorldReceiver(IncomingMessageHandler):
def on_message(self, event):
print event.message.body
event.connection.close()
class HelloWorldSender(object):
def on_link_flow(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
class HelloWorld(object):
def __init__(self, eventloop, url, address):
self.eventloop = eventloop
self.conn = eventloop.connect(url, handler=self)
self.address = address
def on_connection_remote_open(self, event):
self.conn.receiver(self.address, handler=HelloWorldReceiver())
def on_link_remote_open(self, event):
if event.link.is_receiver:
self.conn.sender(self.address, handler=HelloWorldSender())
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
def on_connection_remote_close(self, event):
self.closed(event.connection.remote_condition)
def closed(self, error=None):
if error:
print "Closed due to %s" % error
self.conn.close()
def run(self):
self.eventloop.run()
HelloWorld(EventLoop(), "localhost:5672", "examples").run()
|
You can see the import of EventLoop from proton_events on the second line. This is a helper class that makes programming with proton a little easier for the common cases. It includes within it an event loop, and programs written using this utility are generally structured to react to various events. This reactive style is particularly suited to messaging applications.
To be notified of a particular event, you define a class with the appropriately name method on it. That method is then called by the event loop when the event occurs.
The first class we define, HelloWorldReceiver, handles the event where a message is received and so implements a on_message() method. Within that we simply print the body of the message (line 6) and then close the connection (line 7).
The second class, HelloWorldSender, handles the event where the flow of messages is enabled over our sending link by implementing a on_link_flow() method and sending the message within that. Doing this ensures that we only send when the recipient is ready and able to receive the message. This is particularly important when the volume of messages might be large. In our case we are just going to send one message, which we do on line 11, so we can then just close the sending link on line 12.
The HelloWorld class ties everything together. It’s constructor takes the instance of the event loop to use, a url to connect to, and an address through which the message will be sent. To run the example you will need to have a broker (or similar) accepting connections on that url either with a queue (or topic) matching the given address or else configured to create such a queue (or topic) dynamically.
On line 17 we request that a connection be made to the process this url refers to by calling connect() on the EventLoop. This call returns a MessagingContext object through which we can create objects for sending and receiving messages to the process it is connected to. However we will delay doing that until our connection is fully established, i.e. until the remote peer ‘opens’ the connection (the open here is the ‘handshake’ for establishing an operational AMQP connection).
To be notified of this we pass a reference to self as the handler in connect() and define an on_connection_remote_open() method within which we can create our receiver using the connection context we obtained from the earlier connect() call, and passing the handler implementation defined by HelloWorldReceiver. When the remote peer confirms the establishment of that receiver we get a callback on on_link_remote_open() and that is where we then create our sender, passing it the HelloWorldSender handler. Delaying the creation of the sender until the receiver is established avoids losing messages even when these are not queued up by the remote peer.
We’ll add definitions to HelloWorld of on_link_remote_close() and on_connection_remote_close() also, so that we can be notified if the broker we are connected to closes either link or the connection for any reason.
Finally we actually enter the event loop, to handle all the necessary IO and make all the necessary event callbacks, by calling run() on it.
Hello World, Direct!¶
Though often used in conjunction with a broker, AMQP does not require this. It also allows senders and receivers can communicate directly if desired.
Let’s modify our example to demonstrate this.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | from proton import Message
from proton_events import EventLoop, FlowController, Handshaker, IncomingMessageHandler
class HelloWorldReceiver(IncomingMessageHandler):
def on_message(self, event):
print event.message.body
event.connection.close()
class HelloWorldSender(object):
def on_link_flow(self, event):
event.link.send_msg(Message(body=u"Hello World!"))
event.link.close()
class HelloWorld(object):
def __init__(self, eventloop, url, address):
self.eventloop = eventloop
self.acceptor = eventloop.listen(url)
self.conn = eventloop.connect(url, handler=self)
self.address = address
def on_connection_remote_open(self, event):
self.conn.sender(self.address, handler=HelloWorldSender())
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
def on_connection_remote_close(self, event):
self.closed(event.connection.remote_condition)
def closed(self, error=None):
if error:
print "Closed due to %s" % error
self.conn.close()
self.acceptor.close()
def run(self):
self.eventloop.run()
eventloop = EventLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
HelloWorld(eventloop, "localhost:8888", "examples").run()
|
The first difference, on line 17, is that rather than creating a receiver on the same connection as our sender, we listen for incoming connections by invoking the listen() method on the ``EventLoop instance.
Another difference is that the EventLoop instance we use is not the default instance as was used in the original example, but one we construct ourselves on line 38, passing in some event handlers. The first of these is HelloWorldReceiver, as used in the original example. We pass it to the event loop, because we aren’t going to directly create the receiver here ourselves. Rather we will accept an incoming connection on which the message will be received. This handler would then be notified of any incoming message event on any of the connections the event loop controls. As well as our own handler, we specify a couple of useful handlers from the proton_events toolkit. The Handshaker handler will ensure our server follows the basic handshaking rules laid down by the protocol. The FlowController will issue credit for incoming messages. We won’t worry about them in more detail than that for now.
The last difference is that we close the acceptor returned from the listen() call as part of the handling of the connection close event (line 33).
So now we have our example working without a broker involved!
The Basics¶
TODO: These examples show reliable (at-least-once) send and receive with reconnect ability. Need to write some explanation. Could also do with some further cleanup.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | import time
from proton_events import IncomingMessageHandler, EventLoop
class Recv(IncomingMessageHandler):
def __init__(self, eventloop, host, address):
self.eventloop = eventloop
self.host = host
self.address = address
self.delay = 0
self.connect()
def connect(self):
self.conn = self.eventloop.connect(self.host, handler=self)
def on_message(self, event):
print event.message.body
def on_connection_remote_open(self, event):
self.delay = 0
self.conn.receiver(self.address)
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
def on_connection_remote_close(self, event):
self.closed(event.connection.remote_condition)
def closed(self, error=None):
if error:
print "Closed due to %s" % error
self.conn.close()
def on_disconnected(self, conn):
if self.delay == 0:
self.delay = 0.1
print "Disconnected, reconnecting..."
self.connect()
else:
print "Disconnected will try to reconnect after %d seconds" % self.delay
self.eventloop.schedule(time.time() + self.delay, connection=conn)
self.delay = min(10, 2*self.delay)
def on_timer(self, event):
print "Reconnecting..."
self.connect()
def run(self):
self.eventloop.run()
try:
Recv(EventLoop(), "localhost:5672", "examples").run()
except KeyboardInterrupt: pass
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | import time
from proton import Message
from proton_events import OutgoingMessageHandler, EventLoop
class Send(OutgoingMessageHandler):
def __init__(self, eventloop, host, address, messages):
self.eventloop = eventloop
self.sent = 0
self.confirmed = 0
self.total = messages
self.address = address
self.host = host
self.delay = 0
self.connect()
def connect(self):
self.conn = self.eventloop.connect(self.host, handler=self)
def on_link_flow(self, event):
for i in range(self.sender.credit):
if self.sent == self.total:
self.sender.drained()
break
msg = Message(body={'sequence':self.sent})
self.sender.send_msg(msg, handler=self)
self.sent += 1
def on_accepted(self, event):
"""
Stop the application once all of the messages are sent and acknowledged,
"""
self.confirmed += 1
if self.confirmed == self.total:
self.sender.close()
self.conn.close()
def on_connection_remote_open(self, event):
self.sent = self.confirmed
self.sender = self.conn.sender(self.address)
self.sender.offered(self.total)
self.delay = 0
def on_link_remote_close(self, event):
self.closed(event.link.remote_condition)
def on_connection_remote_close(self, event):
self.closed(event.connection.remote_condition)
def closed(self, error=None):
if error:
print "Closed due to %s" % error
self.conn.close()
def on_disconnected(self, conn):
if self.delay == 0:
self.delay = 0.1
print "Disconnected, reconnecting..."
self.connect()
else:
print "Disconnected will try to reconnect after %d seconds" % self.delay
self.eventloop.schedule(time.time() + self.delay, connection=conn)
self.delay = min(10, 2*self.delay)
def on_timer(self, event):
print "Reconnecting..."
self.connect()
def run(self):
self.eventloop.run()
Send(EventLoop(), "localhost:5672", "examples", 10000).run()
|
Request/Response¶
A common pattern is to send a request message and expect a response message in return. AMQP has special support for this pattern. Let’s have a look at a simple example. We’ll start with the ‘server’, i.e. the program that will process the request and send the response. Note that we are still using a broker in this example.
Our server will provide a very simple service: it will respond with the body of the request converted to uppercase.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from proton import Message
from proton_events import EventLoop, IncomingMessageHandler
class Server(IncomingMessageHandler):
def __init__(self, eventloop, host, address):
self.eventloop = eventloop
self.conn = eventloop.connect(host)
self.receiver = self.conn.receiver(address, handler=self)
self.senders = {}
def on_message(self, event):
sender = self.senders.get(event.message.reply_to)
if not sender:
sender = self.conn.sender(event.message.reply_to)
self.senders[event.message.reply_to] = sender
sender.send_msg(Message(body=event.message.body.upper()))
def on_connection_remote_close(self, endpoint, error):
if error: print "Closed due to %s" % error
self.conn.close()
def run(self):
self.eventloop.run()
try:
Server(EventLoop(), "localhost:5672", "examples").run()
except KeyboardInterrupt: pass
|
The code here is not too different from the simple receiver example. When we receive a request however, we look at the reply-to address and create a sender for that over which to send the response. We’ll cache the senders incase we get further requests wit the same reply-to.
Now let’s create a simple client to test this service out.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | from proton import Message
from proton_events import EventLoop, IncomingMessageHandler
class Client(IncomingMessageHandler):
def __init__(self, eventloop, host, address, requests):
self.eventloop = eventloop
self.conn = eventloop.connect(host)
self.sender = self.conn.sender(address)
self.receiver = self.conn.receiver(None, dynamic=True, handler=self)
self.requests = requests
def next_request(self):
req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
self.sender.send_msg(req)
def on_link_remote_open(self, event):
self.next_request()
def on_message(self, event):
print "%s => %s" % (self.requests.pop(0), event.message.body)
if self.requests:
self.next_request()
else:
self.conn.close()
def run(self):
self.eventloop.run()
REQUESTS= ["Twas brillig, and the slithy toves",
"Did gire and gymble in the wabe.",
"All mimsy were the borogroves,",
"And the mome raths outgrabe."]
Client(EventLoop(), "localhost:5672", "examples", REQUESTS).run()
|
As well as sending requests, we need to be able to get back the responses. We create a receiver for that (see line 8), but we don’t specify an address, we set the dynamic option which tells the broker we are connected to to create a temporary address over which we can receive our responses.
We need to use the address allocated by the broker as the reply_to address of our requests. To be notified when the broker has sent us back the address to use, we add an on_link_remote_open() method to our receiver’s handler, and use that as the trigger to send our first request.