/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using Apache.NMS.Test; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { public abstract class AMQTransactionTestSupport : NMSTestSupport { private const int MESSAGE_COUNT = 5; private const string MESSAGE_TEXT = "message"; private IConnectionFactory connectionFactory; private IConnection connection; private ISession session; private IMessageConsumer consumer; private IMessageProducer producer; private IDestination destination; private int batchCount = 10; private int batchSize = 20; // for message listener test private LinkedList unackMessages = new LinkedList(); private LinkedList ackMessages = new LinkedList(); private bool resendPhase; [SetUp] public override void SetUp() { base.SetUp(); this.connectionFactory = new ConnectionFactory(); this.resendPhase = false; Reconnect(); } [TearDown] public override void TearDown() { this.session.Close(); this.session = null; this.connection.Close(); this.connection = null; this.unackMessages.Clear(); this.ackMessages.Clear(); base.TearDown(); } protected abstract bool Topic { get; } protected abstract String TestClientId { get; } protected abstract String Subscription { get; } protected abstract String DestinationName { get; } public override IConnection CreateConnection() { return this.connectionFactory.CreateConnection(); } protected void BeginTx() { } protected void CommitTx() { session.Commit(); } protected void RollbackTx() { session.Rollback(); } [Test] public void TestSessionCommitedWithoutReceivingMessage() { Assert.IsTrue(session.Transacted); IMessage message = consumer.Receive(new TimeSpan(0, 0, 0, 0, 100)); Assert.IsNull(message); session.Commit(); Assert.Pass("When getting here. It is ok"); } [Test] public void TestSendReceiveTransactedBatches() { ITextMessage message = session.CreateTextMessage("Batch IMessage"); for(int j = 0; j < batchCount; j++) { BeginTx(); for(int i = 0; i < batchSize; i++) { producer.Send(message); } CommitTx(); BeginTx(); for(int i = 0; i < batchSize; i++) { message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message, "Received only " + i + " messages in batch " + j); Assert.AreEqual("Batch IMessage", message.Text); } CommitTx(); } } [Test] public void TestSendRollback() { IMessage[] outbound = new IMessage[] {session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // sends a message BeginTx(); producer.Send(outbound[0]); CommitTx(); // sends a message that gets rollbacked BeginTx(); producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); RollbackTx(); // sends a message BeginTx(); producer.Send(outbound[1]); CommitTx(); // receives the first message BeginTx(); LinkedList messages = new LinkedList(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); messages.AddLast(message); // receives the second message message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); messages.AddLast(message); // validates that the rollbacked was not consumed CommitTx(); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestSendSessionClose() { IMessage[] outbound = new IMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // sends a message BeginTx(); producer.Send(outbound[0]); CommitTx(); // sends a message that gets rollbacked BeginTx(); producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); consumer.Close(); ReconnectSession(); // sends a message producer.Send(outbound[1]); CommitTx(); // receives the first message LinkedList messages = new LinkedList(); BeginTx(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); messages.AddLast(message); // receives the second message message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message); messages.AddLast(message); // validates that the rollbacked was not consumed CommitTx(); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestSendSessionAndConnectionClose() { IMessage[] outbound = new IMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // sends a message BeginTx(); producer.Send(outbound[0]); CommitTx(); // sends a message that gets rollbacked BeginTx(); producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); consumer.Close(); session.Close(); Reconnect(); // sends a message BeginTx(); producer.Send(outbound[1]); CommitTx(); // receives the first message LinkedList messages = new LinkedList(); BeginTx(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); messages.AddLast(message); // receives the second message message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message); messages.AddLast(message); // validates that the rollbacked was not consumed CommitTx(); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestReceiveRollback() { IMessage[] outbound = new IMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // lets consume any outstanding messages from prev test runs BeginTx(); bool needCommit = false; while(consumer.ReceiveNoWait() != null) { needCommit = true; } if(needCommit) { CommitTx(); } // sent both messages BeginTx(); producer.Send(outbound[0]); producer.Send(outbound[1]); CommitTx(); LinkedList messages = new LinkedList(); BeginTx(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); messages.AddLast(message); AssertEquals(outbound[0], message); CommitTx(); // Rollback so we can get that last message again. BeginTx(); message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); AssertEquals(outbound[1], message); RollbackTx(); // Consume again.. the prev message should // get redelivered. BeginTx(); message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message, "Should have re-received the message again!"); messages.AddLast(message); CommitTx(); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestReceiveTwoThenRollback() { IMessage[] outbound = new IMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // lets consume any outstanding messages from prev test runs BeginTx(); bool needCommit = false; while(consumer.ReceiveNoWait() != null) { needCommit = true; } if(needCommit) { CommitTx(); } BeginTx(); producer.Send(outbound[0]); producer.Send(outbound[1]); CommitTx(); LinkedList messages = new LinkedList(); BeginTx(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); AssertEquals(outbound[0], message); message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); AssertEquals(outbound[1], message); RollbackTx(); // Consume again.. the prev message should // get redelivered. BeginTx(); message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message, "Should have re-received the first message again!"); messages.AddLast(message); AssertEquals(outbound[0], message); message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsNotNull(message, "Should have re-received the first message again!"); messages.AddLast(message); AssertEquals(outbound[1], message); Assert.IsNull(consumer.ReceiveNoWait()); CommitTx(); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestSendReceiveWithPrefetchOne() { SetPrefetchToOne(); ReconnectSession(); IMessage[] outbound = new IMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage"), session.CreateTextMessage("Third IMessage"), session.CreateTextMessage("Fourth IMessage")}; BeginTx(); for(int i = 0; i < outbound.Length; i++) { // sends a message producer.Send(outbound[i]); } CommitTx(); // receives the first message BeginTx(); for(int i = 0; i < outbound.Length; i++) { IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); } // validates that the rollbacked was not consumed CommitTx(); } [Test] public void TestReceiveTwoThenRollbackManyTimes() { for(int i = 0; i < 5; i++) { TestReceiveTwoThenRollback(); } } [Test] public void TestSendRollbackWithPrefetchOfOne() { SetPrefetchToOne(); TestSendRollback(); } [Test] public void TestReceiveRollbackWithPrefetchOfOne() { SetPrefetchToOne(); TestReceiveRollback(); } [Test] public void TestCloseConsumerBeforeCommit() { ITextMessage[] outbound = new ITextMessage[] { session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; // lets consume any outstanding messages from prev test runs BeginTx(); bool needCommit = false; while(consumer.ReceiveNoWait() != null) { needCommit = true; } if(needCommit) { CommitTx(); } // sends the messages BeginTx(); producer.Send(outbound[0]); producer.Send(outbound[1]); CommitTx(); BeginTx(); ITextMessage message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.AreEqual(outbound[0].Text, message.Text); // Close the consumer before the Commit. This should not cause the // received message to Rollback. consumer.Close(); CommitTx(); // Create a new consumer consumer = CreateMessageConsumer(); BeginTx(); message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message); Assert.AreEqual(outbound[1].Text, message.Text); CommitTx(); } protected void Reconnect() { if(this.connection != null) { // Close the prev connection. this.connection.Close(); this.connection = null; } this.session = null; this.connection = CreateConnection(this.TestClientId); ReconnectSession(); this.connection.Start(); } protected void ReconnectSession() { if(this.session != null) { this.session.Close(); } this.session = this.connection.CreateSession(AcknowledgementMode.Transactional); if( this.Topic == true ) { this.destination = this.session.GetTopic(this.DestinationName); } else { this.destination = this.session.GetQueue(this.DestinationName); } this.producer = this.session.CreateProducer(destination); this.consumer = CreateMessageConsumer(); } protected IMessageConsumer CreateMessageConsumer() { if(this.Subscription != null) { return this.session.CreateDurableConsumer((ITopic) destination, Subscription, null, false); } else { return this.session.CreateConsumer(destination); } } protected void SetPrefetchToOne() { GetPrefetchPolicy().SetAll(1); } protected PrefetchPolicy GetPrefetchPolicy() { return ((Connection) connection).PrefetchPolicy; } [Test] public void TestTransactionEventsFired() { IMessage[] outbound = new IMessage[] {session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")}; session.TransactionStartedListener += TransactionStarted; session.TransactionCommittedListener += TransactionCommitted; session.TransactionRolledBackListener += TransactionRolledBack; // sends a message BeginTx(); producer.Send(outbound[0]); Assert.IsTrue(this.transactionStarted); CommitTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionCommitted); // sends a message that gets rollbacked BeginTx(); producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); Assert.IsTrue(this.transactionStarted); RollbackTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionRolledBack); // sends a message BeginTx(); producer.Send(outbound[1]); Assert.IsTrue(this.transactionStarted); CommitTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionCommitted); // receives the first message BeginTx(); LinkedList messages = new LinkedList(); IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); messages.AddLast(message); // receives the second message message = consumer.Receive(TimeSpan.FromMilliseconds(5000)); Assert.IsTrue(this.transactionStarted); messages.AddLast(message); // validates that the rollbacked was not consumed CommitTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionCommitted); IMessage[] inbound = new IMessage[messages.Count]; messages.CopyTo(inbound, 0); AssertTextMessagesEqual(outbound, inbound, "Rollback did not work."); } [Test] public void TestMessageListenerGeneratesTxEvents() { messageReceived = false; session.TransactionStartedListener += TransactionStarted; session.TransactionCommittedListener += TransactionCommitted; session.TransactionRolledBackListener += TransactionRolledBack; // Send messages for(int i = 0; i < MESSAGE_COUNT; i++) { producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i)); } Assert.IsTrue(this.transactionStarted); CommitTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionCommitted); consumer.Listener += new MessageListener(OnAsyncTxMessage); // wait receive WaitForMessageToBeReceived(); Assert.IsTrue(this.transactionStarted); CommitTx(); Assert.IsFalse(this.transactionStarted); Assert.IsTrue(this.transactionCommitted); } private bool transactionStarted = false; private bool transactionCommitted = false; private bool transactionRolledBack = false; private bool messageReceived = false; public void OnAsyncTxMessage(IMessage message) { messageReceived = true; } private void WaitForMessageToBeReceived() { for(int i = 0; i < 100 && !messageReceived; i++) { Thread.Sleep(100); } Assert.IsTrue(messageReceived); } private void TransactionStarted(ISession session) { transactionStarted = true; transactionCommitted = false; transactionRolledBack = false; } private void TransactionCommitted(ISession session) { transactionStarted = false; transactionCommitted = true; transactionRolledBack = false; } private void TransactionRolledBack(ISession session) { transactionStarted = false; transactionCommitted = false; transactionRolledBack = true; } [Test] public void TestMessageListener() { // Send messages for(int i = 0; i < MESSAGE_COUNT; i++) { producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i)); } CommitTx(); consumer.Listener += new MessageListener(OnMessage); // wait receive WaitReceiveUnack(); Assert.AreEqual(unackMessages.Count, MESSAGE_COUNT); // resend phase WaitReceiveAck(); Assert.AreEqual(ackMessages.Count, MESSAGE_COUNT); // should no longer re-receive consumer.Listener -= new MessageListener(OnMessage); Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(500))); Reconnect(); } public void OnMessage(IMessage message) { if(!resendPhase) { unackMessages.AddLast(message); if(unackMessages.Count == MESSAGE_COUNT) { try { RollbackTx(); resendPhase = true; } catch { } } } else { ackMessages.AddLast(message); if(ackMessages.Count == MESSAGE_COUNT) { try { CommitTx(); } catch { } } } } private void WaitReceiveUnack() { for(int i = 0; i < 100 && !resendPhase; i++) { Thread.Sleep(100); } Assert.IsTrue(resendPhase); } private void WaitReceiveAck() { for(int i = 0; i < 100 && ackMessages.Count < MESSAGE_COUNT; i++) { Thread.Sleep(100); } Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT); } } }