/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Text; using log4net; using Apache.Qpid.Messaging; namespace Apache.Qpid.Integration.Tests.interop.TestCases { /// /// Implements test case 4, from the interop test specification. This test sets up the TC2_P2PMessageSize test for 50 /// messages, and a variety of message sizes. It checks that the sender and receivers reports both indicate that all /// the test messages were transmitted successfully. /// ///

///
CRC Card
Responsibilities Collaborations ///
Setup p2p test parameters and compare with test output. {@link FrameworkBaseCase} ///
/// public class TestCase5PubSubMessageSize : InteropClientTestCase { /// Used for debugging. private static ILog log = LogManager.GetLogger(typeof(TestCase5PubSubMessageSize)); ///

The role to be played by the test. private Roles role; /// Holds the count of test messages received. private int messageCount; ///The size of the message to be sent private int messageSize; /// The number of test messages to send. private int numMessages; /// The number of receiver connection to use. private int numReceivers; /// The routing key to send them to on the default direct exchange. private string sendDestination; /// The connections to send/receive the test messages on. private IConnection[] connection; /// The sessions to send/receive the test messages on. private IChannel[] channel; /// The producer to send the test messages with. IMessagePublisher publisher; /// /// Creates a new coordinating test case with the specified name. /// /// The test case name. /// public String GetName() { log.Info("public String GetName(): called"); return "TC5_PubSubMessageSize"; } /// /// Determines whether the test invite that matched this test case is acceptable. /// /// /// The invitation to accept or reject. /// /// true to accept the invitation, false to reject it. public bool AcceptInvite(IMessage inviteMessage) { log.Info("public boolean AcceptInvite(IMessage inviteMessage = " + inviteMessage + "): called"); // All invites are acceptable. return true; } public void Start() { log.Info("public void start(): called"); // Assuming numMessages = 1 Start(1); } public void Start(int numMessages) { log.Info("public void start("+numMessages+"): called"); // Check that the sender role is being performed. if (role == Roles.SENDER) { IMessage testMessage = createMessageOfSize(messageSize); for (int i = 0; i < numMessages; i++) { publisher.Send(testMessage); // Increment the message count. messageCount++; } } } private IMessage createMessageOfSize(int size) { IBytesMessage message = channel[0].CreateBytesMessage(); string messageStr = "Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- "; System.Text.ASCIIEncoding encoding = new System.Text.ASCIIEncoding(); byte[] messageBytes = encoding.GetBytes(messageStr); if (size > 0) { int div = size / messageBytes.Length; int mod = size % messageBytes.Length; for (int i = 0; i < div; i++) { message.WriteBytes(messageBytes); } if (mod != 0) { message.WriteBytes(messageBytes, 0, mod); } } return message; } public void AssignRole(Roles role, IMessage assignRoleMessage) { log.Info("public void assignRole(Roles role = " + role + ", IMessage assignRoleMessage = " + assignRoleMessage + "): called"); // Reset the message count for a new test. messageCount = 0; // Take note of the role to be played. this.role = role; // Extract and retain the test parameters. numMessages = assignRoleMessage.Headers.GetInt("PUBSUB_NUM_MESSAGES"); messageSize = assignRoleMessage.Headers.GetInt("messageSize"); numReceivers = assignRoleMessage.Headers.GetInt("PUBSUB_NUM_RECEIVERS"); string sendKey = assignRoleMessage.Headers.GetString("PUBSUB_KEY"); sendDestination = sendKey; log.Info("numMessages = " + numMessages); log.Info("messageSize = " + messageSize); log.Info("sendKey = " + sendKey); log.Info("role = " + role); switch (role) { // Check if the sender role is being assigned, and set up a single message producer if so. case Roles.SENDER: // Create a new connection to pass the test messages on. connection = new IConnection[1]; channel = new IChannel[1]; connection[0] = TestClient.CreateConnection(TestClient.brokerUrl, TestClient.virtualHost); channel[0] = connection[0].CreateChannel(false, AcknowledgeMode.AutoAcknowledge); // Extract and retain the test parameters. publisher = channel[0].CreatePublisherBuilder() .WithExchangeName(ExchangeNameDefaults.TOPIC) .WithRoutingKey(sendDestination) .WithMandatory(false) .WithImmediate(false) .Create(); break; // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number // of receiver connections. case Roles.RECEIVER: // Create the required number of receiver connections. connection = new IConnection[numReceivers]; channel = new IChannel[numReceivers]; for (int i = 0; i < numReceivers; i++) { connection[i] = TestClient.CreateConnection(TestClient.brokerUrl, TestClient.virtualHost); channel[i] = connection[i].CreateChannel(false, AcknowledgeMode.AutoAcknowledge); IMessageConsumer consumer = channel[i].CreateConsumerBuilder(sendDestination).Create(); consumer.OnMessage += new MessageReceivedDelegate(OnMessage); } break; } // Start all the connection dispatcher threads running. foreach (IConnection con in connection) { con.Start(); } } public IMessage GetReport(IChannel channel) { log.Info("public Message GetReport(IChannel channel): called"); // Close the test connection. //connection.Stop(); // Generate a report message containing the count of the number of messages passed. IMessage report = channel.CreateMessage(); //report.Headers.SetString("CONTROL_TYPE", "REPORT"); report.Headers.SetInt("MESSAGE_COUNT", messageCount); return report; } /// /// Counts incoming test messages. /// /// /// The incoming test message. public void OnMessage(IMessage message) { log.Info("public void onMessage(IMessage message = " + message + "): called"); // Increment the message count. messageCount++; } } }