using System;
using System.Collections;
using System.Text;
using System.Threading;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
using log4net;
using Apache.Qpid.Integration.Tests.interop.TestCases;
namespace Apache.Qpid.Integration.Tests.interop
{
///
/// Implements a test client as described in the interop testing spec
/// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
/// reacts to control message sequences send by the test coordinator.
///
/// Messages Handled by TestClient
/// Message | Action
/// |
---|
Invite(compulsory) | Reply with Enlist.
/// |
Invite(test case) | Reply with Enlist if test case available.
/// |
AssignRole(test case) | Reply with Accept Role if matches an enlisted test. Keep test parameters.
/// |
Start | Send test messages defined by test parameters. Send report on messages sent.
/// |
Status Request | Send report on messages received.
/// |
///
/// CRC Card
/// Responsibilities | Collaborations
/// |
---|
Handle all incoming control messages. | {@link InteropClientTestCase}
/// |
Configure and look up test cases by name. | {@link InteropClientTestCase}
/// |
///
public class TestClient
{
private static ILog log = LogManager.GetLogger(typeof(TestClient));
/// Defines the default broker for the tests, localhost, default port.
public static string DEFAULT_BROKER_URL = "amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'";
/// Defines the default virtual host to use for the tests, none.
public static string DEFAULT_VIRTUAL_HOST = "";
/// Defines the default identifying name of this test client.
public static string DEFAULT_CLIENT_NAME = "dotnet";
/// Holds the URL of the broker to run the tests on.
public static string brokerUrl;
/// Holds the virtual host to run the tests on. If null, then the default virtual host is used.
public static string virtualHost;
/// The clients identifying name to print in test results and to distinguish from other clients.
private string clientName;
/// Holds all the test cases.
private IDictionary testCases = new Hashtable();
InteropClientTestCase currentTestCase;
private MessagePublisherBuilder publisherBuilder;
private IChannel channel;
/// Monitor to wait for termination events on.
private static object terminationMonitor = new Object();
///
/// Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
/// client identifying name.
///
///
/// The url of the broker to connect to.
/// The virtual host to conect to.
/// The client name to use.
public TestClient(string brokerUrl, string virtualHost, string clientName)
{
log.Info("public TestClient(string brokerUrl = " + brokerUrl + ", string virtualHost = " + virtualHost
+ ", string clientName = " + clientName + "): called");
// Retain the connection parameters.
TestClient.brokerUrl = brokerUrl;
TestClient.virtualHost = virtualHost;
this.clientName = clientName;
}
///
/// The entry point for the interop test coordinator. This client accepts the following command line arguments:
///
///
///
/// -b | The broker URL. | Optional.
/// |
-h | The virtual host. | Optional.
/// |
-n | The test client name. | Optional.
/// |
name=value | Trailing argument define name/value pairs. Added to system properties. | Optional.
/// |
///
/// The command line arguments.
public static void Main(string[] args)
{
// Extract the command line options (Not exactly Posix but it will do for now...).
string brokerUrl = DEFAULT_BROKER_URL;
string virtualHost = DEFAULT_VIRTUAL_HOST;
string clientName = DEFAULT_CLIENT_NAME;
foreach (string nextArg in args)
{
if (nextArg.StartsWith("-b"))
{
brokerUrl = nextArg.Substring(2);
}
else if (nextArg.StartsWith("-h"))
{
virtualHost = nextArg.Substring(2);
}
else if (nextArg.StartsWith("-n"))
{
clientName = nextArg.Substring(2);
}
}
NDC.Push(clientName);
// Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost, clientName);
try
{
client.Start();
}
catch (Exception e)
{
log.Error("The test client was unable to start.", e);
System.Environment.Exit(1);
}
// Wait for a signal on the termination monitor before quitting.
lock (terminationMonitor)
{
Monitor.Wait(terminationMonitor);
}
NDC.Pop();
}
///
/// Starts the interop test client running. This causes it to start listening for incoming test invites.
///
private void Start()
{
log.Info("private void Start(): called");
// Use a class path scanner to find all the interop test case implementations.
ArrayList testCaseClasses = new ArrayList();
// ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
// Hard code the test classes till the classpath scanner is fixed.
testCaseClasses.Add(typeof(TestCase1DummyRun));
testCaseClasses.Add(typeof(TestCase2BasicP2P));
testCaseClasses.Add(typeof(TestCase3BasicPubSub));
// Create all the test case implementations and index them by the test names.
foreach (Type testClass in testCaseClasses)
{
InteropClientTestCase testCase = (InteropClientTestCase)Activator.CreateInstance(testClass);
testCases.Add(testCase.GetName(), testCase);
log.Info("Found test case: " + testClass);
}
// Open a connection to communicate with the coordinator on.
log.Info("brokerUrl = " + brokerUrl);
IConnection connection = CreateConnection(brokerUrl, virtualHost);
channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
// Set this up to listen for control messages.
string responseQueueName = channel.GenerateUniqueName();
channel.DeclareQueue(responseQueueName, false, true, true);
channel.Bind(responseQueueName, ExchangeNameDefaults.TOPIC, "iop.control." + clientName);
channel.Bind(responseQueueName, ExchangeNameDefaults.TOPIC, "iop.control");
IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
.Create();
consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
// Create a publisher to send replies with.
publisherBuilder = channel.CreatePublisherBuilder()
.WithExchangeName(ExchangeNameDefaults.DIRECT);
// Start listening for incoming control messages.
connection.Start();
Console.WriteLine("Test client " + clientName + " ready to receive test control messages...");
}
///
/// Establishes an AMQ connection. This is a simple convenience method for code that does not anticipate handling connection failures.
/// All exceptions that indicate that the connection has failed, are allowed to fall through.
///
///
/// The broker url to connect to, null to use the default from the properties.
/// The virtual host to connectio to, null to use the default.
///
/// A JMS conneciton.
public static IConnection CreateConnection(string brokerUrl, string virtualHost)
{
log.Info("public static Connection createConnection(string brokerUrl = " + brokerUrl + ", string virtualHost = "
+ virtualHost + "): called");
// Create a connection to the broker.
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(brokerUrl);
connectionInfo.VirtualHost = virtualHost;
IConnection connection = new AMQConnection(connectionInfo);
return connection;
}
///
/// Handles all incoming control messages.
///
///
/// The incoming message.
public void OnMessage(IMessage message)
{
log.Info("public void OnMessage(IMessage message = " + message + "): called");
try
{
string controlType = message.Headers.GetString("CONTROL_TYPE");
string testName = message.Headers.GetString("TEST_NAME");
// Check if the message is a test invite.
if ("INVITE" == controlType)
{
string testCaseName = message.Headers.GetString("TEST_NAME");
// Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
// for which test cases exist.
bool enlist = false;
if (testCaseName != null)
{
log.Info("Got an invite to test: " + testCaseName);
// Check if the requested test case is available.
InteropClientTestCase testCase = (InteropClientTestCase)testCases[testCaseName];
if (testCase != null)
{
// Make the requested test case the current test case.
currentTestCase = testCase;
enlist = true;
}
}
else
{
log.Info("Got a compulsory invite.");
enlist = true;
}
log.Info("enlist = " + enlist);
if (enlist)
{
// Reply with the client name in an Enlist message.
IMessage enlistMessage = channel.CreateMessage();
enlistMessage.Headers.SetString("CONTROL_TYPE", "ENLIST");
enlistMessage.Headers.SetString("CLIENT_NAME", clientName);
enlistMessage.Headers.SetString("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
enlistMessage.CorrelationId = message.CorrelationId;
Send(enlistMessage, message.ReplyToRoutingKey);
}
}
else if ("ASSIGN_ROLE" == controlType)
{
// Assign the role to the current test case.
string roleName = message.Headers.GetString("ROLE");
log.Info("Got a role assignment to role: " + roleName);
Roles role;
if (roleName == "SENDER")
{
role = Roles.SENDER;
}
else
{
role = Roles.RECEIVER;
}
currentTestCase.AssignRole(role, message);
// Reply by accepting the role in an Accept Role message.
IMessage acceptRoleMessage = channel.CreateMessage();
acceptRoleMessage.Headers.SetString("CONTROL_TYPE", "ACCEPT_ROLE");
acceptRoleMessage.CorrelationId = message.CorrelationId;
Send(acceptRoleMessage, message.ReplyToRoutingKey);
}
else if ("START" == controlType || "STATUS_REQUEST" == controlType)
{
if ("START" == controlType)
{
log.Info("Got a start notification.");
// Start the current test case.
currentTestCase.Start();
}
else
{
log.Info("Got a status request.");
}
// Generate the report from the test case and reply with it as a Report message.
IMessage reportMessage = currentTestCase.GetReport(channel);
reportMessage.Headers.SetString("CONTROL_TYPE", "REPORT");
reportMessage.CorrelationId = message.CorrelationId;
Send(reportMessage, message.ReplyToRoutingKey);
}
else if ("TERMINATE" == controlType)
{
Console.WriteLine("Received termination instruction from coordinator.");
// Is a cleaner shutdown needed?
System.Environment.Exit(1);
}
else
{
// Log a warning about this but otherwise ignore it.
log.Warn("Got an unknown control message, controlType = " + controlType + ", message = " + message);
}
}
catch (QpidException e)
{
// Log a warning about this, but otherwise ignore it.
log.Warn("A QpidException occurred whilst handling a message.");
log.Info("Got QpidException whilst handling message: " + message, e);
}
}
///
/// Send the specified message using the specified routing key on the direct exchange.
///
///
/// The message to send.
/// The routing key to send the message with.
public void Send(IMessage message, string routingKey)
{
IMessagePublisher publisher = publisherBuilder.WithRoutingKey(routingKey).Create();
publisher.Send(message);
}
}
}