/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using Apache.NMS; using Apache.NMS.Util; using Apache.NMS.Test; using Apache.NMS.ActiveMQ; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS.ActiveMQ.Transport; using Apache.NMS.ActiveMQ.Transport.Failover; using Apache.NMS.ActiveMQ.Transport.Tcp; using Apache.NMS.ActiveMQ.Transport.Mock; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { [TestFixture] public class FailoverTransportTest { private List sent; private List received; private List exceptions; private const int MAX_ATTEMPTS = 30; private const int MESSAGE_COUNT = 5; private Connection connection; private int msgCount = 5; private bool interrupted = false; private bool resumed = false; protected AutoResetEvent semaphore = new AutoResetEvent(false); int sessionIdx = 1; int consumerIdx = 1; int producerIdx = 1; private void OnException(ITransport transport, Exception exception) { Tracer.Debug("Test: Received Exception from Transport: " + exception); exceptions.Add(exception); } private void OnCommand(ITransport transport, Command command) { Tracer.DebugFormat("Test: Received Command from Transport: {0}", command); received.Add(command); } private void OnOutgoingCommand(ITransport transport, Command command) { Tracer.DebugFormat("FailoverTransportTest::OnOutgoingCommand - {0}", command); sent.Add(command); } private void OnResumed(ITransport sender) { Tracer.DebugFormat("FailoverTransportTest::OnResumed - {0}", sender.RemoteAddress); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = sender as MockTransport; Assert.IsNotNull(mock); mock.OutgoingCommand = OnOutgoingCommand; } private void OnInterrupted(ITransport sender) { } private void VerifyCommandHandlerSetting(ITransport transport, MockTransport mock) { // Walk the stack of wrapper transports. ITransport failoverTransportTarget = mock.Command.Target as ITransport; Assert.IsNotNull(failoverTransportTarget); ITransport mutexTransportTarget = failoverTransportTarget.Command.Target as ITransport; Assert.IsNotNull(mutexTransportTarget); ITransport responseCorrelatorTransportTarget = mutexTransportTarget.Command.Target as ITransport; Assert.IsNotNull(responseCorrelatorTransportTarget); Assert.AreEqual(transport.Command.Target, responseCorrelatorTransportTarget.Command.Target); } [SetUp] public void init() { sent = new List(); received = new List(); exceptions = new List(); sessionIdx = 1; consumerIdx = 1; producerIdx = 1; this.connection = null; this.msgCount = MESSAGE_COUNT; this.interrupted = false; this.resumed = false; } [Test] public void FailoverTransportCreateTest() { Uri uri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover, "Failed to create Transport"); Assert.IsFalse(failover.Randomize, "Failed to properly set Randomize flag"); transport.Start(); Thread.Sleep(2000); Assert.IsTrue(failover.IsConnected, "Transport should be connected"); } } [Test] public void FailoverTransportWithBackupsTest() { Uri uri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)?transport.randomize=false&transport.backup=true"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); Assert.IsTrue(failover.Backup); transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); } } [Test] public void FailoverTransportCreateFailOnCreateTest() { Uri uri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" + "transport.useExponentialBackOff=false&transport.maxReconnectAttempts=3&transport.initialReconnectDelay=100"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.UseExponentialBackOff); Assert.AreEqual(3, failover.MaxReconnectAttempts); Assert.AreEqual(100, failover.InitialReconnectDelay); transport.Start(); Thread.Sleep(2000); Assert.IsNotEmpty(this.exceptions); Assert.IsFalse(failover.IsConnected); } } [Test] public void FailoverTransportCreateFailOnCreateTest2() { Uri uri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" + "transport.useExponentialBackOff=false&transport.startupMaxReconnectAttempts=3&transport.initialReconnectDelay=100"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.UseExponentialBackOff); Assert.AreEqual(3, failover.StartupMaxReconnectAttempts); Assert.AreEqual(100, failover.InitialReconnectDelay); transport.Start(); Thread.Sleep(2000); Assert.IsNotEmpty(this.exceptions); Assert.IsFalse(failover.IsConnected); } } [Test] public void FailoverTransportFailOnSendMessageTest() { Uri uri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" + "transport.useExponentialBackOff=false&transport.maxReconnectAttempts=3&transport.initialReconnectDelay=100"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.UseExponentialBackOff); Assert.AreEqual(3, failover.MaxReconnectAttempts); Assert.AreEqual(100, failover.InitialReconnectDelay); transport.Start(); ActiveMQMessage message = new ActiveMQMessage(); Assert.Throws(delegate() { transport.Oneway(message); }, "Oneway call should block and then throw."); Assert.IsNotEmpty(this.exceptions); Assert.IsFalse(failover.IsConnected); } } [Test] public void FailoverTransportFailingBackupsTest() { Uri uri = new Uri( "failover:(mock://localhost:61616," + "mock://localhost:61618?transport.failOnCreate=true)?transport.randomize=false&transport.backup=true"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); Assert.IsTrue(failover.Backup); transport.Start(); Thread.Sleep(2000); Assert.IsTrue(failover.IsConnected); } } [Test] public void FailoverTransportSendOnewayMessageTest() { int numMessages = 1000; Uri uri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); failover.Resumed = OnResumed; transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); Assert.AreEqual(61616, mock.RemoteAddress.Port); VerifyCommandHandlerSetting(transport, mock); mock.OutgoingCommand = OnOutgoingCommand; ActiveMQMessage message = new ActiveMQMessage(); for(int i = 0; i < numMessages; ++i) { transport.Oneway(message); } Thread.Sleep(1000); Assert.AreEqual(numMessages, this.sent.Count); } } [Test] public void FailoverTransportSendRequestTest() { Uri uri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); failover.Resumed = OnResumed; transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); Assert.AreEqual(61616, mock.RemoteAddress.Port); VerifyCommandHandlerSetting(transport, mock); mock.OutgoingCommand = OnOutgoingCommand; ActiveMQMessage message = new ActiveMQMessage(); int numMessages = 4; for(int i = 0; i < numMessages; ++i) { transport.Request(message); } Thread.Sleep(1000); Assert.AreEqual(numMessages, this.sent.Count); } } [Test] public void FailoverTransportSendOnewayFailTest() { Uri uri = new Uri( "failover:(mock://localhost:61616?transport.failOnSendMessage=true," + "mock://localhost:61618)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); failover.Resumed = OnResumed; transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); Assert.AreEqual(61616, mock.RemoteAddress.Port); VerifyCommandHandlerSetting(transport, mock); mock.OutgoingCommand = OnOutgoingCommand; ActiveMQMessage message = new ActiveMQMessage(); int numMessages = 4; for(int i = 0; i < numMessages; ++i) { transport.Oneway(message); // Make sure we switched to second failover mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); Assert.AreEqual(61618, mock.RemoteAddress.Port); } Thread.Sleep(1000); Assert.AreEqual(numMessages, this.sent.Count); } } [Test] public void FailoverTransportSendOnewayTimeoutTest() { Uri uri = new Uri( "failover:(mock://localhost:61616?transport.failOnCreate=true)?transport.timeout=1000"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.AreEqual(1000, failover.Timeout); transport.Start(); Thread.Sleep(1000); ActiveMQMessage message = new ActiveMQMessage(); Assert.Throws(delegate() { transport.Oneway(message); }); } } [Test] public void FailoverTransportSendRequestFailTest() { Uri uri = new Uri( "failover:(mock://localhost:61616?transport.failOnSendMessage=true," + "mock://localhost:61618)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); failover.Resumed = OnResumed; transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); VerifyCommandHandlerSetting(transport, mock); mock.OutgoingCommand = OnOutgoingCommand; ActiveMQMessage message = new ActiveMQMessage(); int numMessages = 4; for(int i = 0; i < numMessages; ++i) { transport.Request(message); } Thread.Sleep(1000); Assert.AreEqual(numMessages, this.sent.Count); } } [Test] public void TestFailoverTransportConnectionControlHandling() { Uri uri = new Uri("failover:(mock://localhost:61613)?transport.randomize=false"); string connectedBrokers = "mock://localhost:61616?transport.name=Reconnected," + "mock://localhost:61617?transport.name=Broker1," + "mock://localhost:61618?transport.name=Broker2"; FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); const int MAX_ATTEMPTS = 50; transport.Start(); for(int i = 0; i < MAX_ATTEMPTS; ++i) { if(failover.IsConnected) { break; } Thread.Sleep(100); } Assert.IsTrue(failover.IsConnected); // Ensure the current mock transport has the correct outgoing command handler MockTransport mock = transport.Narrow(typeof(MockTransport)) as MockTransport; Assert.IsNotNull(mock); Assert.AreEqual(61613, mock.RemoteAddress.Port); VerifyCommandHandlerSetting(transport, mock); mock.OutgoingCommand = OnOutgoingCommand; mock.InjectCommand(new ConnectionControl() { FaultTolerant = true, ConnectedBrokers = connectedBrokers, RebalanceConnection = true }); // Give a bit of time for the Command to actually be processed. Thread.Sleep(2000); mock = null; for(int i = 0; i < MAX_ATTEMPTS; ++i) { mock = transport.Narrow(typeof(MockTransport)) as MockTransport; if(mock != null) { break; } Thread.Sleep(100); } Assert.IsNotNull(mock, "Error reconnecting to failover broker."); Assert.AreEqual(61616, mock.RemoteAddress.Port); Assert.AreEqual("Reconnected", mock.Name); } } [Test] public void TestPriorityBackupConfig() { Uri uri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)"+ "?transport.randomize=false&transport.priorityBackup=true"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize, "Randomize should be false"); Assert.IsTrue(failover.PriorityBackup, "Prioirity Backup not set."); transport.Start(); for(int i = 0; i < MAX_ATTEMPTS; ++i) { if(failover.IsConnected) { break; } Thread.Sleep(100); } Assert.IsTrue(failover.IsConnected); Assert.IsTrue(failover.IsConnectedToPriority); } } [Test] public void TestPriorityBackupConfigPriorityURIsList() { Uri uri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)" + "?transport.randomize=false&transport.priorityBackup=true&" + "transport.priorityURIs=mock://localhost:61616,mock://localhost:61618"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize, "Randomize should be false"); Assert.IsTrue(failover.PriorityBackup, "Prioirity Backup not set."); String priorityURIs = failover.PriorityURIs; String[] tokens = priorityURIs.Split(new Char[] { ',' }); Assert.AreEqual(2, tokens.Length, "Bad priorityURIs string: " + priorityURIs); transport.Start(); for(int i = 0; i < MAX_ATTEMPTS; ++i) { if(failover.IsConnected) { break; } Thread.Sleep(100); } Assert.IsTrue(failover.IsConnected); Assert.IsTrue(failover.IsConnectedToPriority); } } [Test] public void OpenWireCommandsTest() { Uri uri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false"); FailoverTransportFactory factory = new FailoverTransportFactory(); using(ITransport transport = factory.CreateTransport(uri)) { Assert.IsNotNull(transport); transport.Command = OnCommand; transport.Exception = OnException; transport.Resumed = OnResumed; transport.Interrupted = OnInterrupted; FailoverTransport failover = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; Assert.IsNotNull(failover); Assert.IsFalse(failover.Randomize); transport.Start(); Thread.Sleep(1000); Assert.IsTrue(failover.IsConnected); ConnectionInfo connection = createConnection(); transport.Request(connection); SessionInfo session1 = createSession(connection); transport.Request(session1); SessionInfo session2 = createSession(connection); transport.Request(session2); ConsumerInfo consumer1 = createConsumer(session1); transport.Request(consumer1); ConsumerInfo consumer2 = createConsumer(session1); transport.Request(consumer2); ConsumerInfo consumer3 = createConsumer(session2); transport.Request(consumer3); ProducerInfo producer1 = createProducer(session2); transport.Request(producer1); // Remove the Producers disposeOf(transport, producer1); // Remove the Consumers disposeOf(transport, consumer1); disposeOf(transport, consumer2); disposeOf(transport, consumer3); // Remove the Session instances. disposeOf(transport, session1); disposeOf(transport, session2); // Indicate that we are done. ShutdownInfo shutdown = new ShutdownInfo(); transport.Oneway(shutdown); } } protected ConnectionInfo createConnection() { return new ConnectionInfo() { ClientId = Guid.NewGuid().ToString(), ConnectionId = new ConnectionId() { Value = Guid.NewGuid().ToString() } }; } protected SessionInfo createSession(ConnectionInfo parent) { return new SessionInfo() { SessionId = new SessionId() { ConnectionId = parent.ConnectionId.Value, Value = sessionIdx++ } }; } protected ConsumerInfo createConsumer(SessionInfo parent) { return new ConsumerInfo() { ConsumerId = new ConsumerId() { ConnectionId = parent.SessionId.ConnectionId, SessionId = parent.SessionId.Value, Value = consumerIdx++ } }; } protected ProducerInfo createProducer(SessionInfo parent) { return new ProducerInfo() { ProducerId = new ProducerId() { ConnectionId = parent.SessionId.ConnectionId, SessionId = parent.SessionId.Value, Value = producerIdx++ } }; } protected void disposeOf(ITransport transport, SessionInfo session) { transport.Oneway(new RemoveInfo() { ObjectId = session.SessionId }); } protected void disposeOf(ITransport transport, ConsumerInfo consumer) { transport.Oneway(new RemoveInfo() { ObjectId = consumer.ConsumerId }); } protected void disposeOf(ITransport transport, ProducerInfo producer) { transport.Oneway(new RemoveInfo() { ObjectId = producer.ProducerId }); } [Test] public void FailoverTransportFailOnProcessingReceivedMessageTest() { string uri = "failover:(tcp://${activemqhost}:61616)"; IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); using(connection = factory.CreateConnection() as Connection ) { connection.ConnectionInterruptedListener += new ConnectionInterruptedListener(TransportInterrupted); connection.ConnectionResumedListener += new ConnectionResumedListener(TransportResumed); connection.Start(); using(ISession session = connection.CreateSession()) { IDestination destination = session.GetQueue("Test?consumer.prefetchSize=1"); PurgeQueue(connection, destination); PutMsgIntoQueue(session, destination); using(IMessageConsumer consumer = session.CreateConsumer(destination)) { consumer.Listener += OnMessage; BreakConnection(); WaitForMessagesToArrive(); } } } Assert.IsTrue(this.interrupted); Assert.IsTrue(this.resumed); } [Test] public void FailStartupMaxReconnectAttempts() { // Connect to valid machine, but on invalid port that doesn't have a broker listening. string uri = "failover:(tcp://localhost:31313)?transport.StartupMaxReconnectAttempts=3"; IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); IConnection failConnection = factory.CreateConnection(); try { failConnection.Start(); Assert.Fail("Should not have connected to broker."); } catch(Apache.NMS.ActiveMQ.ConnectionClosedException) { } catch(Apache.NMS.NMSConnectionException) { } catch(Exception e) { Assert.Fail("Wrong Exception Thrown after max reconnect attempts.\n" + "Exception thrown type: " + e.GetType()); } finally { try { failConnection.Stop(); } catch(Exception) { Assert.Fail("Connection closed exception thrown while closing a connection."); } finally { try { failConnection.Dispose(); } catch(Exception) { Assert.Fail("Connection closed exception thrown while closing a connection."); } } } } public void TransportInterrupted() { this.interrupted = true; } public void TransportResumed() { this.resumed = true; } public void OnMessage(IMessage message) { var textMsg = message as ITextMessage; if(textMsg == null) { return; } msgCount--; // just process the first message for 10 seconds to give some time main thread // to restart ActiveMq broker if(msgCount == MESSAGE_COUNT - 1) { Thread.Sleep(10000); } if(msgCount == 0) { // if all messages were consumed then we are fine semaphore.Set(); } } private void PutMsgIntoQueue(ISession session, IDestination destination) { using(IMessageProducer producer = session.CreateProducer(destination)) { ITextMessage message = session.CreateTextMessage(); for(int i = 0; i < msgCount; ++i) { message.Text = "Test message " + (i + 1); producer.Send(message); } } } public void PurgeQueue(IConnection conn, IDestination queue) { ISession session = conn.CreateSession(); IMessageConsumer consumer = session.CreateConsumer(queue); while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null) { } consumer.Close(); session.Close(); } private void BreakConnection() { TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport; Assert.IsNotNull(transport); transport.Close(); } protected void WaitForMessagesToArrive() { semaphore.WaitOne(30000, true); Assert.AreEqual(0, msgCount); } } }