/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
namespace Apache.Qpid.Integration.Tests.testcases
{
/// <summary>
/// Sets up a producer/consumer pair to send test messages through a headers exchange. The headers exchange matching
/// pattern is tested to verify that it correctly matches or filters out messages based on their headers.
/// </summary>
/// <remarks>
/// Responsibilities:
/// <list type="bullet">
/// <item><description>Check that a message matching all fields of a headers exchange is passed by the exchange.</description></item>
/// <item><description>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</description></item>
/// <item><description>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</description></item>
/// <item><description>Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.</description></item>
/// </list>
///
/// TODO: Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are
/// implemented in a single test class to make running this as part of an automated test suite possible.
///
/// TODO: Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but
/// using the callback does demonstrate how to do so.
/// </remarks>
[TestFixture, Category("Integration")]
public class HeadersExchangeTest : BaseMessagingTestFixture
{
    private static readonly ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));

    /// <summary>Holds the default test timeout, in milliseconds, for broker communications before tests give up.</summary>
    private static readonly int TIMEOUT = 2000;

    /// <summary>Holds the name of the headers exchange to create to send test messages on.</summary>
    private string _exchangeName = "ServiceQ1";

    /// <summary>Used to preserve the most recent exception in case test cases need to examine it.</summary>
    private Exception _lastException = null;

    /// <summary>Used to preserve the most recent message from the test consumer.</summary>
    private IMessage _lastMessage = null;

    /// <summary>The test consumer to get messages from the broker with.</summary>
    private IMessageConsumer _consumer;

    /// <summary>The test publisher used to send messages to the headers exchange.</summary>
    private IMessagePublisher _publisher;

    /// <summary>Signalled by <see cref="OnMessage"/> and <see cref="OnException"/> to wake a test waiting for a broker response.</summary>
    private readonly AutoResetEvent _evt = new AutoResetEvent(false);

    /// <summary>The delegate registered on the consumer to receive messages.</summary>
    private MessageReceivedDelegate _msgRecDelegate;

    /// <summary>The delegate registered on the connection to receive exceptions.</summary>
    private ExceptionListenerDelegate _exceptionDelegate;

    /// <summary>Holds the test connection.</summary>
    protected IConnection _connection;

    /// <summary>Holds the test channel.</summary>
    protected IChannel _channel;

    /// <summary>
    /// Creates the test connection and channel, declares the headers exchange and a temporary queue bound to it with the
    /// test match pattern, wires up the consumer and publisher, and starts the connection.
    /// </summary>
    [SetUp]
    public override void Init()
    {
        // Ensure that the base init method is called. It establishes a connection with the broker.
        base.Init();

        connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
        _connection = new AMQConnection(connectionInfo);
        _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300);

        _logger.Info("Starting...");
        _logger.Info("Exchange name is '" + _exchangeName + "'...");

        // Register this to listen for exceptions on the test connection.
        _exceptionDelegate = new ExceptionListenerDelegate(OnException);
        _connection.ExceptionListener += _exceptionDelegate;

        // Declare a new headers exchange with the name of the test service.
        _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);

        // Create a non-durable, temporary (aka auto-delete), exclusive queue.
        string queueName = _channel.GenerateUniqueName();
        _channel.DeclareQueue(queueName, false, true, true);

        // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
        _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());

        // Create a test consumer to consume messages from the test exchange.
        _consumer = _channel.CreateConsumerBuilder(queueName)
            .WithPrefetchLow(100)
            .WithPrefetchHigh(500)
            .WithNoLocal(false) // make sure we get our own messages
            .Create();

        // Register this to listen for messages on the consumer.
        _msgRecDelegate = new MessageReceivedDelegate(OnMessage);
        _consumer.OnMessage += _msgRecDelegate;

        // Clear the most recent message and exception, and drop any signal left over from a previous test.
        // The fields (including the event) outlive a single test, so a stale signal would otherwise make
        // the next wait return before the broker has responded.
        _lastException = null;
        _lastMessage = null;
        _evt.Reset();

        // Publish with the mandatory flag set; SendTestMessage relies on unroutable messages being
        // reported back through the connection's exception listener.
        _publisher = _channel.CreatePublisherBuilder()
            .WithExchangeName(_exchangeName)
            .WithMandatory(true)
            .Create();
        _publisher.DeliveryMode = DeliveryMode.NonPersistent;

        // Start all channel
        _connection.Start();
    }

    /// <summary>
    /// Stops, closes and disposes of the test connection, then runs the base fixture shutdown.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the message and exception delegates are not deregistered here (that code was previously
    /// commented out); confirm whether explicit deregistration is required before the connection is closed.
    /// </remarks>
    [TearDown]
    public override void Shutdown()
    {
        _logger.Info("public void Shutdown(): called");

        try
        {
            // The connection may never have been created if Init failed part way through.
            if (_connection != null)
            {
                try
                {
                    _connection.Stop();
                    _connection.Close();
                }
                finally
                {
                    // Always release the connection, and forget it so that a later teardown cannot touch a disposed instance.
                    _connection.Dispose();
                    _connection = null;
                }
            }
        }
        finally
        {
            // The base fixture must be shut down even when closing the test connection fails.
            base.Shutdown();
        }
    }

    /// <summary>
    /// Callback method that is passed any messages received on the test channel.
    /// </summary>
    /// <param name="message">The received message.</param>
    public void OnMessage(IMessage message)
    {
        _logger.Debug(string.Format("message.Type = {0}", message.GetType()));
        _logger.Debug("Got message '" + message + "'");

        // Preserve the most recent message so that test cases can examine it.
        _lastMessage = message;

        // Notify any waiting threads that a message has been received.
        _evt.Set();
    }

    /// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
    /// <param name="e">The connection exception.</param>
    public void OnException(Exception e)
    {
        // Preserve the most recent exception in case test cases need to examine it.
        _lastException = e;

        // Notify any waiting threads that an exception event has occurred.
        _evt.Set();
    }

    /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
    [Test]
    public void TestMatchAll()
    {
        IMessage msg = _channel.CreateTextMessage("matches match2=''");
        msg.Headers["match1"] = "foo";
        msg.Headers["match2"] = "";

        // Use the SendTestMessage helper method to verify that the message was sent and received.
        SendTestMessage(msg, true);
    }

    /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
    [Test]
    public void TestMatchEmptyMatchesAnything()
    {
        // Send a test message that matches the headers exchange.
        IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
        msg.Headers["match1"] = "foo";
        msg.Headers["match2"] = "bar";

        // Use the SendTestMessage helper method to verify that the message was sent and received.
        SendTestMessage(msg, true);
    }

    /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
    [Test]
    public void TestMatchOneFails()
    {
        IMessage msg = _channel.CreateTextMessage("not match - only match1");
        msg.Headers["match1"] = "foo";

        // Use the SendTestMessage helper method to verify that the message was sent and not received.
        SendTestMessage(msg, false);
    }

    /// <summary>
    /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by
    /// the exchange.
    /// </summary>
    [Test]
    public void TestMatchExtraFields()
    {
        IMessage msg = _channel.CreateTextMessage("matches - extra headers");
        msg.Headers["match1"] = "foo";
        msg.Headers["match2"] = "bar";
        msg.Headers["match3"] = "not required";

        // Use the SendTestMessage helper method to verify that the message was sent and received.
        SendTestMessage(msg, true);
    }

    /// <summary>
    /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
    /// depending on whether or not the message should be received by the consumer.
    /// </summary>
    /// <remarks>
    /// Any exceptions raised by the connection, other than a not routable exception, will cause an Assert failure
    /// exception to be raised.
    /// </remarks>
    /// <param name="msgSend">The message to send.</param>
    /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
    private void SendTestMessage(IMessage msgSend, bool shouldPass)
    {
        _publisher.Send(msgSend);

        // Wait for OnMessage or OnException to signal, or for the timeout to elapse. The outcome is judged
        // from the captured message/exception below rather than from the wait result.
        _evt.WaitOne(TIMEOUT, true);

        bool notRoutable = _lastException != null && _lastException.InnerException is AMQUndeliveredException;

        // Check that an exception other than not routable was raised in which case re-raise it as a test error.
        if (_lastException != null && !notRoutable)
        {
            Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
        }
        // Check that a message was returned if the test is expecting the message to pass.
        else if (shouldPass)
        {
            Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
        }
        // Check that a not routable exception was raised if the test is expecting the message to fail.
        else if (notRoutable)
        {
            Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
        }
        // A message arrived although it should have been filtered out; report that rather than a timeout.
        else if (_lastMessage != null)
        {
            Assert.Fail("A message was delivered to the consumer that the headers exchange should have filtered out.");
        }
        // The broker did not respond within the test timeout so fail the test.
        else
        {
            Assert.Fail("The test timed out without a response from the broker.");
        }
    }

    /// <summary>Returns a field table containing patterns to match the test header exchange against.</summary>
    /// <returns>A field table containing test patterns.</returns>
    private FieldTable CreatePatternAsFieldTable()
    {
        FieldTable matchTable = new FieldTable();

        // match1 must equal "foo"; match2 is left empty, which the tests expect to match any value.
        matchTable["match1"] = "foo";
        matchTable["match2"] = "";

        // Require every listed header to match, not just one of them.
        matchTable["x-match"] = "all";

        return matchTable;
    }
}
}