#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import optparse, time, qpid.messaging, re from threading import Thread from subprocess import Popen, PIPE, STDOUT op = optparse.OptionParser(usage="usage: %prog [options]", description="simple performance benchmarks") op.add_option("-b", "--broker", default=[], action="append", type="str", help="url of broker(s) to connect to, round robin on multiple brokers") op.add_option("-c", "--client-host", default=[], action="append", type="str", help="host(s) to run clients on via ssh, round robin on mulple hosts") op.add_option("-q", "--queues", default=1, type="int", metavar="N", help="create N queues (default %default)") op.add_option("-s", "--senders", default=1, type="int", metavar="N", help="start N senders per queue (default %default)") op.add_option("-r", "--receivers", default=1, type="int", metavar="N", help="start N receivers per queue (default %default)") op.add_option("-m", "--messages", default=100000, type="int", metavar="N", help="send N messages per sender (default %default)") op.add_option("--queue-name", default="benchmark", metavar="NAME", help="base name for queues (default %default)") op.add_option("--send-rate", default=0, metavar="N", help="send rate limited to N messages/second, 0 means no limit (default %default)") op.add_option("--receive-rate", default=0, metavar="N", help="receive rate limited to N messages/second, 0 means no limit (default %default)") op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") op.add_option("--ack-frequency", default=100, metavar="N", type="int", help="receiver ack's every N messages, 0 means unconfirmed (default %default)") op.add_option("--no-report-header", dest="report_header", default=True, action="store_false", help="don't print header on report") op.add_option("--summarize", default=False, action="store_true", help="print summary statistics for multiple senders/receivers: total throughput, average latency") op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") op.add_option("--send-option", default=[], action="append", type="str", help="Additional option for sending addresses") op.add_option("--receive-option", default=[], action="append", type="str", help="Additional option for receiving addresses") op.add_option("--create-option", default=[], action="append", type="str", help="Additional option for creating addresses") op.add_option("--send-arg", default=[], action="append", type="str", help="Additional argument for qpid-send") op.add_option("--receive-arg", default=[], action="append", type="str", help="Additional argument for qpid-receive") op.add_option("--no-timestamp", dest="timestamp", default=True, action="store_false", help="don't add a timestamp, no latency results") op.add_option("--sequence", dest="sequence", default=False, action="store_true", help="add a sequence number to each message") op.add_option("--connection-options", type="str", help="Connection options for senders & receivers") op.add_option("--flow-control", default=0, type="int", metavar="N", help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") op.add_option("--durable", default=False, action="store_true", help="Use durable queues and messages") op.add_option("--save-received", default=False, action="store_true", help="Save received message content to files -receiver-.msg") op.add_option("--group-receivers", default=False, action="store_true", help="Run receivers for the same queue on the same host.") op.add_option("--verbose", default=False, action="store_true", help="Show commands executed") op.add_option("--no-delete", default=False, action="store_true", help="Don't delete the test queues.") single_quote_re = re.compile("'") def posix_quote(string): """ Quote a string for use as an argument in a posix shell""" return "'" + single_quote_re.sub("\\'", string) + "'"; def ssh_command(host, command): """ Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] class Clients: def __init__(self): self.clients=[] def add(self, client): self.clients.append(client) return client def kill(self): for c in self.clients: try: c.kill() except: pass class PopenCommand(Popen): """Like Popen but you can query for the command""" def __init__(self, command, *args, **kwargs): self.command = command Popen.__init__(self, command, *args, **kwargs) clients = Clients() def start_receive(queue, index, opts, ready_queue, broker, host): address_opts=opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages messages = msg_total/opts.receivers; if (index < msg_total%opts.receivers): messages += 1 if (messages == 0): return None command = ["qpid-receive", "-b", broker, "-a", address, "-m", str(messages), "--forever", "--print-content=no", "--receive-rate", str(opts.receive_rate), "--report-total", "--ack-frequency", str(opts.ack_frequency), "--ready-address", "%s;{create:always}"%ready_queue, "--report-header=no" ] if opts.save_received: command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] command += opts.receive_arg if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) if opts.verbose: print "Receiver: ", command return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) command = ["qpid-send", "-b", broker, "-a", address, "--messages", str(opts.messages), "--content-size", str(opts.content_size), "--send-rate", str(opts.send_rate), "--report-total", "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), "--sequence=%s"%(opts.sequence and "yes" or "no"), "--flow-control", str(opts.flow_control), "--durable", str(opts.durable) ] command += opts.send_arg if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) if opts.verbose: print "Sender: ", command return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) def error_msg(out, err): return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) def first_line(p): out,err=p.communicate() if p.returncode != 0: raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] def queue_exists(queue,broker): c = qpid.messaging.Connection(broker) c.open() try: s = c.session() try: s.sender(queue) return True except qpid.messaging.exceptions.NotFound: return False finally: c.close() def recreate_queues(queues, brokers, no_delete, opts): c = qpid.messaging.Connection(brokers[0]) c.open() s = c.session() for q in queues: if not no_delete: try: s.sender("%s;{delete:always}"%(q)).close() except qpid.messaging.exceptions.NotFound: pass # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while queue_exists(q,b): time.sleep(0.1); address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) if opts.verbose: print "Creating", address s.sender(address) # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while not queue_exists(q,b): time.sleep(0.1); c.close() def print_header(timestamp): if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" else: latency_header="" print "send-tp\trecv-tp%s"%latency_header def parse(parser, lines): # Parse sender/receiver output return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) def parse_receivers(receivers): return parse([int,float,float,float],[first_line(p) for p in receivers if p]) def print_data(send_stats, recv_stats, total_tp): for send,recv in map(None, send_stats, recv_stats): line="" if send: line += "%d"%send[0] if recv: line += "\t%d"%recv[0] if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) if total_tp is not None: line += "\t%d"%total_tp total_tp = None print line def print_summary(send_stats, recv_stats, total_tp): def avg(s): sum(s) / len(s) send_tp = sum([l[0] for l in send_stats]) recv_tp = sum([l[0] for l in recv_stats]) summary = "%d\t%d"%(send_tp, recv_tp) if recv_stats and len(recv_stats[0]) == 4: l_min = sum(l[1] for l in recv_stats)/len(recv_stats) l_max = sum(l[2] for l in recv_stats)/len(recv_stats) l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) summary += "\t%d"%total_tp print summary class ReadyReceiver: """A receiver for ready messages""" def __init__(self, queue, broker): self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() self.timeout=10 def wait(self, receivers): try: for i in receivers: self.receiver.fetch(self.timeout) self.connection.close() except qpid.messaging.Empty: for r in receivers: if (r.poll() is not None): out,err=r.communicate() raise Exception("Receiver error: %s\n%s" % (" ".join(r.command), error_msg(out,err))) raise Exception("Timed out waiting for receivers to be ready") def flatten(l): return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) class RoundRobin: def __init__(self,items): self.items = items self.index = 0 def next(self): if not self.items: return None ret = self.items[self.index] self.index = (self.index+1)%len(self.items) return ret def main(): opts, args = op.parse_args() opts.client_host = flatten(opts.client_host) if not opts.broker: if opts.client_host: raise Exception("--broker must be specified if --client_host is.") opts.broker = ["127.0.0.1"] # Deafult to local broker opts.broker = flatten(opts.broker) brokers = RoundRobin(opts.broker) client_hosts = RoundRobin(opts.client_host) send_out = "" receive_out = "" ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): recreate_queues(queues, opts.broker, opts.no_delete, opts) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) if opts.group_receivers: # Run receivers for same queue against same broker. receivers = [] for q in queues: b = brokers.next() for j in xrange(opts.receivers): receivers.append( start_receive(q, j, opts, ready_queue, b, client_hosts.next())) else: # Don't group receivers receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. start = time.time() senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) for p in senders + receivers: p.wait() total_sent = opts.queues * opts.senders * opts.messages total_tp = total_sent / (time.time()-start) send_stats=parse_senders(senders) recv_stats=parse_receivers(receivers) if opts.summarize: print_summary(send_stats, recv_stats, total_tp) else: print_data(send_stats, recv_stats, total_tp) finally: clients.kill() # No strays if __name__ == "__main__": main()