/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
namespace Apache.Qpid.Integration.Tests.testcases
{
/// <summary>
/// DurableSubscriptionTest checks that durable subscriptions work, by sending messages that can be picked up by
/// a subscription that is currently off-line, and checking that the subscriber gets all of its messages when it
/// does come on-line.
/// </summary>
/// <remarks>
/// CRC Card
/// Responsibilities | Collaborations
/// -----------------|---------------
///                  |
/// </remarks>
[TestFixture, Category("Integration")]
public class DurableSubscriptionTest : BaseMessagingTestFixture
{
/// <summary>Used for debugging purposes.</summary>
/// <remarks>Declared readonly: the logger is assigned once here and never reassigned within this class.</remarks>
private static readonly ILog log = LogManager.GetLogger(typeof(DurableSubscriptionTest));
/// <summary>
/// Defines the base routing key of the test topic. Every test in this fixture appends <c>testId</c> to this
/// value when setting up its end-points.
/// </summary>
public const string TEST_ROUTING_KEY = "durablesubtestkey";
/// <summary>
/// NUnit set-up hook, run before each test. Adds no set-up of its own; it only delegates to the base
/// fixture's <c>Init</c>.
/// </summary>
[SetUp]
public override void Init()
{
base.Init();
}
/// <summary>
/// NUnit tear-down hook, run after each test. Adds no clean-up of its own; it only delegates to the base
/// fixture's <c>Shutdown</c>.
/// </summary>
[TearDown]
public override void Shutdown()
{
base.Shutdown();
}
/// <summary>
/// Runs the durable subscription scenario in <c>TestDurableSubscription</c> with
/// <c>AcknowledgeMode.NoAcknowledge</c>.
/// </summary>
[Test]
public void TestDurableSubscriptionNoAck()
{
TestDurableSubscription(AcknowledgeMode.NoAcknowledge);
}
/// <summary>
/// Runs the durable subscription scenario in <c>TestDurableSubscription</c> with
/// <c>AcknowledgeMode.AutoAcknowledge</c>.
/// </summary>
[Test]
public void TestDurableSubscriptionAutoAck()
{
TestDurableSubscription(AcknowledgeMode.AutoAcknowledge);
}
/// <summary>
/// Shared scenario behind the durable subscription tests: a message sent while the named subscription's
/// consumer is detached must be delivered to that consumer once it re-attaches.
/// </summary>
/// <param name="ackMode">The acknowledge mode handed to every end-point set up by this scenario.</param>
private void TestDurableSubscription(AcknowledgeMode ackMode)
{
    string routingKey = TEST_ROUTING_KEY + testId;
    string subscriptionName = "TestSubscription" + testId;

    // One producer (end-point 0) and two consumers (end-points 1 and 2) on the topic exchange.
    // Only end-point 2 is set up with a subscription name.
    SetUpEndPoint(0, true, false, routingKey, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
    SetUpEndPoint(1, false, true, routingKey, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
    SetUpEndPoint(2, false, true, routingKey, ackMode, false, ExchangeNameDefaults.TOPIC, true, true, subscriptionName);

    // NOTE(review): presumably gives the freshly created consumers time to become ready - confirm.
    Thread.Sleep(500);

    // With both consumers attached, message "A" must arrive on each of them.
    testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
    ConsumeNMessagesOnly(1, "A", testConsumer[1]);
    ConsumeNMessagesOnly(1, "A", testConsumer[2]);

    // Detach end-point 2, then send "B": only the remaining consumer is there to receive it.
    CloseEndPoint(2);
    testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
    ConsumeNMessagesOnly(1, "B", testConsumer[1]);

    // Re-attach end-point 2 under the same subscription name; it must now get the "B" it missed.
    SetUpEndPoint(2, false, true, routingKey, ackMode, false, ExchangeNameDefaults.TOPIC, true, true, subscriptionName);
    ConsumeNMessagesOnly(1, "B", testConsumer[2]);

    // Close every end-point that is still open before the test ends.
    CloseEndPoint(2);
    CloseEndPoint(1);
    CloseEndPoint(0);
}
/// <summary>
/// Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription.
/// </summary>
[Test]
public void TestUncommittedReceiveCanBeRereceivedNewConnection()
{
    string routingKey = TEST_ROUTING_KEY + testId;
    string subscriptionName = "foo" + testId;

    // End-point 0 produces and end-point 1 consumes under a named subscription. Both channels are later
    // committed explicitly, so the sixth argument is presumably the "transacted" flag - confirm.
    SetUpEndPoint(0, true, false, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, false, null);
    SetUpEndPoint(1, false, true, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, true, subscriptionName);

    // Publish "C" and commit the send.
    testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
    testChannel[0].Commit();

    // Receive "C" but deliberately leave the receive uncommitted.
    ConsumeNMessagesOnly(1, "C", testConsumer[1]);

    // Drop end-point 1 with the receive still uncommitted, then re-open the same subscription.
    CloseEndPoint(1);
    SetUpEndPoint(1, false, true, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, true, subscriptionName);

    // The uncommitted message must be delivered again to the re-opened end-point; this time commit it.
    ConsumeNMessagesOnly(1, "C", testConsumer[1]);
    testChannel[1].Commit();

    CloseEndPoint(1);
    CloseEndPoint(0);
}
/// <summary>
/// Check that a rolled back receive can be re-received, on re-consume from the same durable subscription.
/// </summary>
[Test]
public void TestRolledBackReceiveCanBeRereceivedNewConnection()
{
    string routingKey = TEST_ROUTING_KEY + testId;
    string subscriptionName = "foo" + testId;

    // End-point 0 produces and end-point 1 consumes under a named subscription. Both channels are later
    // committed or rolled back explicitly, so the sixth argument is presumably the "transacted" flag - confirm.
    SetUpEndPoint(0, true, false, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, false, null);
    SetUpEndPoint(1, false, true, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, true, subscriptionName);

    // Publish "D" and commit the send.
    testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
    testChannel[0].Commit();

    // Receive "D", then explicitly roll the receive back.
    ConsumeNMessagesOnly(1, "D", testConsumer[1]);
    testChannel[1].Rollback();

    // Drop end-point 1 after the roll-back, then re-open the same subscription.
    CloseEndPoint(1);
    SetUpEndPoint(1, false, true, routingKey, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
        true, true, subscriptionName);

    // The rolled back message must be delivered again to the re-opened end-point; this time commit it.
    ConsumeNMessagesOnly(1, "D", testConsumer[1]);
    testChannel[1].Commit();

    CloseEndPoint(1);
    CloseEndPoint(0);
}
}
}