/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Threading; using log4net; using NUnit.Framework; using Apache.Qpid.Messaging; using Apache.Qpid.Client.Qms; using Apache.Qpid.Client; namespace Apache.Qpid.Integration.Tests.testcases { /// /// CommitRollbackTest /// ///

///
CRC Card
Responsibilities Collaborations ///
Check that an uncommitted send cannot be received. ///
Check that a committed send can be received. ///
Check that a rolled back send cannot be received. ///
Check that an uncommitted receive can be re-received. ///
Check that a committed receive cannot be re-received. ///
Check that a rolled back receive can be re-received. ///
///

[TestFixture, Category("Integration")] public class CommitRollbackTest : BaseMessagingTestFixture { /// Used for debugging purposes. private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); /// Defines the name of the test topic to use with the tests. public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; /// Used to count test messages received so far. private int messageReceivedCount; /// Used to hold the expected number of messages to receive. private int expectedMessageCount; /// Monitor used to signal succesfull receipt of all test messages. AutoResetEvent finishedEvent; /// Flag used to indicate that all messages really were received, and that the test did not just time out. private bool allReceived; [SetUp] public override void Init() { base.Init(); // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); // Clear counts messageReceivedCount = 0; expectedMessageCount = 0; finishedEvent = new AutoResetEvent(false); allReceived = false; } [TearDown] public override void Shutdown() { try { // Clean up after the test. CloseEndPoint(0); CloseEndPoint(1); } finally { base.Shutdown(); } } /// Check that an uncommitted send cannot be received. [Test] public void TestUncommittedSendNotReceived() { // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("A")); // Try to receive messages. ConsumeNMessagesOnly(0, "A", testConsumer[1]); testChannel[1].Commit(); } /// Check that a committed send can be received. [Test] public void TestCommittedSendReceived() { // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("B")); testChannel[0].Commit(); // Try to receive messages. ConsumeNMessagesOnly(1, "B", testConsumer[1]); testChannel[1].Commit(); } /// Check that a rolled back send cannot be received. [Test] public void TestRolledBackSendNotReceived() { // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("B")); testChannel[0].Rollback(); // Try to receive messages. ConsumeNMessagesOnly(0, "B", testConsumer[1]); testChannel[1].Commit(); } /// Check that an uncommitted receive can be re-received. [Test] public void TestUncommittedReceiveCanBeRereceived() { // Create a third end-point as an alternative delivery route for the message. SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("C")); testChannel[0].Commit(); // Try to receive messages. ConsumeNMessagesOnly(1, "C", testConsumer[1]); // Close end-point 1 without committing the message, then re-open to consume again. CloseEndPoint(1); // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. ConsumeNMessagesOnly(1, "C", testConsumer[2]); CloseEndPoint(2); } /// Check that a committed receive cannot be re-received. [Test] public void TestCommittedReceiveNotRereceived() { // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("D")); testChannel[0].Commit(); // Try to receive messages. ConsumeNMessagesOnly(1, "D", testConsumer[1]); testChannel[1].Commit(); // Try to receive messages. ConsumeNMessagesOnly(0, "D", testConsumer[1]); } /// Check that a rolled back receive can be re-received. [Test] public void TestRolledBackReceiveCanBeRereceived() { // Send messages. testProducer[0].Send(testChannel[0].CreateTextMessage("E")); testChannel[0].Commit(); // Try to receive messages. ConsumeNMessagesOnly(1, "E", testConsumer[1]); testChannel[1].Rollback(); // Try to receive messages. ConsumeNMessagesOnly(1, "E", testConsumer[1]); } [Test] public void TestReceiveAndSendRollback() { // Send messages testProducer[0].Send(testChannel[0].CreateTextMessage("F")); testChannel[0].Commit(); // Try to receive messages. ConsumeNMessagesOnly(1, "F", testConsumer[1]); testProducer[1].Send(testChannel[1].CreateTextMessage("G")); testChannel[1].Rollback(); // Try to receive messages. ConsumeNMessagesOnly(1, "F", testConsumer[1]); } [Test] public void TestReceivePrePublished() { // Send messages for (int i = 0; i < 10; ++i) { testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); testChannel[0].Commit(); } for (int i = 0; i < 10; ++i) { ConsumeNMessages(1, "G"+i, testConsumer[1]); } testChannel[1].Commit(); } [Test] public void TestReceivePrePublishedOnMessageHandler() { testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); // Send messages for (int i = 0; i < 10; ++i) { testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); testChannel[0].Commit(); } expectedMessageCount = 10; finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); // Check that all messages really were received. Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); testChannel[1].Commit(); } /// Atomically increments the message count on every message, and signals once all messages in the test are received. public void OnMessage(IMessage m) { int newCount = Interlocked.Increment(ref messageReceivedCount); if (newCount >= expectedMessageCount) { allReceived = true; finishedEvent.Set(); } } } }