/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Text; using System.Threading; using NUnit.Framework; using org.apache.qpid.client; using org.apache.qpid.transport; using org.apache.qpid.transport.util; namespace test.interop { public class Message : TestCase { private static readonly Logger _log = Logger.Get(typeof (Message)); [Test] public void sendAndPurge() { _log.Debug("Running: ExchangeBind"); IClientSession ssn = Client.CreateSession(0); ssn.QueueDelete("queue1"); QueueQueryResult result = (QueueQueryResult) ssn.QueueQuery("queue1").Result; Assert.IsNull(result.GetQueue()); ssn.QueueDeclare("queue1", null, null); ssn.ExchangeBind("queue1", "amq.direct", "queue1", null); for (int i = 0; i < 10; i++) { ssn.MessageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(new DeliveryProperties().SetRoutingKey("queue1"), new MessageProperties().SetMessageId(UUID.RandomUuid())), Encoding.UTF8.GetBytes("test: " + i)); } ssn.Sync(); result = (QueueQueryResult) ssn.QueueQuery("queue1").Result; Assert.IsTrue(result.GetMessageCount() == 10); ssn.QueuePurge("queue1"); ssn.Sync(); result = (QueueQueryResult) ssn.QueueQuery("queue1").Result; Assert.IsTrue(result.GetMessageCount() == 0); } [Test] public void sendAndReceiveSmallMessages() { _log.Debug("Running: sendAndReceiveSmallMessages"); byte[] smallMessage = Encoding.UTF8.GetBytes("test"); sendAndReceive(smallMessage, 100); } [Test] public void sendAndReceiveLargeMessages() { _log.Debug("Running: sendAndReceiveSmallMessages"); byte[] largeMessage = new byte[100 * 1024]; Random random = new Random(); random.NextBytes(largeMessage); sendAndReceive(largeMessage, 10); } [Test] public void sendAndReceiveVeryLargeMessages() { _log.Debug("Running: sendAndReceiveSmallMessages"); byte[] verylargeMessage = new byte[1000 * 1024]; Random random = new Random(); random.NextBytes(verylargeMessage); sendAndReceive(verylargeMessage, 2); } private void sendAndReceive(byte[] messageBody, int count) { IClientSession ssn = Client.CreateSession(0); ssn.Sync(); ssn.QueueDeclare("queue1", null, null); ssn.QueueDelete("queue1"); QueueQueryResult result = (QueueQueryResult) ssn.QueueQuery("queue1").Result; Assert.IsNull(result.GetQueue()); ssn.QueueDeclare("queue1", null, null); ssn.ExchangeBind("queue1", "amq.direct", "queue1", null); Object myLock = new Object(); MyListener myListener = new MyListener(myLock, count); ssn.AttachMessageListener(myListener, "myDest"); ssn.MessageSubscribe("queue1", "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, null, 0, null); // issue credits ssn.MessageSetFlowMode("myDest", MessageFlowMode.WINDOW); ssn.MessageFlow("myDest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES); ssn.MessageFlow("myDest", MessageCreditUnit.MESSAGE, 10000); ssn.Sync(); for (int i = 0; i < count; i++) { ssn.MessageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(new DeliveryProperties().SetRoutingKey("queue1"), new MessageProperties().SetMessageId(UUID.RandomUuid())), messageBody); } ssn.Sync(); lock (myLock) { if (myListener.Count != 0) { Monitor.Wait(myLock, 10000000); } } Assert.IsTrue(myListener.Count == 0); ssn.MessageAccept(myListener.UnAck); ssn.Sync(); // the queue should be empty result = (QueueQueryResult)ssn.QueueQuery("queue1").Result; Assert.IsTrue(result.GetMessageCount() == 0); ssn.Close(); } private class MyListener : IMessageListener { private static readonly Logger _log = Logger.Get(typeof (MyListener)); private readonly Object _wl; private int _count; private RangeSet _rs = new RangeSet(); public MyListener(Object wl, int count) { _wl = wl; _count = count; } public void MessageTransfer(IMessage m) { byte[] body = new byte[m.Body.Length - m.Body.Position]; _log.Debug("Got a message of size: " + body.Length + " count = " + _count); _rs.Add(m.Id); lock (_wl) { _count--; if (_count == 0) { Monitor.PulseAll(_wl); } } } public int Count { get { return _count; } } public RangeSet UnAck { get { return _rs; } } } } }