/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Runtime.InteropServices;
using System.Threading;
using log4net;
using NUnit.Framework;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Messaging;

namespace Apache.Qpid.Client.Tests.failover
{
    /// <summary>
    /// Interactive fail-over test. It publishes <see cref="NUM_MESSAGES"/> messages over a single connection,
    /// prompts the operator on the console to fail a broker after <see cref="FAIL_POINT"/> messages, and then
    /// checks that every message is received within <see cref="TIMEOUT"/> milliseconds of the last send.
    /// The fixture registers itself as the connection listener so that fail-over callbacks are logged.
    /// </summary>
    [TestFixture, Category("Failover")]
    public class FailoverTest : IConnectionListener
    {
        private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));

        /// <summary>Specifies the number of times to run the test cycle.</summary>
        const int NUM_MESSAGES = 10;

        /// <summary>Determines how many messages to send within each commit.</summary>
        const int COMMIT_BATCH_SIZE = 1;

        /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
        //const int SLEEP_MILLIS = 1;

        /// <summary>Specifies the maximum time in milliseconds to wait for the test to complete.</summary>
        const int TIMEOUT = 10000;

        /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
        const int FAIL_POINT = 5;

        /// <summary>Specifies the ack mode to use for the test.</summary>
        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;

        /// <summary>Determines whether this test runs transactionally or not.</summary>
        bool transacted = false;

        /// <summary>Holds the connection to run the test over.</summary>
        AMQConnection _connection;

        /// <summary>Holds the channel for the test message publisher.</summary>
        IChannel publishingChannel;

        /// <summary>Holds the test message publisher.</summary>
        IMessagePublisher publisher;

        /// <summary>Used to keep count of the number of messages sent.</summary>
        int messagesSent;

        /// <summary>
        /// Used to keep count of the number of messages received. Read and written only while holding
        /// <see cref="testComplete"/>, because it is updated from the message callback and read by the test method.
        /// </summary>
        int messagesReceived;

        /// <summary>Monitor used to wait for test completion on; also guards <see cref="messagesReceived"/>.</summary>
        private static readonly object testComplete = new Object();

        /// <summary>
        /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
        /// </summary>
        /// <remarks>
        /// This is deliberately not marked [SetUp]: it takes a parameter and is called explicitly by each test
        /// once the test has built its connection information.
        /// </remarks>
        /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
        public void Init(IConnectionInfo connectionInfo)
        {
            // Reset all counts.
            messagesSent = 0;
            lock (testComplete)
            {
                messagesReceived = 0;
            }

            // Create a connection for the test.
            _connection = new AMQConnection(connectionInfo);
            _connection.ConnectionListener = this;

            // Create a consumer to receive the test messages.
            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);

            string queueName = receivingChannel.GenerateUniqueName();
            receivingChannel.DeclareQueue(queueName, false, true, true);
            receivingChannel.Bind(queueName, "amq.direct", queueName);

            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
                .WithPrefetchLow(30)
                .WithPrefetchHigh(60).Create();

            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
            _connection.Start();

            // Create a publisher to send the test messages.
            publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
            publisher = publishingChannel.CreatePublisherBuilder()
                .WithRoutingKey(queueName)
                .Create();

            _log.Debug("connection = " + _connection);
            _log.Debug("connectionInfo = " + connectionInfo);
            _log.Debug("connection.AsUrl = " + _connection.toURL());
            _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
        }

        /// <summary>
        /// Clean up the test connection.
        /// </summary>
        [TearDown]
        public virtual void Shutdown()
        {
            // If Init never got as far as creating the connection there is nothing to close; dereferencing
            // it here would only hide the original failure behind a NullReferenceException.
            if (_connection == null)
            {
                return;
            }

            // NOTE(review): presumably this pause lets in-flight work settle before closing - confirm.
            Thread.Sleep(2000);
            _connection.Close();
        }

        /// <summary>
        /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
        /// to fail between are separately added into the connection info.
        /// </summary>
        /*[Test]
        public void TestWithBasicInfo()
        {
            _log.Debug("public void TestWithBasicInfo(): called");

            // Manually create the connection parameters.
            QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
            connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));

            Init(connectionInfo);
            DoFailoverTest();
        }*/

        /// <summary>
        /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
        /// </summary>
        [Test]
        public void TestWithUrl()
        {
            _log.Debug("public void runTestWithUrl(): called");

            // Parse the connection parameters from a URL.
            String clientId = "failover" + DateTime.Now.Ticks;
            string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
                "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);

            Init(connectionInfo);
            DoFailoverTest();
        }

        /// <summary>
        /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
        /// are received within the test time limit.
        /// </summary>
        void DoFailoverTest()
        {
            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");

            for (int i = 1; i <= NUM_MESSAGES; ++i)
            {
                ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
                //_log.Debug("sending message = " + msg.Text);
                publisher.Send(msg);
                messagesSent++;

                _log.Debug("messagesSent = " + messagesSent);

                if (transacted)
                {
                    publishingChannel.Commit();
                }

                // Prompt the user to cause a failure if at the fail point.
                if (i == FAIL_POINT)
                {
                    PromptAndWait("Cause a broker failure now, then press return...");
                }

                //Thread.Sleep(SLEEP_MILLIS);
            }

            // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
            if (!WaitForAllMessages(TIMEOUT))
            {
                Assert.Fail("Test timed out, before all messages received.");
            }

            _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
        }

        /// <summary>
        /// Blocks until <see cref="NUM_MESSAGES"/> messages have been counted by <see cref="OnMessage"/>, or the
        /// timeout elapses.
        /// </summary>
        /// <remarks>
        /// The received count is checked under the lock before and after every wait. A bare Monitor.Wait would
        /// miss a pulse raised before the wait started (the last message can arrive before the publisher gets
        /// here) and report a false timeout.
        /// </remarks>
        /// <param name="timeoutMillis">The maximum total time to wait, in milliseconds.</param>
        /// <returns>true if all messages were received in time, false if the wait timed out first.</returns>
        private bool WaitForAllMessages(int timeoutMillis)
        {
            DateTime deadline = DateTime.UtcNow.AddMilliseconds(timeoutMillis);

            lock (testComplete)
            {
                while (messagesReceived < NUM_MESSAGES)
                {
                    int remainingMillis = (int) (deadline - DateTime.UtcNow).TotalMilliseconds;

                    // Monitor.Wait re-acquires the lock before returning, even on timeout, so the final
                    // read of the counter below is still performed under the lock.
                    if (remainingMillis <= 0 || !Monitor.Wait(testComplete, remainingMillis))
                    {
                        return messagesReceived >= NUM_MESSAGES;
                    }
                }

                return true;
            }
        }

        /// <summary>
        /// Receives all of the test messages.
        /// </summary>
        /// <param name="message">The newly arrived test message.</param>
        public void OnMessage(IMessage message)
        {
            try
            {
                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
                {
                    message.Acknowledge();
                }

                int receivedSoFar;

                // Count the message and signal completion under the same lock the waiting test thread uses,
                // so the update is visible to it and the notification cannot slip between its check and its wait.
                lock (testComplete)
                {
                    receivedSoFar = ++messagesReceived;

                    // Check if all of the messages in the test have been received, in which case notify the message
                    // producer that the test has successfully completed.
                    if (receivedSoFar >= NUM_MESSAGES)
                    {
                        Monitor.PulseAll(testComplete);
                    }
                }

                _log.Debug("messagesReceived = " + receivedSoFar);
            }
            catch (QpidException e)
            {
                _log.Fatal("Exception received. About to stop.", e);
                Stop();
            }
        }

        /// <summary>
        /// Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.
        /// </summary>
        /// <param name="message">The message to prompt the user with.</param>
        private void PromptAndWait(string message)
        {
            Console.WriteLine("\n" + message);
            Console.ReadLine();
        }

        /// <summary>
        /// Closes the test connection, logging (rather than propagating) any QpidException raised while closing.
        /// </summary>
        private void Stop()
        {
            _log.Debug("Stopping...");
            try
            {
                _connection.Close();
            }
            catch (QpidException e)
            {
                _log.Debug("Failed to shutdown: ", e);
            }
        }

        /// <summary>
        /// Called when bytes have been transmitted to the server. Ignored by this test.
        /// </summary>
        /// <param name="count">The number of bytes sent in total since the connection was opened.</param>
        public void BytesSent(long count) {}

        /// <summary>
        /// Called when some bytes have been received on a connection. Ignored by this test.
        /// </summary>
        /// <param name="count">The number of bytes received in total since the connection was opened.</param>
        public void BytesReceived(long count) {}

        /// <summary>
        /// Called after the infrastructure has detected that failover is required but before attempting failover.
        /// </summary>
        /// <param name="redirect">
        /// true if the broker requested redirect. false if failover is occurring due to a connection error.
        /// </param>
        /// <returns>true to continue failing over, false to veto failover and raise a connection exception.
        /// This test always returns true.</returns>
        public bool PreFailover(bool redirect)
        {
            _log.Debug("public bool PreFailover(bool redirect): called");
            return true;
        }

        /// <summary>
        /// Called after connection has been made to another broker after failover has been started but before
        /// any resubscription has been done.
        /// </summary>
        /// <returns>
        /// true to continue with resubscription, false to prevent automatic resubscription. This is useful in
        /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
        /// and consumers are invalidated. This test always returns true.
        /// </returns>
        public bool PreResubscribe()
        {
            _log.Debug("public bool PreResubscribe(): called");
            return true;
        }

        /// <summary>
        /// Called once failover has completed successfully. This is called irrespective of whether the client has
        /// vetoed automatic resubscription.
        /// </summary>
        public void FailoverComplete()
        {
            _log.Debug("public void FailoverComplete(): called");
        }
    }
}