/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS.Test; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { [TestFixture] public class ExclusiveConsumerTest : NMSTestSupport { protected static string DESTINATION_NAME = "TEST.ExclusiveConsumerTestDestination"; protected static string TEST_CLIENT_ID = "ExclusiveConsumerTestClientId"; private IConnection createConnection(bool start) { IConnection conn = CreateConnection(TEST_CLIENT_ID); if(start) { conn.Start(); } return conn; } public void purgeQueue(IConnection conn, ActiveMQQueue queue) { ISession session = conn.CreateSession(); IMessageConsumer consumer = session.CreateConsumer(queue); while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null) { } consumer.Close(); session.Close(); } [Test] public void TestExclusiveConsumerSelectedCreatedFirst() { IConnection conn = createConnection(true); ISession exclusiveSession = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE1")); try { exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); IMessage msg = senderSession.CreateTextMessage("test"); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } [Test] public void TestExclusiveConsumerSelectedCreatedAfter() { IConnection conn = createConnection(true); ISession exclusiveSession = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE5")); try { exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); IMessage msg = senderSession.CreateTextMessage("test"); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } [Test] public void TestFailoverToAnotherExclusiveConsumerCreatedFirst() { IConnection conn = createConnection(true); ISession exclusiveSession1 = null; ISession exclusiveSession2 = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE2")); try { exclusiveSession1 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); exclusiveSession2 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); // This creates the exclusive consumer first which avoids AMQ-1024 // bug. ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer1 = exclusiveSession1.CreateConsumer(exclusiveQueue); IMessageConsumer exclusiveConsumer2 = exclusiveSession2.CreateConsumer(exclusiveQueue); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; IMessage msg = senderSession.CreateTextMessage("test"); producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer1.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); // Close the exclusive consumer to verify the non-exclusive consumer // takes over exclusiveConsumer1.Close(); producer.Send(msg); producer.Send(msg); Assert.IsNotNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } [Test] public void TestFailoverToAnotherExclusiveConsumerCreatedAfter() { IConnection conn = createConnection(true); ISession exclusiveSession1 = null; ISession exclusiveSession2 = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE6")); try { exclusiveSession1 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); exclusiveSession2 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); // This creates the exclusive consumer first which avoids AMQ-1024 // bug. ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer1 = exclusiveSession1.CreateConsumer(exclusiveQueue); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); IMessageConsumer exclusiveConsumer2 = exclusiveSession2.CreateConsumer(exclusiveQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; IMessage msg = senderSession.CreateTextMessage("test"); producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer1.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); // Close the exclusive consumer to verify the non-exclusive consumer // takes over exclusiveConsumer1.Close(); Thread.Sleep(100); producer.Send(msg); producer.Send(msg); Assert.IsNotNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } [Test] public void TestFailoverToNonExclusiveConsumer() { IConnection conn = createConnection(true); ISession exclusiveSession = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE3")); try { exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); // This creates the exclusive consumer first which avoids AMQ-1024 // bug. ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; IMessage msg = senderSession.CreateTextMessage("test"); producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); // Close the exclusive consumer to verify the non-exclusive consumer // takes over exclusiveConsumer.Close(); producer.Send(msg); Assert.IsNotNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } [Test] public void TestFallbackToExclusiveConsumer() { IConnection conn = createConnection(true); ISession exclusiveSession = null; ISession fallbackSession = null; ISession senderSession = null; purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE4")); try { exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge); // This creates the exclusive consumer first which avoids AMQ-1024 // bug. ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true"); IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue); ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4"); IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue); ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4"); IMessageProducer producer = senderSession.CreateProducer(senderQueue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; IMessage msg = senderSession.CreateTextMessage("test"); producer.Send(msg); Thread.Sleep(500); // Verify exclusive consumer receives the message. Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); // Close the exclusive consumer to verify the non-exclusive consumer // takes over exclusiveConsumer.Close(); producer.Send(msg); // Verify other non-exclusive consumer receices the message. Assert.IsNotNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); // Create exclusive consumer to determine if it will start receiving // the messages. exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue); producer.Send(msg); Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000))); Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000))); } finally { fallbackSession.Close(); senderSession.Close(); conn.Close(); } } } }