/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS.Test; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test { [TestFixture] public class QueueConsumerPriorityTest : NMSTestSupport { protected static string DESTINATION_NAME = "TEST.QueueConsumerPriority"; protected static string TEST_CLIENT_ID = "QueueConsumerPriorityTestClientId"; protected static int MSG_COUNT = 50; private IConnection createConnection(bool start) { IConnection conn = CreateConnection(TEST_CLIENT_ID); if(start) { conn.Start(); } return conn; } private void PurgeQueue(IConnection conn, IDestination queue) { ISession session = conn.CreateSession(); IMessageConsumer consumer = session.CreateConsumer(queue); while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null) { } consumer.Close(); session.Close(); } class Producer { private readonly ISession session; private readonly IDestination dest; private readonly int count; private readonly MsgPriority priority; private Thread theThread; public Producer(ISession session, IDestination dest, int count, MsgPriority priority) { this.session = session; this.dest = dest; this.count = count; this.priority = priority; } public void Start() { theThread = new Thread(Run); theThread.Start(); } public void Join() { if(theThread != null) { theThread.Join(); } } public void Run() { IMessageProducer producer = session.CreateProducer(dest); producer.Priority = this.priority; for(int i = 0; i < this.count; ++i) { ITextMessage message = session.CreateTextMessage("Message Priority = " + (byte) priority); producer.Send(message); } } } [Test] public void TestPriorityConsumption() { IConnection conn = createConnection(true); Connection connection = conn as Connection; Assert.IsNotNull(connection); connection.MessagePrioritySupported = true; ISession receiverSession = conn.CreateSession(); ISession senderSession = conn.CreateSession(); IDestination queue = receiverSession.GetQueue(DESTINATION_NAME); PurgeQueue(conn, queue); IMessageConsumer consumer = receiverSession.CreateConsumer(queue); Producer producer1 = new Producer(senderSession, queue, MSG_COUNT, MsgPriority.High); Producer producer2 = new Producer(senderSession, queue, MSG_COUNT, MsgPriority.Low); producer1.Start(); producer2.Start(); producer1.Join(); producer2.Join(); for(int i = 0; i < MSG_COUNT * 2; i++) { IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); Assert.IsNotNull(msg, "Message {0} was null", i); Assert.AreEqual(i < MSG_COUNT ? MsgPriority.High : MsgPriority.Low, msg.NMSPriority, "Message {0} priority was wrong", i); } } } }