/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Net;
using System.Threading;
using log4net;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
using Apache.Qpid.Messaging;
using NUnit.Framework;

namespace Apache.Qpid.Integration.Tests.testcases
{
    /// <summary>
    /// Integration tests for the queue operations of a channel (delete and purge).
    /// Every test runs against a freshly declared queue, bound to the topic exchange,
    /// on a new connection created by <see cref="Init"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the DeleteQueue arguments are presumably
    /// (queueName, ifUnused, ifEmpty, noWait) and the PurgeQueue arguments
    /// (queueName, noWait) - confirm against the IChannel declaration.
    /// </remarks>
    [TestFixture, Category("Integration")]
    public class ChannelQueueTest
    {
        private static readonly ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));

        /// <summary> The default AMQ connection URL to use for tests. </summary>
        const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";

        /// <summary> Routing key used both for the queue binding and by the test publisher. </summary>
        const string _routingKey = "ServiceQ1";

        /// <summary> Delegate registered on the test connection; forwards to <see cref="OnException"/>. </summary>
        private ExceptionListenerDelegate _exceptionDelegate;

        /// <summary> Signalled by <see cref="OnException"/> whenever the connection reports an exception. </summary>
        private AutoResetEvent _evt = new AutoResetEvent(false);

        /// <summary>
        /// Most recent exception reported by the connection. Written from the listener callback and
        /// read by the test assertions, hence volatile.
        /// </summary>
        private volatile Exception _lastException = null;

        private IMessageConsumer _consumer;
        private IMessagePublisher _publisher;
        private IChannel _channel;
        private IConnection _connection;
        private string _queueName;

        /// <summary>
        /// Opens a connection and a channel, declares a uniquely named queue, binds it to the topic
        /// exchange with the test routing key and clears the recorded exception state.
        /// </summary>
        /// <remarks>
        /// If any step fails the connection is disposed before the exception is rethrown, because
        /// NUnit does not invoke the tear-down method when set-up throws.
        /// </remarks>
        [SetUp]
        public virtual void Init()
        {
            _logger.Info("public virtual void Init(): called");

            try
            {
                // Create a connection to the broker.
                IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
                _connection = new AMQConnection(connectionInfo);
                _logger.Info("Starting...");

                // Register this to listen for exceptions on the test connection.
                _exceptionDelegate = new ExceptionListenerDelegate(OnException);
                _connection.ExceptionListener += _exceptionDelegate;

                // Establish a session on the broker.
                _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);

                // Create a durable, non-temporary, non-exclusive queue.
                // NOTE(review): only the Delete* tests remove this queue; the Purge* tests appear to
                // leave it on the broker - confirm whether additional clean-up is wanted.
                _queueName = _channel.GenerateUniqueName();
                _channel.DeclareQueue(_queueName, true, false, false);
                _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
            }
            catch
            {
                // Release whatever was opened so that a failed set-up does not leak a connection.
                try
                {
                    if (_connection != null)
                    {
                        _connection.Dispose();
                    }
                }
                catch (Exception disposeError)
                {
                    // Log only: the original set-up failure is the one that must reach the test runner.
                    _logger.Warn("Failed to dispose the connection after a set-up failure.", disposeError);
                }
                finally
                {
                    ClearPerTestReferences();
                }

                throw;
            }

            // Clear the most recent exception and any stale signal left over from a previous test.
            _lastException = null;
            _evt.Reset();
        }

        /// <summary>
        /// Disposes the test connection, if one was opened, and drops all references to the
        /// per-test broker objects so that the next test cannot touch disposed instances.
        /// </summary>
        [TearDown]
        public virtual void ShutDown()
        {
            _logger.Info("public virtual void Shutdown(): called");

            try
            {
                if (_connection != null)
                {
                    _logger.Info("Disposing connection.");
                    _connection.Dispose();
                    _logger.Info("Connection disposed.");
                }
            }
            finally
            {
                // The fixture instance is reused between tests; never keep disposed objects around.
                ClearPerTestReferences();
            }
        }

        /// <summary>
        /// Deletes the queue while a consumer is attached to it and checks that no connection
        /// exception has been recorded.
        /// </summary>
        [Test]
        public void DeleteUsedQueue()
        {
            // Create the consumer
            _consumer = _channel.CreateConsumerBuilder(_queueName)
                .WithPrefetchLow(100)
                .Create();
            _logger.Info("Consumer was created...");

            // delete the queue
            _channel.DeleteQueue(_queueName, false, true, true);
            _logger.InfoFormat("Queue {0} was delete", _queueName);

            Assert.IsNull(_lastException);
        }

        /// <summary>
        /// Deletes the queue while nothing is consuming from it and checks that no connection
        /// exception has been recorded.
        /// </summary>
        [Test]
        public void DeleteUnusedQueue()
        {
            // delete the queue
            _channel.DeleteQueue(_queueName, true, true, true);
            _logger.InfoFormat("Queue {0} was delete", _queueName);

            Assert.IsNull(_lastException);
        }

        /// <summary>
        /// Publishes one message and then deletes the queue; the test fails if the delete call
        /// throws an <c>AMQException</c>.
        /// </summary>
        [Test]
        public void DeleteNonEmptyQueue()
        {
            // Create the publisher
            _publisher = _channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                .WithRoutingKey(_routingKey)
                .Create();
            _logger.Info("Publisher created...");

            SendTestMessage("DeleteNonEmptyQueue Message 1");

            try
            {
                _channel.DeleteQueue(_queueName, true, false, true);
            }
            catch (AMQException e)
            {
                // Include the exception so the failure report shows why the delete was rejected.
                Assert.Fail("The test fails: " + e);
            }
        }

        /// <summary>
        /// Creates a publisher without sending anything, deletes the still empty queue and checks
        /// that no connection exception has been recorded.
        /// </summary>
        [Test]
        public void DeleteEmptyQueue()
        {
            // Create the publisher
            _publisher = _channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                .WithRoutingKey(_routingKey)
                .Create();
            _logger.Info("Publisher created...");

            // delete an empty queue with ifEmpty = true
            _channel.DeleteQueue(_queueName, false, true, true);

            Assert.IsNull(_lastException);
        }

        /// <summary>
        /// Publishes two messages and then deletes the queue, requesting a response from the server.
        /// </summary>
        [Test]
        public void DeleteQueueWithResponse()
        {
            // Create the publisher
            _publisher = _channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                .WithRoutingKey(_routingKey)
                .Create();
            _logger.Info("Publisher created...");

            SendTestMessage("DeleteQueueWithResponse Message 1");
            SendTestMessage("DeleteQueueWithResponse Message 2");

            // delete the queue, the server must respond
            _channel.DeleteQueue(_queueName, false, false, false);
        }

        /// <summary>
        /// Publishes two messages and then purges the queue with the last argument set to false
        /// (presumably noWait = false, i.e. a server response is awaited - confirm).
        /// </summary>
        [Test]
        public void PurgeQueueWithResponse()
        {
            _publisher = _channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                .WithRoutingKey(_routingKey)
                .Create();
            _logger.Info("Pubisher created");

            SendTestMessage("Message 1");
            SendTestMessage("Message 2");

            _channel.PurgeQueue(_queueName, false);
        }

        /// <summary>
        /// Publishes two messages and then purges the queue with the last argument set to true
        /// (presumably noWait = true, i.e. no server response is awaited - confirm).
        /// </summary>
        [Test]
        public void PurgeQueueWithOutResponse()
        {
            _publisher = _channel.CreatePublisherBuilder()
                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                .WithRoutingKey(_routingKey)
                .Create();
            _logger.Info("Pubisher created");

            SendTestMessage("Message 1");
            SendTestMessage("Message 2");

            _channel.PurgeQueue(_queueName, true);
        }

        /// <summary>
        /// Callback method to handle any exceptions raised by the test connection.
        /// </summary>
        /// <param name="e">The connection exception.</param>
        public void OnException(Exception e)
        {
            // Preserve the most recent exception in case test cases need to examine it.
            _lastException = e;

            // Notify any waiting threads that an exception event has occurred.
            _evt.Set();
        }

        /// <summary>
        /// Wraps the given text in a text message and sends it through the test publisher.
        /// The method only sends; it does not wait for, or verify, delivery to any consumer.
        /// </summary>
        /// <param name="msg">The message to send.</param>
        private void SendTestMessage(string msg)
        {
            // create the IMessage object
            IMessage msgSend = _channel.CreateTextMessage(msg);

            // send the message
            _publisher.Send(msgSend);
            _logger.InfoFormat("The messages \"{0}\" was sent", msg);
        }

        /// <summary>
        /// Drops every reference to broker objects created for a single test.
        /// </summary>
        private void ClearPerTestReferences()
        {
            _consumer = null;
            _publisher = null;
            _channel = null;
            _connection = null;
            _exceptionDelegate = null;
            _queueName = null;
        }
    }
}