/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using NUnit.Framework; namespace Apache.NMS.Test { //[TestFixture] public class TempDestinationTest : NMSTest { private IConnection connection; private IList connections = ArrayList.Synchronized(new ArrayList()); protected TempDestinationTest(NMSTestSupport testSupport) : base(testSupport) { } //[SetUp] public override void SetUp() { base.SetUp(); this.connection = CreateConnection(); this.connections.Add(connection); } //[TearDown] public override void TearDown() { foreach(IConnection conn in this.connections) { try { conn.Close(); } catch { } } connections.Clear(); base.TearDown(); } //[Test] public virtual void TestTempDestOnlyConsumedByLocalConn() { connection.Start(); ISession tempSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); ITemporaryQueue queue = tempSession.CreateTemporaryQueue(); IMessageProducer producer = tempSession.CreateProducer(queue); producer.DeliveryMode = (MsgDeliveryMode.NonPersistent); ITextMessage message = tempSession.CreateTextMessage("First"); producer.Send(message); // temp destination should not be consume when using another connection IConnection otherConnection = CreateConnection(); connections.Add(otherConnection); ISession otherSession = otherConnection.CreateSession(AcknowledgementMode.AutoAcknowledge); ITemporaryQueue otherQueue = otherSession.CreateTemporaryQueue(); IMessageConsumer consumer = otherSession.CreateConsumer(otherQueue); IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(3000)); Assert.IsNull(msg); // should be able to consume temp destination from the same connection consumer = tempSession.CreateConsumer(queue); msg = consumer.Receive(TimeSpan.FromMilliseconds(3000)); Assert.IsNotNull(msg); } //[Test] public virtual void TestTempQueueHoldsMessagesWithConsumers() { ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageConsumer consumer = session.CreateConsumer(queue); connection.Start(); IMessageProducer producer = session.CreateProducer(queue); producer.DeliveryMode = (MsgDeliveryMode.NonPersistent); ITextMessage message = session.CreateTextMessage("Hello"); producer.Send(message); IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(message2); Assert.IsTrue(message2 is ITextMessage, "Expected message to be a ITextMessage"); Assert.IsTrue(((ITextMessage)message2).Text == message.Text, "Expected message to be a '" + message.Text + "'"); } //[Test] public virtual void TestTempQueueHoldsMessagesWithoutConsumers() { ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IMessageProducer producer = session.CreateProducer(queue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; ITextMessage message = session.CreateTextMessage("Hello"); producer.Send(message); connection.Start(); IMessageConsumer consumer = session.CreateConsumer(queue); IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(3000)); Assert.IsNotNull(message2); Assert.IsTrue(message2 is ITextMessage, "Expected message to be a ITextMessage"); Assert.IsTrue(((ITextMessage)message2).Text == message.Text, "Expected message to be a '" + message.Text + "'"); } //[Test] public virtual void TestTmpQueueWorksUnderLoad() { int count = 500; int dataSize = 1024; ArrayList list = new ArrayList(count); ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); ITemporaryQueue queue = session.CreateTemporaryQueue(); IBytesMessage message; IBytesMessage message2; IMessageProducer producer = session.CreateProducer(queue); producer.DeliveryMode = MsgDeliveryMode.NonPersistent; byte[] srcdata = new byte[dataSize]; srcdata[0] = (byte) 'B'; srcdata[1] = (byte) 'A'; srcdata[2] = (byte) 'D'; srcdata[3] = (byte) 'W'; srcdata[4] = (byte) 'O'; srcdata[5] = (byte) 'L'; srcdata[6] = (byte) 'F'; for(int i = 0; i < count; i++) { message = session.CreateBytesMessage(); message.WriteBytes(srcdata); message.Properties.SetInt("c", i); producer.Send(message); list.Add(message); } connection.Start(); byte[] data = new byte[dataSize]; byte[] data2 = new byte[dataSize]; IMessageConsumer consumer = session.CreateConsumer(queue); for(int i = 0; i < count; i++) { message2 = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as IBytesMessage; Assert.IsNotNull(message2); Assert.AreEqual(i, message2.Properties.GetInt("c")); message = list[i] as IBytesMessage; Assert.IsNotNull(message); message.Reset(); message.ReadBytes(data); message2.ReadBytes(data2); Assert.AreEqual(data, data2); } } } }