/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using Apache.NMS.Test; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { [TestFixture] public class IndividualAckTest : NMSTestSupport { private IConnection connection; [SetUp] public override void SetUp() { base.SetUp(); connection = CreateConnection(); connection.Start(); } [TearDown] public override void TearDown() { connection.Close(); base.TearDown(); } [Test] public void TestAckedMessageAreConsumed() { ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); producer.Send(session.CreateTextMessage("Hello")); // Consume the message... IMessageConsumer consumer = session.CreateConsumer(queue); IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(msg); msg.Acknowledge(); // Reset the session. session.Close(); session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Attempt to Consume the message... consumer = session.CreateConsumer(queue); msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); session.Close(); } [Test] public void TestLastMessageAcked() { ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); ITextMessage msg1 = session.CreateTextMessage("msg1" + Guid.NewGuid().ToString()); ITextMessage msg2 = session.CreateTextMessage("msg2" + Guid.NewGuid().ToString()); ITextMessage msg3 = session.CreateTextMessage("msg3" + Guid.NewGuid().ToString()); producer.Send(msg1); producer.Send(msg2); producer.Send(msg3); // Consume the message... IMessageConsumer consumer = session.CreateConsumer(queue); ITextMessage ackmsg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; Assert.IsNotNull(ackmsg); Assert.AreEqual(msg1.Text,ackmsg.Text); ackmsg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; Assert.IsNotNull(ackmsg); Assert.AreEqual(msg2.Text,ackmsg.Text); ackmsg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; Assert.IsNotNull(ackmsg); Assert.AreEqual(msg3.Text,ackmsg.Text); ackmsg.Acknowledge(); // Reset the session. session.Close(); session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Attempt to Consume the message... consumer = session.CreateConsumer(queue); ackmsg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; Assert.IsNotNull(ackmsg); Assert.AreEqual(msg1.Text,ackmsg.Text); ackmsg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; Assert.IsNotNull(ackmsg); Assert.AreEqual(msg2.Text,ackmsg.Text); IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); session.Close(); } [Test] public void TestUnAckedMessageAreNotConsumedOnSessionClose() { ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); producer.Send(session.CreateTextMessage("Hello")); // Consume the message... IMessageConsumer consumer = session.CreateConsumer(queue); IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(msg); // Don't ack the message. // Reset the session. This should cause the unacknowledged message to be re-delivered. session.Close(); session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Attempt to Consume the message... consumer = session.CreateConsumer(queue); msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)); Assert.IsNotNull(msg); msg.Acknowledge(); session.Close(); } [Test] public void TestIndividualAcknowledgeMultiMessages_AcknowledgeFirstTest() { ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Push 2 messages to queue ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); ITextMessage msg = session.CreateTextMessage("test 1"); producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue); msg = session.CreateTextMessage("test 2"); producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue); producer.Close(); IMessageConsumer consumer = session.CreateConsumer(queue); // Read the first message ITextMessage fetchedMessage1 = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000)); Assert.IsNotNull(fetchedMessage1); Assert.AreEqual("test 1", fetchedMessage1.Text); // Read the second message ITextMessage fetchedMessage2 = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000)); Assert.IsNotNull(fetchedMessage2); Assert.AreEqual("test 2", fetchedMessage2.Text); // Acknowledge first message fetchedMessage1.Acknowledge(); consumer.Close(); // Read first message a second time consumer = session.CreateConsumer(queue); fetchedMessage1 = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000)); Assert.IsNotNull(fetchedMessage1); Assert.AreEqual("test 2", fetchedMessage1.Text); // Try to read second message a second time fetchedMessage2 = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000)); Assert.IsNull(fetchedMessage2); consumer.Close(); } [Test] public void TestManyMessageAckedAfterMessageConsumption() { int messageCount = 20; IMessage msg; ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); for(int i = 0; i < messageCount; i++) { msg = session.CreateTextMessage("msg" + i); producer.Send(msg); } // Consume the message... IMessageConsumer consumer = session.CreateConsumer(queue); for(int i = 0; i < messageCount; i++) { msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(msg); msg.Acknowledge(); } msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); // Reset the session. session.Close(); session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Attempt to Consume the message... consumer = session.CreateConsumer(queue); msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); session.Close(); } [Test] public void TestManyMessageAckedAfterAllConsumption() { int messageCount = 20; IMessage msg; ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); for(int i = 0; i < messageCount; i++) { msg = session.CreateTextMessage("msg" + i); producer.Send(msg); } // Consume the message... IMessageConsumer consumer = session.CreateConsumer(queue); IMessage[] consumedMessages = new IMessage[messageCount]; for(int i = 0; i < messageCount; i++) { msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(msg); consumedMessages[i] = msg; } for(int i = 0; i < messageCount; i++) { consumedMessages[i].Acknowledge(); } msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); // Reset the session. session.Close(); session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); // Attempt to Consume the message... consumer = session.CreateConsumer(queue); msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNull(msg); session.Close(); } } }