#!/usr/bin/env python
"""Produce/consume timing script for a qpid broker on 127.0.0.1:5672.

Usage: <script> prod|cons [message_count]

``prod`` publishes ``message_count`` messages (default 100) followed by a
final marker message to ``message_queue`` and then blocks until a message
arrives on ``sync_queue``.  ``cons`` reads ``message_queue`` until it sees
the marker and then publishes one message to ``sync_queue``.  Either role is
timed with ``timeit`` and the elapsed time is printed.
"""
import sys
from timeit import Timer

# Body of the marker message that tells the consumer to stop reading.
FINAL_BODY = "That's done"
# Destination tag used for every subscription made by this script.
SUBSCRIPTION_TAG = "consumer"


def _open_session():
    """Connect as guest/guest and open a session on channel 1.

    Returns:
        The ``(client, channel)`` pair.
    """
    # Imported here so that importing this module does not itself need qpid.
    from qpid.client import Client

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    return client, channel


def _make_message(body, routing_key):
    """Build a qpid ``Content`` with the given body and routing key."""
    from qpid.content import Content

    message = Content(body)
    message["routing_key"] = routing_key
    return message


def _subscribe(client, channel, queue_name):
    """Subscribe ``channel`` to ``queue_name`` and return the local queue."""
    incoming = client.queue(SUBSCRIPTION_TAG)
    channel.message_subscribe(queue=queue_name, destination=SUBSCRIPTION_TAG)
    # NOTE(review): presumably unit 0 is message credit and unit 1 is byte
    # credit (AMQP 0-10 message.flow), both set to the maximum - confirm.
    channel.message_flow(SUBSCRIPTION_TAG, 0, 0xFFFFFFFF)
    channel.message_flow(SUBSCRIPTION_TAG, 1, 0xFFFFFFFF)
    return incoming


def _declare_queues():
    """Declare ``message_queue`` and ``sync_queue`` and bind each to amq.direct."""
    _, channel = _open_session()
    try:
        for name in ("message_queue", "sync_queue"):
            channel.queue_declare(queue=name)
            channel.queue_bind(exchange="amq.direct", queue=name,
                               routing_key=name)
    finally:
        channel.session_close()


def publisher(n):
    """Publish ``n`` messages plus a final marker, then wait for the sync reply.

    Args:
        n: number of messages to publish; replaced by ``int(sys.argv[2])``
            when a third command-line argument is present.
    """
    if len(sys.argv) >= 3:
        n = int(sys.argv[2])

    client, channel = _open_session()
    try:
        # One Content object is reused for every transfer, so the timed loop
        # measures only message_transfer calls.
        payload = _make_message("message", "message_queue")
        print("producing  %s  messages" % n)
        for _ in range(n):
            channel.message_transfer(destination="amq.direct", content=payload)

        print("producing final message")
        channel.message_transfer(
            destination="amq.direct",
            content=_make_message(FINAL_BODY, "message_queue"))

        print("consuming sync message")
        sync = _subscribe(client, channel, "sync_queue")
        sync.get(block=True)
        print("done")
    finally:
        # Close the session even if publishing or the blocking get fails.
        channel.session_close()


def consumer():
    """Read ``message_queue`` up to the final marker, then signal ``sync_queue``."""
    client, channel = _open_session()
    try:
        incoming = _subscribe(client, channel, "message_queue")

        print("getting messages")
        while True:
            received = incoming.get(block=True)
            if received.content.body == FINAL_BODY:
                break
        # A single cumulative completion on the last message received.
        received.complete(cumulative=True)
        print("consumed all messages")

        channel.message_transfer(
            destination="amq.direct",
            content=_make_message("message", "sync_queue"))
        print("done")
    finally:
        # Close the session even if receiving or the sync transfer fails.
        channel.session_close()


def main():
    """Declare the queues, then time the role named by ``sys.argv[1]``."""
    _declare_queues()

    num_messages = 100
    if len(sys.argv) >= 3:
        num_messages = int(sys.argv[2])

    if len(sys.argv) == 1:
        print("error: please specify prod or cons")
    elif sys.argv[1] == 'prod':
        # publisher() re-reads sys.argv[2] itself, so passing the parsed
        # count here matches what it will actually send.
        timer = Timer("publisher(%d)" % num_messages,
                      "from __main__ import publisher")
        elapsed = timer.timeit(1)
        print("produced and consumed %s messages in:  %s"
              % (num_messages + 2, elapsed))
    elif sys.argv[1] == 'cons':
        timer = Timer("consumer()", "from __main__ import consumer")
        elapsed = timer.timeit(1)
        print("consumed  %s  in:  %s" % (num_messages, elapsed))
    else:
        print("please specify prod or cons")


if __name__ == '__main__':
    main()