#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # def publisher(n): import qpid import sys from qpid.client import Client from qpid.content import Content if len(sys.argv) >= 3: n = int(sys.argv[2]) client = Client("127.0.0.1", 5672) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) channel = client.channel(1) channel.session_open() message = Content("message") message["routing_key"] = "message_queue" print "producing ", n, " messages" for i in range(n): channel.message_transfer(destination="amq.direct", content=message) print "producing final message" message = Content("That's done") message["routing_key"] = "message_queue" channel.message_transfer(destination="amq.direct", content=message) print "consuming sync message" consumer = "consumer" queue = client.queue(consumer) channel.message_subscribe(queue="sync_queue", destination=consumer) channel.message_flow(consumer, 0, 0xFFFFFFFFL) channel.message_flow(consumer, 1, 0xFFFFFFFFL) queue.get(block = True) print "done" channel.session_close() def consumer(): import sys import qpid from qpid.client import Client from qpid.content import Content client = Client("127.0.0.1", 5672) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) channel = client.channel(1) channel.session_open() consumer = "consumer" queue = client.queue(consumer) channel.message_subscribe(queue="message_queue", destination=consumer) channel.message_flow(consumer, 0, 0xFFFFFFFFL) channel.message_flow(consumer, 1, 0xFFFFFFFFL) final = "That's done" content = "" message = None print "getting messages" while content != final: message = queue.get(block = True) content = message.content.body message.complete(cumulative=True) print "consumed all messages" message = Content("message") message["routing_key"] = "sync_queue" channel.message_transfer(destination="amq.direct", content=message) print "done" channel.session_close() if __name__=='__main__': import sys import qpid from timeit import Timer from qpid.client import Client from qpid.content import Content client = Client("127.0.0.1", 5672) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) channel = client.channel(1) channel.session_open() channel.queue_declare(queue="message_queue") channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue") channel.queue_declare(queue="sync_queue") channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue") channel.session_close() numMess = 100 if len(sys.argv) >= 3: numMess = int(sys.argv[2]) if len(sys.argv) == 1: print "error: please specify prod or cons" elif sys.argv[1] == 'prod': tprod = Timer("publisher(100)", "from __main__ import publisher") tp = tprod.timeit(1) print "produced and consumed" , numMess + 2 ,"messages in: ", tp elif sys.argv[1] == 'cons': tcons = Timer("consumer()", "from __main__ import consumer") tc = tcons.timeit(1) print "consumed " , numMess ," in: ", tc else: print "please specify prod or cons"