/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Threading; using log4net; using Apache.Qpid.Messaging; using Apache.Qpid.Client.Qms; namespace Apache.Qpid.Client.Tests.interop { public class TopicPublisher { private static ILog log = LogManager.GetLogger(typeof(TopicPublisher)); /// The default AMQ connection URL to use for tests. const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; /// Holds the default test timeout for broker communications before tests give up. const int TIMEOUT = 10000; /// Holds the routing key for the topic to receive test messages on. const string CONTROL_ROUTING_KEY = "topic_control"; /// Holds the routing key for the queue to send reports to. const string RESPONSE_ROUTING_KEY = "response"; /// Holds the number of messages to send in each test run. private int numMessages; /// Holds the number of subscribers listening to the messsages. private int numSubscribers; /// A monitor used to wait for all reports to arrive back from consumers on. private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false); /// Holds the connection to listen on. private IConnection connection; /// Holds the channel for all test messages. private IChannel channel; /// Holds the producer to send test messages on. private IMessagePublisher publisher; /// /// Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test /// subscribers. /// /// /// The broker URL. /// The number of messages to send in each test. /// The number of subscribes that are expected to reply with a report. TopicPublisher(string connectionUri, int numMessages, int numSubscribers) { log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages + ", int numSubscribers = " + numSubscribers + "): called"); // Create a connection to the broker. IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); connection = new AMQConnection(connectionInfo); // Establish a session on the broker. channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1); // Set up a queue to listen for reports on. string responseQueueName = channel.GenerateUniqueName(); channel.DeclareQueue(responseQueueName, false, true, true); // Set this listener up to listen for reports on the response queue. channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY); //channel.Bind(responseQueueName, "<>", RESPONSE_ROUTING_KEY); IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName) .Create(); consumer.OnMessage += new MessageReceivedDelegate(OnMessage); // Set up this listener with a producer to send the test messages and report requests on. publisher = channel.CreatePublisherBuilder() .WithExchangeName(ExchangeNameDefaults.TOPIC) .WithRoutingKey(CONTROL_ROUTING_KEY) .Create(); // Keep the test parameters. this.numMessages = numMessages; this.numSubscribers = numSubscribers; connection.Start(); Console.WriteLine("Sending messages and waiting for reports..."); } /// /// Start a test subscriber. The broker URL must be specified as the first command line argument. /// /// /// The command line arguments, broker URL first. public static void Main(String[] argv) { // Create an instance of this publisher with the command line parameters. TopicPublisher publisher = new TopicPublisher(DEFAULT_URI, 1, 1); // Publish the test messages. publisher.DoTest(); } /// /// Sends the test messages and waits for all subscribers to reply with a report. /// public void DoTest() { log.Debug("public void DoTest(): called"); // Create a test message to send. IMessage testMessage = channel.CreateTextMessage("test"); // Send the desired number of test messages. for (int i = 0; i < numMessages; i++) { publisher.Send(testMessage); } log.Debug("Sent " + numMessages + " test messages."); // Send the report request. IMessage reportRequestMessage = channel.CreateTextMessage("Report request message."); reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST"; reportRequestMessage.Headers.SetBoolean("BOOLEAN", false); //reportRequestMessage.Headers.SetByte("BYTE", 5); reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141); reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f); reportRequestMessage.Headers.SetInt("INT", 1); reportRequestMessage.Headers.SetLong("LONG", 1); reportRequestMessage.Headers.SetString("STRING", "hello"); reportRequestMessage.Headers.SetShort("SHORT", 2); publisher.Send(reportRequestMessage); log.Debug("Sent the report request message, waiting for all replies..."); // Wait until all the reports come in. allReportsReceivedEvt.WaitOne(TIMEOUT, true); // Check if all reports were really received or if the timeout occurred. if (numSubscribers == 0) { log.Debug("Got all reports."); } else { log.Debug("Waiting for reports timed out, still waiting for " + numSubscribers + "."); } // Send the termination request. IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message."); terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST"; publisher.Send(terminationRequestMessage); log.Debug("Sent the termination request message."); // Close all message producers and consumers and the connection to the broker. Shutdown(); } /// /// Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes /// zero, at which time waiting threads are notified of this event. /// /// /// The received report message. public void OnMessage(IMessage message) { log.Debug("public void OnMessage(IMessage message = " + message + "): called"); // Decrement the count of expected messages and release the wait monitor when this becomes zero. if (--numSubscribers == 0) { log.Debug("Got reports from all subscribers."); allReportsReceivedEvt.Set(); } } /// Stops the message consumers and closes the connection. private void Shutdown() { connection.Stop(); publisher.Dispose(); channel.Dispose(); connection.Dispose(); } } }