/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Runtime.InteropServices; using System.Threading; using log4net; using NUnit.Framework; using Qpid.Client.qms; using Qpid.Messaging; namespace Qpid.Client.Tests.failover { [TestFixture] public class FailoverTxTest : IConnectionListener { private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest)); const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; const int NUM_ROLLEDBACK_MESSAGES = 3; const int SLEEP_MILLIS = 50; // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge; const bool _noWait = true; // use Receive or ReceiveNoWait AMQConnection _connection; public void OnMessage(IMessage message) { try { _log.Info("Received: " + ((ITextMessage) message).Text); if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) { _log.Info("client acknowledging"); message.Acknowledge(); } } catch (QpidException e) { Error(e); } } class NoWaitConsumer { FailoverTxTest _failoverTxTest; IMessageConsumer _consumer; private bool _noWait; internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait) { _failoverTxTest = failoverTxTest; _consumer = channel; _noWait = noWait; } internal void Run() { int messages = 0; while (messages < NUM_COMMITED_MESSAGES) { IMessage msg; if (_noWait) msg = _consumer.ReceiveNoWait(); else msg = _consumer.Receive(); if (msg != null) { _log.Info("NoWait received message"); ++messages; _failoverTxTest.OnMessage(msg); } else { Thread.Sleep(1); } } } } void DoFailoverTxTest(ConnectionInfo connectionInfo) { _connection = new AMQConnection(connectionInfo); _connection.ConnectionListener = this; _log.Info("connection = " + _connection); _log.Info("connectionInfo = " + connectionInfo); _log.Info("connection.AsUrl = " + _connection.toURL()); _log.Info("AcknowledgeMode is " + _acknowledgeMode); IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); string queueName = receivingChannel.GenerateUniqueName(); // Queue.Declare receivingChannel.DeclareQueue(queueName, false, true, true); // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) .WithPrefetchLow(30) .WithPrefetchHigh(60).Create(); bool useThread = false; if (useThread) { NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait); new Thread(new ThreadStart(noWaitConsumer.Run)).Start(); } else { consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } _connection.Start(); PublishInTx(queueName); Thread.Sleep(2000); // Wait a while for last messages. _connection.Close(); _log.Info("FailoverTxText complete"); } private void PublishInTx(string routingKey) { _log.Info("sendInTx"); bool transacted = true; IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder() .WithRoutingKey(routingKey) .Create(); for (int i = 1; i <= NUM_ITERATIONS; ++i) { for (int j = 1; j <= NUM_ROLLEDBACK_MESSAGES; ++j) { ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " rolledBackMsg=" + j); _log.Info("sending message = " + msg.Text); publisher.Send(msg); Thread.Sleep(SLEEP_MILLIS); } if (transacted) publishingChannel.Rollback(); for (int j = 1; j <= NUM_COMMITED_MESSAGES; ++j) { ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " commitedMsg=" + j); _log.Info("sending message = " + msg.Text); publisher.Send(msg); Thread.Sleep(SLEEP_MILLIS); } if (transacted) publishingChannel.Commit(); } } private void Error(Exception e) { _log.Fatal("Exception received. About to stop.", e); Stop(); } private void Stop() { _log.Info("Stopping..."); try { _connection.Close(); } catch (QpidException e) { _log.Info("Failed to shutdown: ", e); } } public void BytesSent(long count) { } public void BytesReceived(long count) { } public bool PreFailover(bool redirect) { _log.Info("preFailover(" + redirect + ") called"); return true; } public bool PreResubscribe() { _log.Info("preResubscribe() called"); return true; } public void FailoverComplete() { _log.Info("failoverComplete() called"); } // [Test] public void TestWithBasicInfo() { Console.WriteLine("TestWithBasicInfo"); Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion()); QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); DoFailoverTxTest(connectionInfo); } [Test] public void runTestWithUrl() { String clientId = "failover" + DateTime.Now.Ticks; string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; _log.Info("url = [" + defaultUrl + "]"); ConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl); _log.Info("connection url = [" + connectionInfo + "]"); DoFailoverTxTest(connectionInfo); } } }