/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using log4net;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;

namespace Apache.Qpid.Client.Tests.interop
{
    /// <summary>
    /// Interop test client that listens on a topic for test messages, counts them, and
    /// replies with a report message when asked. A termination request message makes the
    /// listener stop and dispose its connection.
    /// </summary>
    public class TopicListener
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(TopicListener));

        /// <summary>The default AMQ connection URL to use for tests.</summary>
        const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";

        /// <summary>Holds the routing key for the topic to receive test messages on.</summary>
        public static string CONTROL_ROUTING_KEY = "topic_control";

        /// <summary>Holds the routing key for the queue to send reports to.</summary>
        public static string RESPONSE_ROUTING_KEY = "response";

        /// <summary>Holds the connection to listen on.</summary>
        private IConnection connection;

        /// <summary>Holds the channel for all test messages.</summary>
        private IChannel channel;

        /// <summary>Holds the producer to send report messages on.</summary>
        private IMessagePublisher publisher;

        /// <summary>
        /// Set on the first message of a counting run; cleared again once a report has been sent,
        /// so that the next message received starts a fresh count.
        /// </summary>
        private bool init;

        /// <summary>Holds the count of test messages received in the current counting run.</summary>
        private int count;

        /// <summary>
        /// Creates a topic listener using the specified broker URL, binds a uniquely named queue
        /// to the topic exchange under <see cref="CONTROL_ROUTING_KEY"/>, prepares a publisher for
        /// reports on the direct exchange under <see cref="RESPONSE_ROUTING_KEY"/>, and starts the
        /// connection.
        /// </summary>
        /// <param name="connectionUri">The broker URL to listen on.</param>
        TopicListener(string connectionUri)
        {
            log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called");

            // Create a connection to the broker.
            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
            connection = new AMQConnection(connectionInfo);

            // Establish a session on the broker.
            // NOTE(review): presumably (transacted = false, auto-ack, prefetch = 1) -- confirm against IConnection.CreateChannel.
            channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);

            // Set up a queue to listen for test messages on.
            // NOTE(review): the flags are presumably (durable = false, exclusive = true, autoDelete = true) -- confirm against IChannel.DeclareQueue.
            string topicQueueName = channel.GenerateUniqueName();
            channel.DeclareQueue(topicQueueName, false, true, true);

            // Set this listener up to listen for incoming messages on the test topic queue.
            channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY);
            IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName)
                .Create();
            consumer.OnMessage += new MessageReceivedDelegate(OnMessage);

            // Set up this listener with a producer to send the reports on.
            publisher = channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.DIRECT)
                .WithRoutingKey(RESPONSE_ROUTING_KEY)
                .Create();

            connection.Start();
            Console.WriteLine("Waiting for messages...");
        }

        /// <summary>
        /// Program entry point. Starts a listener against <see cref="DEFAULT_URI"/>.
        /// </summary>
        /// <param name="argv">Command line arguments; currently not consulted.</param>
        public static void Main(String[] argv)
        {
            // Create an instance of this listener on the default broker URL.
            new TopicListener(DEFAULT_URI);
        }

        /// <summary>
        /// Handles all messages received by this listener. Test messages are counted, report
        /// request messages result in a report being sent, and shutdown messages result in this
        /// listener being terminated.
        /// </summary>
        /// <param name="message">The received message.</param>
        public void OnMessage(IMessage message)
        {
            // This runs once per received message; skip building the debug string
            // (which formats the whole message) unless debug logging is actually on.
            if (log.IsDebugEnabled)
            {
                log.Debug("public void onMessage(Message message = " + message + "): called");
            }

            // Start a fresh count if this is the first message since start-up or since the last report.
            if (!init)
            {
                count = 0;
                init = true;
            }

            // Check if the message is a control message telling this listener to shut down.
            if (IsShutdown(message))
            {
                log.Debug("Got a shutdown message.");
                Shutdown();
            }
            // Check if the message is a report request message asking this listener to respond with the message count.
            else if (IsReport(message))
            {
                log.Debug("Got a report request message.");

                // Send the message count report.
                SendReport();

                // Reset the initialization flag so that the next message is considered to be the first.
                init = false;
            }
            // Otherwise it is an ordinary test message, so increment the message count.
            else
            {
                count++;
            }
        }

        /// <summary>Checks a message to see if it is a shutdown control message.</summary>
        /// <param name="m">The message to check.</param>
        /// <returns><c>true</c> if it is a shutdown control message, <c>false</c> otherwise.</returns>
        private bool IsShutdown(IMessage m)
        {
            return CheckTextField(m, "TYPE", "TERMINATION_REQUEST");
        }

        /// <summary>Checks a message to see if it is a report request control message.</summary>
        /// <param name="m">The message to check.</param>
        /// <returns><c>true</c> if it is a report request control message, <c>false</c> otherwise.</returns>
        private bool IsReport(IMessage m)
        {
            return CheckTextField(m, "TYPE", "REPORT_REQUEST");
        }

        /// <summary>Checks whether or not a text header field on a message has the specified value.</summary>
        /// <param name="m">The message to check.</param>
        /// <param name="fieldName">The name of the header field to check.</param>
        /// <param name="value">The expected value of the field to compare with.</param>
        /// <returns>
        /// <c>true</c> if the header lookup returns a non-null string equal to <paramref name="value"/>,
        /// <c>false</c> otherwise (including when the lookup returns null).
        /// </returns>
        private static bool CheckTextField(IMessage m, string fieldName, string value)
        {
            string comp = m.Headers.GetString(fieldName);

            // The explicit null test keeps a null lookup result from matching a null expected value.
            return (comp != null) && comp == value;
        }

        /// <summary>
        /// Stops the connection, then disposes the channel and the connection. Each later step
        /// still runs if an earlier one throws, so nothing is left undisposed; the first
        /// exception (if any) still propagates to the caller.
        /// </summary>
        private void Shutdown()
        {
            try
            {
                connection.Stop();
            }
            finally
            {
                try
                {
                    channel.Dispose();
                }
                finally
                {
                    connection.Dispose();
                }
            }
        }

        /// <summary>
        /// Sends the report message to the response location. The body carries the current message
        /// count; a fixed set of typed headers is attached as well.
        /// </summary>
        private void SendReport()
        {
            string report = "Received " + count + ".";

            IMessage reportMessage = channel.CreateTextMessage(report);

            // Fixed sample headers, one per field type.
            // NOTE(review): presumably there to exercise typed header encoding in the interop test -- confirm.
            reportMessage.Headers.SetBoolean("BOOLEAN", false);
            // NOTE(review): the byte header was already disabled in the original; the reason is not visible here.
            //reportMessage.Headers.SetByte("BYTE", 5);
            reportMessage.Headers.SetDouble("DOUBLE", 3.141);
            reportMessage.Headers.SetFloat("FLOAT", 1.0f);
            reportMessage.Headers.SetInt("INT", 1);
            reportMessage.Headers.SetLong("LONG", 1);
            reportMessage.Headers.SetString("STRING", "hello");
            reportMessage.Headers.SetShort("SHORT", 2);

            publisher.Send(reportMessage);

            Console.WriteLine("Sent report: " + report);
        }
    }
}