/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using System.Data.SqlClient; using System.IO; using System.Threading; using System.Transactions; using Apache.NMS.ActiveMQ.Transport; using Apache.NMS.ActiveMQ.Transport.Tcp; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { [TestFixture] [Category("Manual")] class DtcConsumerTransactionsTest : DtcTransactionsTestSupport { [SetUp] public override void SetUp() { base.SetUp(); this.dtcFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI)); this.dtcFactory.ConfiguredResourceManagerId = Guid.NewGuid().ToString(); } [Test] public void TestRedelivered() { // enqueue several messages PurgeDatabase(); PurgeAndFillQueue(); // receive just one using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.Start(); using (INetTxSession session = connection.CreateNetTxSession()) { IQueue queue = session.GetQueue(testQueueName); // read message from queue and insert into db table using (IMessageConsumer consumer = session.CreateConsumer(queue)) { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString)) using (SqlCommand sqlInsertCommand = new SqlCommand()) { sqlConnection.Open(); sqlInsertCommand.Connection = sqlConnection; ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text)); sqlInsertCommand.ExecuteNonQuery(); scoped.Complete(); } } session.Close(); } } // check that others message have status redelivered = false IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI)); using (IConnection connection = checkFactory.CreateConnection()) { connection.Start(); using (ISession session = connection.CreateSession()) using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) { IEnumerator enumerator = browser.GetEnumerator(); while (enumerator.MoveNext()) { IMessage msg = enumerator.Current as IMessage; Assert.IsNotNull(msg, "message is not in the queue!"); Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); } } } } [Test] public void TestRedeliveredCase2() { const int messageCount = 300; const int receiveCount = 150; // enqueue several messages PurgeDatabase(); PurgeAndFillQueue(messageCount); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.Start(); // receive half of total messages for (int i = 0; i < receiveCount; i++) { using (INetTxSession session = connection.CreateNetTxSession()) { IQueue queue = session.GetQueue(testQueueName); // read message from queue and insert into db table using (IMessageConsumer consumer = session.CreateConsumer(queue)) { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString)) using (SqlCommand sqlInsertCommand = new SqlCommand()) { sqlConnection.Open(); sqlInsertCommand.Connection = sqlConnection; ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text)); sqlInsertCommand.ExecuteNonQuery(); scoped.Complete(); } } session.Close(); } Tracer.Debug("Completed for loop iteration #" + i); } } // check that others message have status redelivered = false IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI)); using (IConnection connection = checkFactory.CreateConnection()) { connection.Start(); using (ISession session = connection.CreateSession()) using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) { IEnumerator enumerator = browser.GetEnumerator(); while (enumerator.MoveNext()) { IMessage msg = enumerator.Current as IMessage; Assert.IsNotNull(msg, "message is not in the queue!"); Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); } } } } [Test] public void TestRedeliveredCase3() { const int messageCount = 300; const int receiveCount = 150; // enqueue several messages PurgeDatabase(); PurgeAndFillQueue(messageCount); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.Start(); // receive half of total messages using (INetTxSession session = connection.CreateNetTxSession()) { IQueue queue = session.GetQueue(testQueueName); // read message from queue and insert into db table using (IMessageConsumer consumer = session.CreateConsumer(queue)) { for (int i = 0; i < receiveCount; i++) { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString)) using (SqlCommand sqlInsertCommand = new SqlCommand()) { sqlConnection.Open(); sqlInsertCommand.Connection = sqlConnection; ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text)); sqlInsertCommand.ExecuteNonQuery(); scoped.Complete(); } } } session.Close(); } } Tracer.Debug("First stage ok"); // check that others message have status redelivered = false IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI)); using (IConnection connection = checkFactory.CreateConnection()) { connection.Start(); using (ISession session = connection.CreateSession()) using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) { IEnumerator enumerator = browser.GetEnumerator(); while (enumerator.MoveNext()) { IMessage msg = enumerator.Current as IMessage; Assert.IsNotNull(msg, "message is not in the queue!"); Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); } } } } [Test] public void TestRedeliveredNoComplete() { const int messageCount = 300; const int receiveCount = 150; // enqueue several messages PurgeDatabase(); PurgeAndFillQueue(messageCount); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { // allow no redelivery so that message immediatly goes to the DLQ if first read fails connection.RedeliveryPolicy.MaximumRedeliveries = 0; connection.Start(); // receive half of total messages using (INetTxSession session = connection.CreateNetTxSession()) { IQueue queue = session.GetQueue(testQueueName); // read message from queue and insert into db table using (IMessageConsumer consumer = session.CreateConsumer(queue)) { for (int i = 0; i < receiveCount; i++) { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString)) using (SqlCommand sqlInsertCommand = new SqlCommand()) { sqlConnection.Open(); sqlInsertCommand.Connection = sqlConnection; ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text)); sqlInsertCommand.ExecuteNonQuery(); } } } session.Close(); } } Tracer.Debug("First stage ok"); // check that others message have status redelivered = false IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI)); using (IConnection connection = checkFactory.CreateConnection()) { connection.Start(); using (ISession session = connection.CreateSession()) using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) { IEnumerator enumerator = browser.GetEnumerator(); while (enumerator.MoveNext()) { IMessage msg = enumerator.Current as IMessage; Assert.IsNotNull(msg, "message is not in the queue!"); Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); } } } } [Test] public void TestRecoveryAfterCommitFailsBeforeSent() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(1000); } // transaction should not have been commited VerifyNoMessagesInQueueNoRecovery(); // verify sql server has commited the transaction VerifyDatabaseTableIsFull(); // check messages are not present in the queue VerifyNoMessagesInQueue(); } [Test] public void TestRecoveryAfterCommitFailsAfterSent() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook; ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(1000); } // transaction should have been commited VerifyNoMessagesInQueueNoRecovery(); // verify sql server has commited the transaction VerifyDatabaseTableIsFull(); // check messages are not present in the queue VerifyNoMessagesInQueue(); } [Test] public void TestIterativeTransactedConsume() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(5 * MSG_COUNT); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ReadFromQueueAndInsertIntoDbWithCommit(connection); ReadFromQueueAndInsertIntoDbWithCommit(connection); ReadFromQueueAndInsertIntoDbWithCommit(connection); ReadFromQueueAndInsertIntoDbWithCommit(connection); ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(2000); } // verify sql server has commited the transaction VerifyDatabaseTableIsFull(5 * MSG_COUNT); // check messages are NOT present in the queue VerifyNoMessagesInQueueNoRecovery(); } [Test] public void TestConsumeWithDBInsertLogLocation() { const string logLocation = @".\RecoveryDir"; string newConnectionUri = connectionURI + "?nms.RecoveryPolicy.RecoveryLogger.Location=" + logLocation + "&nms.configuredResourceManagerId=" + dtcFactory.ConfiguredResourceManagerId; // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); if (Directory.Exists(logLocation)) { Directory.Delete(logLocation, true); } Directory.CreateDirectory(logLocation); dtcFactory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri)); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(2000); } Assert.AreEqual(1, Directory.GetFiles(logLocation).Length); // verify sql server has commited the transaction VerifyDatabaseTableIsFull(); // check messages are NOT present in the queue NetTxTransactionContext.ResetDtcRecovery(); VerifyBrokerQueueCount(0, newConnectionUri); Assert.AreEqual(0, Directory.GetFiles(logLocation).Length); } [Test] public void TestRecoverAfterTransactionScopeAborted() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ReadFromQueueAndInsertIntoDbWithScopeAborted(connection); Thread.Sleep(2000); } // verify sql server has NOT commited the transaction VerifyDatabaseTableIsEmpty(); // check messages are present in the queue NetTxTransactionContext.ResetDtcRecovery(); VerifyBrokerQueueCount(); } [Test] public void TestRecoverAfterRollbackFailWhenScopeAborted() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.ExceptionListener += this.OnException; connection.Start(); ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook; ReadFromQueueAndInsertIntoDbWithScopeAborted(connection); Thread.Sleep(2000); } // verify sql server has NOT commited the transaction VerifyDatabaseTableIsEmpty(); // check messages are recovered and present in the queue NetTxTransactionContext.ResetDtcRecovery(); VerifyBrokerQueueCount(); } [Test] public void TestRecoverAfterFailOnTransactionBeforePrepareSent() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook; connection.ExceptionListener += this.OnException; connection.Start(); ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(2000); } // Messages are visible since no prepare sent VerifyBrokerQueueCountNoRecovery(); // verify sql server has NOT commited the transaction VerifyDatabaseTableIsEmpty(); // check messages are present in the queue NetTxTransactionContext.ResetDtcRecovery(); VerifyBrokerQueueCount(); } [Test] public void TestRecoverAfterFailOnTransactionAfterPrepareSent() { // Test initialize - Fills in queue with data to send and clears the DB. PurgeDatabase(); PurgeAndFillQueue(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { ITransport transport = (connection as Connection).ITransport; TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; Assert.IsNotNull(tcpFaulty); tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook; connection.ExceptionListener += this.OnException; connection.Start(); ReadFromQueueAndInsertIntoDbWithCommit(connection); Thread.Sleep(2000); } // not visible yet because it must be rolled back VerifyNoMessagesInQueueNoRecovery(); // verify sql server has NOT commited the transaction VerifyDatabaseTableIsEmpty(); // check messages are present in the queue NetTxTransactionContext.ResetDtcRecovery(); VerifyBrokerQueueCount(); } [Test] public void MessageShouldEnlistToTheCorrectTransaction() { const int messageCount = 100; const int receiveCount = 100; // enqueue several messages PurgeDatabase(); PurgeAndFillQueue(messageCount); var enlistment = new TestSinglePhaseCommit(); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) { connection.Start(); // receive half of total messages using (INetTxSession session = connection.CreateNetTxSession()) { IQueue queue = session.GetQueue(testQueueName); using (IMessageConsumer consumer = session.CreateConsumer(queue)) { for (int i = 0; i < receiveCount; i++) { try { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) { ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; Transaction.Current.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); if (new Random().Next(2) == 0) { Tracer.InfoFormat("Throwing random Exception for Message {0}", message.NMSMessageId); throw new Exception(); } scoped.Complete(); } } catch { } Assert.False(enlistment.singlePhaseCommit, "No single phase commit should happen."); } } } } } internal class TestSinglePhaseCommit : ISinglePhaseNotification { public bool singlePhaseCommit = false; public void Prepare(PreparingEnlistment preparingEnlistment) { preparingEnlistment.Prepared(); } public void Commit(Enlistment enlistment) { enlistment.Done(); } public void Rollback(Enlistment enlistment) { enlistment.Done(); } public void InDoubt(Enlistment enlistment) { enlistment.Done(); } public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment) { Tracer.Info("Performing invalid single phase commit."); singlePhaseCommit = true; singlePhaseEnlistment.Committed(); } } #region Asynchronous Consumer Inside of a Transaction Test / Example private const int BATCH_COUNT = 5; private int batchSequence; private DependentTransaction batchTxControl; private readonly ManualResetEvent awaitBatchProcessingStart = new ManualResetEvent(false); [Test] public void TestTransactedAsyncConsumption() { PurgeDatabase(); PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT); using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) using (NetTxSession session = connection.CreateNetTxSession() as NetTxSession) { IQueue queue = session.GetQueue(testQueueName); IMessageConsumer consumer = session.CreateConsumer(queue); consumer.Listener += AsyncTxAwareOnMessage; // Be carefull, message are dispatched once this is done, so you could receive // a Message outside a TX. We use the awaitBatchProcessingStart event here to // gate te OnMessage callback, once that method returns the Message is ack'd and // no longer has a chance to participate in a TX. connection.Start(); for (int i = 0; i < BATCH_COUNT; ++i) { using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) { session.Enlist(Transaction.Current); batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete); awaitBatchProcessingStart.Set(); scoped.Complete(); } // Reenlisting to fast seems to annoy the DTC. Also since DTC operations are // async we need to allow a little time for lag so that the last TX actually // completes before we start a new one. Thread.Sleep(250); } } // verify sql server has commited the transaction VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT); // check messages are NOT present in the queue VerifyNoMessagesInQueue(); } private void AsyncTxAwareOnMessage(IMessage message) { awaitBatchProcessingStart.WaitOne(); try { using (TransactionScope scoped = new TransactionScope(batchTxControl)) using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString)) using (SqlCommand sqlInsertCommand = new SqlCommand()) { sqlConnection.Open(); sqlInsertCommand.Connection = sqlConnection; ITextMessage textMessage = message as ITextMessage; Assert.IsNotNull(message, "missing message"); sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(textMessage.Text)); sqlInsertCommand.ExecuteNonQuery(); scoped.Complete(); } if(++batchSequence == MSG_COUNT) { batchSequence = 0; awaitBatchProcessingStart.Reset(); batchTxControl.Complete(); } } catch (Exception e) { Tracer.Debug("TX;Error from TransactionScope: " + e.Message); Tracer.Debug(e.ToString()); } } #endregion } }