/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Threading; using log4net; using NUnit.Framework; using Apache.Qpid.Framing; using Apache.Qpid.Messaging; using Apache.Qpid.Client.Qms; using Apache.Qpid.Client; namespace Apache.Qpid.Integration.Tests.testcases { /// /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to /// verify that it correctly matches or filters out messages based on their headers. /// /// Check that a message matching all fields of a headers exchange is passed by the exchange. /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange. /// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange. /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange. /// /// /// Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented /// in a single test class to make running this as part of an automated test suite possible. /// /// Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the /// callback does demonstrate how to do so. [TestFixture, Category("Integration")] public class HeadersExchangeTest : BaseMessagingTestFixture { private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest)); /// Holds the default test timeout for broker communications before tests give up. private static readonly int TIMEOUT = 2000; /// Holds the name of the headers exchange to create to send test messages on. private string _exchangeName = "ServiceQ1"; /// Used to preserve the most recent exception in case test cases need to examine it. private Exception _lastException = null; /// Used to preserve the most recent message from the test consumer. private IMessage _lastMessage = null; /// The test consumer to get messages from the broker with. private IMessageConsumer _consumer; private IMessagePublisher _publisher; private AutoResetEvent _evt = new AutoResetEvent(false); private MessageReceivedDelegate _msgRecDelegate; private ExceptionListenerDelegate _exceptionDelegate; /// Holds the test connection. protected IConnection _connection; /// Holds the test channel. protected IChannel _channel; [SetUp] public override void Init() { // Ensure that the base init method is called. It establishes a connection with the broker. base.Init(); connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); _connection = new AMQConnection(connectionInfo); _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300); _logger.Info("Starting..."); _logger.Info("Exchange name is '" + _exchangeName + "'..."); // Register this to listen for exceptions on the test connection. _exceptionDelegate = new ExceptionListenerDelegate(OnException); _connection.ExceptionListener += _exceptionDelegate; // Declare a new headers exchange with the name of the test service. _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS); // Create a non-durable, temporary (aka auto-delete), exclusive queue. string queueName = _channel.GenerateUniqueName(); _channel.DeclareQueue(queueName, false, true, true); // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match. _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable()); // Create a test consumer to consume messages from the test exchange. _consumer = _channel.CreateConsumerBuilder(queueName) .WithPrefetchLow(100) .WithPrefetchHigh(500) .WithNoLocal(false) // make sure we get our own messages .Create(); // Register this to listen for messages on the consumer. _msgRecDelegate = new MessageReceivedDelegate(OnMessage); _consumer.OnMessage += _msgRecDelegate; // Clear the most recent message and exception. _lastException = null; _lastMessage = null; _publisher = _channel.CreatePublisherBuilder() .WithExchangeName(_exchangeName) .WithMandatory(true) .Create(); _publisher.DeliveryMode = DeliveryMode.NonPersistent; // Start all channel _connection.Start(); } /// /// Deregisters the on message delegate before closing the connection. /// [TearDown] public override void Shutdown() { _logger.Info("public void Shutdown(): called"); //_consumer.OnMessage -= _msgRecDelegate; //_connection.ExceptionListener -= _exceptionDelegate; _connection.Stop(); _connection.Close(); _connection.Dispose(); base.Shutdown(); } /// /// Callback method that is passed any messages received on the test channel. /// /// /// The received message. public void OnMessage(IMessage message) { _logger.Debug(string.Format("message.Type = {0}", message.GetType())); _logger.Debug("Got message '" + message + "'"); // Preserve the most recent exception so that test cases can examine it. _lastMessage = message; // Notify any waiting threads that a message has been received. _evt.Set(); } /// Callback method to handle any exceptions raised by the test connection. /// /// The connection exception. public void OnException(Exception e) { // Preserve the most recent exception in case test cases need to examine it. _lastException = e; // Notify any waiting threads that an exception event has occurred. _evt.Set(); } /// Check that a message matching all fields of a headers exchange is passed by the exchange. [Test] public void TestMatchAll() { IMessage msg = _channel.CreateTextMessage("matches match2=''"); msg.Headers["match1"] = "foo"; msg.Headers["match2"] = ""; // Use the SendTestMessage helper method to verify that the message was sent and received. SendTestMessage(msg, true); } /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange. [Test] public void TestMatchEmptyMatchesAnything() { // Send a test message that matches the headers exchange. IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'"); msg.Headers["match1"] = "foo"; msg.Headers["match2"] = "bar"; // Use the SendTestMessage helper method to verify that the message was sent and received. SendTestMessage(msg, true); } /// Check that a message matching only some fields of a headers exchange is not passed by the exchange. [Test] public void TestMatchOneFails() { IMessage msg = _channel.CreateTextMessage("not match - only match1"); msg.Headers["match1"] = "foo"; // Use the SendTestMessage helper method to verify that the message was sent and not received. SendTestMessage(msg, false); } /// /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by /// the exchange. /// [Test] public void TestMatchExtraFields() { IMessage msg = _channel.CreateTextMessage("matches - extra headers"); msg.Headers["match1"] = "foo"; msg.Headers["match2"] = "bar"; msg.Headers["match3"] = "not required"; // Use the SendTestMessage helper method to verify that the message was sent and received. SendTestMessage(msg, true); } /// /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not /// depending on whether or not the message should be received by the consumer. /// /// Any exceptions raised by the connection will cause an Assert failure exception to be raised. /// /// /// The message to send. /// A flag to indicate whether or not the message should be received by the consumer. private void SendTestMessage(IMessage msgSend, bool shouldPass) { _publisher.Send(msgSend); _evt.WaitOne(TIMEOUT, true); // Check that an exception other than not routable was raised in which case re-raise it as a test error. if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException)) { Assert.Fail("Exception {0} was raised by the broker connection.", _lastException); } // Check that a message was returned if the test is expecting the message to pass. else if (shouldPass) { Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange."); } // Check that a not routable exception was raised if the test is expecting the message to fail. else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException) { Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it."); } // The broker did not respond within the test timeout so fail the test. else { Assert.Fail("The test timed out without a response from the broker."); } } /// Returns a field table containing patterns to match the test header exchange against. /// /// A field table containing test patterns. private FieldTable CreatePatternAsFieldTable() { FieldTable matchTable = new FieldTable(); matchTable["match1"] = "foo"; matchTable["match2"] = ""; matchTable["x-match"] = "all"; return matchTable; } } }