/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Runtime.InteropServices;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
using Apache.Qpid.Messaging;

namespace Apache.Qpid.Integration.Tests.interactive
{
    /// <summary>
    /// Interactive fail-over test. Messages are published and consumed over a single connection that is
    /// configured with two brokers; part way through the run the operator is prompted on the console to
    /// kill a broker. The test passes when the fail-over callback has fired and the expected number of
    /// messages has been received within the time limit.
    /// </summary>
    [TestFixture, Category("Interactive")]
    public class FailoverTest : IConnectionListener
    {
        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));

        /// <summary>Specifies the number of times to run the test cycle.</summary>
        const int NUM_MESSAGES = 10;

        /// <summary>Determines how many messages to send within each commit.</summary>
        const int COMMIT_BATCH_SIZE = 1;

        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
        //const int SLEEP_MILLIS = 1;

        /// <summary>Specifies the maximum time in milliseconds to wait for the test to complete.</summary>
        const int TIMEOUT = 10000;

        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
        const int FAIL_POINT = 5;

        /// <summary>Specifies the ack mode to use for the test.</summary>
        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;

        /// <summary>Determines whether this test runs transactionally or not.</summary>
        bool transacted = false;

        /// <summary>Holds the connection to run the test over.</summary>
        AMQConnection _connection;

        /// <summary>Holds the channel for the test message publisher.</summary>
        IChannel publishingChannel;

        /// <summary>Holds the test message publisher.</summary>
        IMessagePublisher publisher;

        /// <summary>Used to keep count of the number of messages sent.</summary>
        int messagesSent;

        /// <summary>Used to keep count of the number of messages received.</summary>
        int messagesReceived;

        /// <summary>Used to wait for test completion on. Also guards <c>allMessagesReceived</c>.</summary>
        private static readonly object testComplete = new Object();

        /// <summary>Used to wait for failover completion on. Also guards <c>failedOver</c>.</summary>
        private static readonly object failoverComplete = new Object();

        /// <summary>Set once <see cref="FailoverComplete"/> has been called for the current test.</summary>
        bool failedOver = false;

        /// <summary>
        /// Set once the expected number of messages has been received. The waiting test thread checks this
        /// flag before blocking, so a pulse issued before it starts waiting is not lost.
        /// </summary>
        bool allMessagesReceived = false;

        /// <summary>
        /// Used to record the extra message count (1) if the message sent right after failover actually made
        /// it to the new broker.
        /// </summary>
        /// <remarks>
        /// NOTE(review): nothing in this file ever assigns a value other than 0, although one extra message
        /// is sent at the fail point - confirm whether this should be updated somewhere.
        /// </remarks>
        int _extraMessage = 0;

        /// <summary>
        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
        /// </summary>
        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
        /// [SetUp]
        public void Init(IConnectionInfo connectionInfo)
        {
            //log4net.Config.BasicConfigurator.Configure();

            // Reset all counts and completion flags, the fixture instance may be reused between tests.
            messagesSent = 0;
            messagesReceived = 0;
            failedOver = false;
            allMessagesReceived = false;
            _extraMessage = 0;

            PromptAndWait("Ensure both brokers are running, then press Enter");

            // Create a connection for the test.
            _connection = new AMQConnection(connectionInfo);
            _connection.ConnectionListener = this;

            // Create a consumer to receive the test messages.
            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);

            string queueName = receivingChannel.GenerateUniqueName();
            receivingChannel.DeclareQueue(queueName, false, true, true);
            receivingChannel.Bind(queueName, "amq.direct", queueName);

            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
                .WithPrefetchLow(30)
                .WithPrefetchHigh(60).Create();

            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
            _connection.Start();

            // Create a publisher to send the test messages.
            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
            publisher = publishingChannel.CreatePublisherBuilder()
                .WithRoutingKey(queueName)
                .Create();

            _log.Debug("connection = " + _connection);
            _log.Debug("connectionInfo = " + connectionInfo);
            _log.Debug("connection.AsUrl = " + _connection.toURL());
            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
        }

        /// <summary>
        /// Clean up the test connection. Fails the test if the fail-over callback was never invoked.
        /// </summary>
        [TearDown]
        public virtual void Shutdown()
        {
            try
            {
                bool sawFailover;

                lock (failoverComplete)
                {
                    sawFailover = failedOver;
                }

                if (!sawFailover)
                {
                    Assert.Fail("The failover callback never occured.");
                }

                Console.WriteLine("Test done shutting down");
                Thread.Sleep(2000);
            }
            finally
            {
                // Close in a finally block: Assert.Fail throws, and the connection must not be left open
                // on the failure path. The connection is null if it was never created.
                if (_connection != null)
                {
                    _connection.Close();
                }
            }
        }

        /// <summary>
        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
        /// to fail between are seperately added into the connection info.
        /// </summary>
        /*[Test]
        public void TestWithBasicInfo()
        {
            _log.Debug("public void TestWithBasicInfo(): called");

            // Manually create the connection parameters.
            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));

            Init(connectionInfo);
            DoFailoverTest();
        }*/

        /// <summary>
        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
        /// </summary>
        [Test]
        public void TestWithUrl()
        {
            _log.Debug("public void runTestWithUrl(): called");

            // Parse the connection parameters from a URL.
            String clientId = "failover" + DateTime.Now.Ticks;
            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
                "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";
            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);

            Init(connectionInfo);
            DoFailoverTest(0);
        }

        /// <summary>
        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
        /// are received within the test time limit.
        /// </summary>
        /// <param name="delay">
        /// The number of minutes to idle at the fail point before prompting the user to cause the failure.
        /// </param>
        void DoFailoverTest(int delay)
        {
            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");

            bool withinTimeout = false;

            for (int i = 1; i <= NUM_MESSAGES; ++i)
            {
                SendMessage();

                // Prompt the user to cause a failure if at the fail point.
                if (i == FAIL_POINT)
                {
                    for (int min = delay; min > 0; min--)
                    {
                        Console.WriteLine("Waiting for "+min+" minutes to test connection time bug.");
                        Thread.Sleep(60*1000);
                    }

                    PromptAndWait("Cause a broker failure now, then press return.");
                    Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second");

                    Console.WriteLine("Sending a message to ensure send right after works");
                    SendMessage();

                    Console.WriteLine("Waiting for fail-over to complete before continuing...");

                    lock (failoverComplete)
                    {
                        // Only block if the callback has not already fired, otherwise its pulse was already issued.
                        if (!failedOver)
                        {
                            withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT);
                        }
                        else
                        {
                            withinTimeout = true;
                        }
                    }

                    if (!withinTimeout)
                    {
                        PromptAndWait("Failover has not yet occured. Press enter to give up waiting.");
                    }
                }
            }

            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
            lock (testComplete)
            {
                // The consumer thread may already have received everything and pulsed before this thread
                // got here; check the flag first so that an early pulse is not missed.
                if (!allMessagesReceived)
                {
                    withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
                }
                else
                {
                    withinTimeout = true;
                }
            }

            if (!withinTimeout)
            {
                Assert.Fail("Test timed out, before all messages received.");
            }

            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
        }

        /// <summary>
        /// Runs a failover test with the connection information built from its component parts, idling for
        /// five minutes at the fail point before the user is prompted to cause the broker failure.
        /// </summary>
        [Test]
        public void Test5MinuteWait()
        {
            String clientId = "failover" + DateTime.Now.Ticks;

            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
            connectionInfo.Username = "guest";
            connectionInfo.Password = "guest";
            connectionInfo.ClientName = clientId;
            connectionInfo.VirtualHost = "/test";
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9672, false));
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9673, false));

            Init(connectionInfo);
            DoFailoverTest(5);
        }

        /// <summary>
        /// Publishes one text message carrying the current sent count, commits if the test is transacted,
        /// and reports the updated sent count on the console.
        /// </summary>
        void SendMessage()
        {
            ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);

            publisher.Send(msg);
            messagesSent++;

            if (transacted)
            {
                publishingChannel.Commit();
            }

            Console.WriteLine("messagesSent = " + messagesSent);
        }

        /// <summary>
        /// Receives all of the test messages.
        /// </summary>
        /// <param name="message">The newly arrived test message.</param>
        public void OnMessage(IMessage message)
        {
            try
            {
                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
                {
                    message.Acknowledge();
                }

                messagesReceived++;

                _log.Debug("messagesReceived = " + messagesReceived);

                // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
                // succesfully completed.
                if (messagesReceived == NUM_MESSAGES + _extraMessage)
                {
                    lock (testComplete)
                    {
                        // Record completion under the lock so a waiter that arrives after this pulse still sees it.
                        // The fail-over flag is deliberately not touched here; only FailoverComplete sets it.
                        allMessagesReceived = true;
                        Monitor.Pulse(testComplete);
                    }
                }
            }
            catch (QpidException e)
            {
                _log.Fatal("Exception received. About to stop.", e);
                Stop();
            }
        }

        /// <summary>
        /// Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.
        /// </summary>
        /// <param name="message">The message to prompt the user with.</param>
        private void PromptAndWait(string message)
        {
            Console.WriteLine("\n" + message);
            Console.ReadLine();
        }

        /// <summary>
        /// Closes the test connection, logging (rather than propagating) any QpidException raised by the close.
        /// </summary>
        private void Stop()
        {
            _log.Debug("Stopping...");

            try
            {
                _connection.Close();
            }
            catch (QpidException e)
            {
                _log.Debug("Failed to shutdown: ", e);
            }
        }

        /// <summary>
        /// Called when bytes have been transmitted to the server
        /// </summary>
        /// <param name="count">count the number of bytes sent in total since the connection was opened</param>
        public void BytesSent(long count) {}

        /// <summary>
        /// Called when some bytes have been received on a connection
        /// </summary>
        /// <param name="count">count the number of bytes received in total since the connection was opened</param>
        public void BytesReceived(long count) {}

        /// <summary>
        /// Called after the infrastructure has detected that failover is required but before attempting failover.
        /// </summary>
        /// <param name="redirect">
        /// redirect true if the broker requested redirect. false if failover is occurring due to a connection error.
        /// </param>
        /// <returns>true to continue failing over, false to veto failover and raise a connection exception</returns>
        public bool PreFailover(bool redirect)
        {
            _log.Debug("public bool PreFailover(bool redirect): called");
            return true;
        }

        /// <summary>
        /// Called after connection has been made to another broker after failover has been started but before
        /// any resubscription has been done.
        /// </summary>
        /// <returns>
        /// true to continue with resubscription, false to prevent automatic resubscription. This is useful in
        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
        /// and consumers are invalidated.
        /// </returns>
        public bool PreResubscribe()
        {
            _log.Debug("public bool PreResubscribe(): called");
            return true;
        }

        /// <summary>
        /// Called once failover has completed successfully. This is called irrespective of whether the client has
        /// vetoed automatic resubscription.
        /// </summary>
        public void FailoverComplete()
        {
            lock (failoverComplete)
            {
                // Set the flag and pulse under the same lock the waiter uses to check it.
                failedOver = true;
                Monitor.Pulse(failoverComplete);
            }

            _log.Debug("public void FailoverComplete(): called");
            Console.WriteLine("public void FailoverComplete(): called");
        }
    }
}