/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using common.org.apache.qpid.transport.util; using org.apache.qpid.client; using org.apache.qpid.transport; using org.apache.qpid.transport.util; using Plossum.CommandLine; namespace PerfTest { [CommandLineManager(ApplicationName = "Qpid Perf Tests", Copyright = "Apache Software Foundation")] public class Options { [CommandLineOption(Description = "Displays this help text")] public bool Help; [CommandLineOption(Description = "Create shared queues.", MinOccurs = 0)] public Boolean Setup; [CommandLineOption(Description = "Run test, print report.", MinOccurs = 0)] public Boolean Control; [CommandLineOption(Description = "Publish messages.", MinOccurs = 0)] public Boolean Publish; [CommandLineOption(Description = "Subscribe for messages.", MinOccurs = 0)] public Boolean Subscribe; [CommandLineOption(Description = "Test mode: [shared|fanout|topic]", MinOccurs = 0)] public string Mode { get { return _mMode; } set { if (! value.Equals("shared") && ! value.Equals("fanout") && ! value.Equals("topic")) throw new InvalidOptionValueException( "The mode must not be shared|fanout|topic", false); _mMode = value; } } private string _mMode = "shared"; [CommandLineOption(Description = "Specifies the broler name", MinOccurs = 0)] public string Broker { get { return _broker; } set { if (String.IsNullOrEmpty(value)) throw new InvalidOptionValueException( "The broker name must not be empty", false); _broker = value; } } private string _broker = "localhost"; [CommandLineOption(Description = "Specifies the port name", MinOccurs = 0)] public int Port { get { return _port; } set { _port = value; } } private int _port = 5672; #region Publisher [CommandLineOption(Description = "Create N publishers.", MinOccurs = 0)] public int Pubs { get { return _pubs; } set { _pubs = value; } } private int _pubs = 1; [CommandLineOption(Description = "Each publisher sends N messages.", MinOccurs = 0)] public double Count { get { return _count; } set { _count = value; } } private double _count = 5000; [CommandLineOption(Description = "Size of messages in bytes.", MinOccurs = 0)] public long Size { get { return _size; } set { _size = value; } } private long _size = 1024; [CommandLineOption(Description = "Publisher use confirm-mode.", MinOccurs = 0)] public Boolean Confirm = true; [CommandLineOption(Description = "Publish messages as durable.", MinOccurs = 0)] public Boolean Durable; [CommandLineOption(Description = "Make data for each message unique.", MinOccurs = 0)] public Boolean UniqueData; [CommandLineOption(Description = "Wait for confirmation of each message before sending the next one.", MinOccurs = 0)] public Boolean SyncPub; [CommandLineOption(Description = ">=0 delay between msg publish.", MinOccurs = 0)] public double IntervalPub { get { return _interval_pub; } set { _interval_pub = value; } } private double _interval_pub; #endregion #region Subscriber [CommandLineOption(Description = "Create N subscribers.", MinOccurs = 0)] public int Subs { get { return _subs; } set { _subs = value; } } private int _subs = 1; [CommandLineOption(Description = "N>0: Subscriber acks batches of N.\n N==0: Subscriber uses unconfirmed mode", MinOccurs = 0)] public int SubAck { get { return _suback; } set { _suback = value; } } private int _suback; [CommandLineOption(Description = ">=0 delay between msg consume", MinOccurs = 0)] public double IntervalSub { get { return _interval_sub; } set { _interval_sub = value; } } private double _interval_sub; #endregion [CommandLineOption(Description = "Create N queues.", MinOccurs = 0)] public int Queues { get { return _qt; } set { _qt = value; } } private int _qt = 1; [CommandLineOption(Description = "Desired number of iterations of the test.", MinOccurs = 0)] public int Iterations { get { return _iterations; } set { _iterations = value; } } private int _iterations = 1; [CommandLineOption(Description = "If non-zero, the transaction batch size.", MinOccurs = 0)] public int Tx { get { return _tx; } set { _tx = value; } } private int _tx; [CommandLineOption(Description = "Make queue durable (implied if durable set.", MinOccurs = 0)] public Boolean QueueDurable; [CommandLineOption(Description = "Queue policy: count to trigger 'flow to disk'", MinOccurs = 0)] public double QueueMaxCount { get { return _queueMaxCount; } set { _queueMaxCount = value; } } private double _queueMaxCount; [CommandLineOption(Description = "Queue policy: accumulated size to trigger 'flow to disk'", MinOccurs = 0)] public double QueueMaxSize { get { return _queueMaxSize; } set { _queueMaxSize = value; } } private double _queueMaxSize; public double SubQuota { get { return _subQuota; } set { _subQuota = value; } } private double _subQuota; } internal interface Startable { void Start(); } public abstract class PerfTestClient : Startable { private readonly IClient _connection; private readonly IClientSession _session; private readonly Options _options; public IClientSession Session { get { return _session; } } public Options Options { get { return _options; } } protected PerfTestClient(Options options) { _options = options; _connection = new Client(); _connection.Connect(options.Broker, options.Port, "test", "guest", "guest"); _session = _connection.CreateSession(50000); } public abstract void Start(); } public class SetupTest : PerfTestClient { public SetupTest(Options options) : base(options) { } private void queueInit(String name, Boolean durable, Dictionary arguments) { Session.QueueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE); Session.QueuePurge(name); Session.ExchangeBind(name, "amq.direct", name); Session.Sync(); } public override void Start() { queueInit("pub_start", false, null); queueInit("pub_done", false, null); queueInit("sub_ready", false, null); queueInit("sub_done", false, null); if (Options.Mode.Equals("shared")) { Dictionary settings = new Dictionary(); if (Options.QueueMaxCount > 0) settings.Add("qpid.max_count", Options.QueueMaxCount); if (Options.QueueMaxSize > 0) settings.Add("qpid.max_size", Options.QueueMaxSize); for (int i = 0; i < Options.Queues; ++i) { string qname = "perftest" + i; queueInit(qname, Options.Durable || Options.QueueDurable, settings); } } } } public class SubscribeThread : PerfTestClient { private readonly string _queue; public SubscribeThread(Options options, string key, string exchange) : base(options) { _queue = "perftest" + (new UUID(10, 10)); Session.QueueDeclare(_queue, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE, Options.Durable ? Option.DURABLE : Option.NONE); Session.ExchangeBind(_queue, exchange, key); } public SubscribeThread(Options options, string key) : base(options) { _queue = key; } public override void Start() { if (Options.Tx > 0) { Session.TxSelect(); Session.Sync(); } CircularBuffer buffer = new CircularBuffer(100); // Create a listener and subscribe it to the queue named "message_queue" IMessageListener listener = new SyncListener(buffer); string dest = "dest" + UUID.RandomUuid(); Session.AttachMessageListener(listener, dest); Session.MessageSubscribe(_queue, dest, Options.Tx > 0 || Options.SubAck > 0 ? MessageAcceptMode.EXPLICIT : MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null); // issue credits Session.MessageSetFlowMode(dest, MessageFlowMode.WINDOW); Session.MessageFlow(dest, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES); // Notify controller we are ready. IMessage message = new Message(); message.DeliveryProperties.SetRoutingKey("sub_ready"); message.AppendData(Encoding.UTF8.GetBytes("ready")); Session.MessageTransfer("amq.direct", message); if (Options.Tx > 0) { Session.TxCommit(); Session.Sync(); } for (int j = 0; j < Options.Iterations; ++j) { //need to allocate some more credit Session.MessageFlow(dest, MessageCreditUnit.MESSAGE, (long)Options.SubQuota); RangeSet range = new RangeSet(); IMessage msg; DateTime start = DateTime.Now; for (long i = 0; i < Options.SubQuota; ++i) { msg = buffer.Dequeue(); if (Options.Tx > 0 && ((i + 1)%Options.Tx == 0)) { Session.TxCommit(); Session.Sync(); } if (Options.IntervalSub > 0) { Thread.Sleep((int) Options.IntervalSub*1000); } range.Add(msg.Id); } if (Options.Tx > 0 || Options.SubAck > 0) Session.MessageAccept(range); range.Clear(); if (Options.Tx > 0) { Session.TxSelect(); Session.Sync(); } DateTime end = DateTime.Now; // Report to publisher. message.DeliveryProperties.SetRoutingKey("sub_done"); message.ClearData(); message.AppendData(BitConverter.GetBytes(Options.SubQuota / end.Subtract(start).TotalMilliseconds )); Session.MessageTransfer("amq.direct", message); if (Options.Tx > 0) { Session.TxSelect(); Session.Sync(); } } Session.Close(); } } public class SyncListener : IMessageListener { private readonly CircularBuffer _buffer; public SyncListener(CircularBuffer buffer) { _buffer = buffer; } public void MessageTransfer(IMessage m) { _buffer.Enqueue(m); } } public class PublishThread : PerfTestClient { private readonly string _exchange; private readonly string _key; public PublishThread(Options options, string key, string exchange) : base(options) { _key = key; _exchange = exchange; } public override void Start() { byte[] data = new byte[Options.Size]; // randomly populate data Random r = new Random(34); r.NextBytes(data); IMessage message = new Message(); message.AppendData(data); message.DeliveryProperties.SetRoutingKey(_key); if (Options.Durable) message.DeliveryProperties.SetDeliveryMode(MessageDeliveryMode.PERSISTENT); if (Options.Tx > 0) { Session.TxSelect(); Session.Sync(); } CircularBuffer buffer = new CircularBuffer(100); // Create a listener and subscribe it to the queue named "pub_start" IMessageListener listener = new SyncListener(buffer); string localQueue = "localQueue-" + UUID.RandomUuid().ToString(); Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE); Session.ExchangeBind(localQueue, "amq.direct", "pub_start"); Session.AttachMessageListener(listener, localQueue); Session.MessageSubscribe(localQueue); if (Options.Tx > 0) { Session.TxCommit(); Session.Sync(); } buffer.Dequeue(); for (int j = 0; j < Options.Iterations; ++j) { DateTime start = DateTime.Now; for (long i = 0; i < Options.Count; ++i) { Session.MessageTransfer(_exchange, message); if (Options.SyncPub) { Session.Sync(); } if (Options.Tx > 0 && (i + 1)%Options.Tx == 0) { Session.TxSelect(); Session.Sync(); } if (Options.IntervalPub > 0) { Thread.Sleep((int) Options.IntervalSub*1000); } } Session.Sync(); DateTime end = DateTime.Now; // Report to publisher. message.DeliveryProperties.SetRoutingKey("pub_done"); message.ClearData(); double time = end.Subtract(start).TotalMilliseconds; byte[] rate = BitConverter.GetBytes( Options.Count / time ); message.AppendData(rate); Session.MessageTransfer("amq.direct", message); if (Options.Tx > 0) { Session.TxSelect(); Session.Sync(); } } Session.Close(); } } public class Controller : PerfTestClient { public Controller(Options options) : base(options) { } private void process(int size, string queue) { CircularBuffer buffer = new CircularBuffer(100); IMessageListener listener = new SyncListener(buffer); string localQueue = "queue-" + UUID.RandomUuid(); Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE); Session.ExchangeBind(localQueue, "amq.direct", queue); Session.AttachMessageListener(listener, localQueue); Session.MessageSubscribe(localQueue); for (int i = 0; i < size; ++i) { buffer.Dequeue(); } } private double processRate(int size, string queue) { CircularBuffer buffer = new CircularBuffer(100); IMessageListener listener = new SyncListener(buffer); string localQueue = "queue-" + UUID.RandomUuid(); Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE); Session.ExchangeBind(localQueue, "amq.direct", queue); Session.AttachMessageListener(listener, localQueue); Session.MessageSubscribe(localQueue); double rate = 0; RangeSet range = new RangeSet(); for (int i = 0; i < size; ++i) { IMessage m = buffer.Dequeue(); range.Add(m.Id); BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8); byte[] body = new byte[m.Body.Length - m.Body.Position]; reader.Read(body, 0, body.Length); rate += BitConverter.ToDouble(body,0); } Session.MessageAccept(range); return rate; } private void send(int size, string queue, string data) { IMessage message = new Message(); message.DeliveryProperties.SetRoutingKey(queue); message.AppendData(Encoding.UTF8.GetBytes(data)); for (int i = 0; i < size; ++i) { Session.MessageTransfer("amq.direct", message); } } public override void Start() { process(Options.Subs, "sub_ready"); for (int j = 0; j < Options.Iterations; ++j) { DateTime start = DateTime.Now; send(Options.Pubs, "pub_start", "start"); // Start publishers double pubrate = processRate(Options.Pubs, "pub_done"); double subrate = processRate(Options.Subs, "sub_done"); DateTime end = DateTime.Now; double transfers = (Options.Pubs*Options.Count) + (Options.Subs*Options.SubQuota); double time = end.Subtract(start).TotalSeconds; double txrate = transfers/time; double mbytes = (txrate*Options.Size) / (1024 * 1024) ; Console.WriteLine("Total: " + transfers + " transfers of " + Options.Size + " bytes in " + time + " seconds.\n" + "Publish transfers/sec: " + pubrate * 1000 + "\n" + "Subscribe transfers/sec: " + subrate * 1000 + "\n" + "Total transfers/sec: " + txrate + "\n" + "Total Mbytes/sec: " + mbytes); Console.WriteLine("done"); } } } public class PerfTest { private static int Main(string[] args) { Options options = new Options(); CommandLineParser parser = new CommandLineParser(options); parser.Parse(); if (parser.HasErrors) { Console.WriteLine(parser.UsageInfo.GetErrorsAsString(78)); return -1; } if (options.Help) { Console.WriteLine(parser.UsageInfo.GetOptionsAsString(78)); return 0; } bool singleProcess = (!options.Setup && !options.Control && !options.Publish && !options.Subscribe); if (singleProcess) { options.Setup = options.Control = options.Publish = true; options.Subscribe = true; } string exchange = "amq.direct"; switch (options.Mode) { case "shared": options.SubQuota = (options.Pubs*options.Count)/options.Subs; break; case "fanout": options.SubQuota = (options.Pubs*options.Count); exchange = "amq.fanout"; break; case "topic": options.SubQuota = (options.Pubs*options.Count); exchange = "amq.topic"; break; } if (options.Setup) { SetupTest setup = new SetupTest(options); setup.Start(); // Set up queues } Thread contT = null; if ( options.Control) { Controller c = new Controller(options); contT = new Thread(c.Start); contT.Start(); } Thread[] publishers = null; Thread[] subscribers = null; // Start pubs/subs for each queue/topic. for (int i = 0; i < options.Queues; ++i) { string key = "perftest" + i; // Queue or topic name. if (options.Publish) { int n = singleProcess ? options.Pubs : 1; publishers = new Thread[n]; for (int j = 0; j < n; ++j) { PublishThread pt = new PublishThread(options, key, exchange); publishers[i] = new Thread(pt.Start); publishers[i].Start(); } } if ( options.Subscribe) { int n = singleProcess ? options.Subs : 1; subscribers = new Thread[n]; for (int j = 0; j < n; ++j) { SubscribeThread st; if (options.Mode.Equals("shared")) st = new SubscribeThread(options, key); else st = new SubscribeThread(options, key, exchange); subscribers[i] = new Thread(st.Start); subscribers[i].Start(); } } } if (options.Control) { contT.Join(); } // Wait for started threads. if (options.Publish) { foreach (Thread t in publishers) { t.Join(); } } if (options.Subscribe) { foreach (Thread t in subscribers) { t.Join(); } } return 0; } } }