/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ namespace Apache.Qpid.Integration.Tests.framework { /// /// Assertion models an assertion on a test . /// ///

///
/// CRC Card
/// Responsibilities:
///   - Indicate whether or not the assertion passes when applied.
///

public interface Assertion
{
    // No access modifier is written on the member: interface members are implicitly
    // public, and C# compilers before 8.0 reject an explicit modifier (CS0106).

    /// <summary>
    /// Applies the assertion.
    /// </summary>
    /// <returns><c>true</c> if the assertion passes, <c>false</c> if it fails.</returns>
    bool apply();
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System.Collections.Generic.LinkedList;
using System.Collections.Generic.IList;

namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// AssertionBase is a base class for implementing assertions. It provides a mechanism to store error messages, and
/// report all error messages when its ToString method is called.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Collect error messages.
///

public abstract class AssertionBase : Assertion
{
    /// <summary>Holds the error messages, in the order in which they were added.</summary>
    // A List is used because System.Collections.Generic.LinkedList<T> does not implement IList<T>.
    System.Collections.Generic.IList<string> errors = new System.Collections.Generic.List<string>();

    /// <summary>
    /// Applies the assertion. Declared abstract here because a C# abstract class must still
    /// declare every member of the interfaces it lists; concrete assertions override it.
    /// </summary>
    /// <returns><c>true</c> if the assertion passes, <c>false</c> if it fails.</returns>
    public abstract bool apply();

    /// <summary>
    /// Adds an error message to the assertion.
    /// </summary>
    /// <param name="error">An error message to add to the assertion.</param>
    public void addError(string error)
    {
        errors.Add(error);
    }

    /// <summary>
    /// Prints all of the error messages in the assertion into a string. The messages are
    /// concatenated in insertion order with no separator between them.
    /// </summary>
    /// <returns>All of the error messages in the assertion as a string; empty if there are none.</returns>
    // 'override' is required so that calls made through object.ToString() reach this method.
    public override string ToString()
    {
        System.Text.StringBuilder result = new System.Text.StringBuilder();

        foreach (string error in errors)
        {
            result.Append(error);
        }

        return result.ToString();
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// BrokerLifecycleAware is an awareness interface implemented by test cases that can control the life-cycle of
/// the brokers on which they run. Its purpose is to expose additional instrumentation of brokers during testing, that
/// enables tests to use an automated failure mechanism to simulate broker failures, and to re-start failed brokers.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Indicate whether or not a test case is using an in-vm broker.
///   - Track which in-vm broker is currently in use.
///   - Accept setting of a failure mechanism. (CauseFailure)
///

/// <remarks>
/// TODO: Need to think about how to present the brokers through this interface. Thinking numbering the available
/// brokers from 1 will do. Then can kill 1 and assume failing onto 2. Restart 1 and kill 2 and fail back onto
/// 1 again?
/// </remarks>
public interface BrokerLifecycleAware
{
    // No access modifiers are written on the members: interface members are implicitly
    // public, and C# compilers before 8.0 reject an explicit modifier (CS0106).

    /// <summary>
    /// NOTE(review): undocumented in the original; presumably tells the test case that it is
    /// running against in-vm brokers - confirm against the implementing test cases.
    /// </summary>
    void setInVmBrokers();

    /// <summary>
    /// Indicates whether or not a test case is using in-vm brokers.
    /// </summary>
    /// <returns><c>true</c> if the test is using in-vm brokers, <c>false</c> otherwise.</returns>
    bool usingInVmBroker();

    /// <summary>
    /// Sets the currently live in-vm broker.
    /// </summary>
    /// <param name="i">The currently live in-vm broker.</param>
    void setLiveBroker(int i);

    /// <summary>
    /// Reports the currently live in-vm broker.
    /// </summary>
    /// <returns>The currently live in-vm broker.</returns>
    int getLiveBroker();

    /// <summary>
    /// Accepts a failure mechanism.
    /// </summary>
    /// <param name="failureMechanism">The failure mechanism.</param>
    void setFailureMechanism(CauseFailure failureMechanism);
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// CauseFailure provides a method to cause a failure in a messaging broker, usually used in conjunction with fail-over
/// or other failure mode testing. In some cases failures may be automated, for example by shutting down an in-vm broker,
/// or by sending a special control signal to a broker over a network connection. In other cases, it may be preferable
/// to ask a user interactively to cause a failure scenario, in which case an implementation may display a prompt or
/// dialog box asking for notification once the failure has been caused. The purpose of this interface is to abstract
/// the exact cause and nature of a failure out of failure test cases.
/// </summary>

///
/// CRC Card
/// Responsibilities:
///   - Cause messaging broker failure.
///

public interface CauseFailure
{
    /// <summary>
    /// Causes the active message broker to fail.
    /// </summary>
    void causeFailure();
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using Apache.Qpid.Integration.Tests.framework.CauseFailure;
using java.io.IOException;

namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// Causes a message broker failure by interactively prompting the user to cause it.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Cause messaging broker failure.
///

public class CauseFailureUserPrompt : CauseFailure
{
    /// <summary>
    /// Causes the active message broker to fail, by asking the user on the console to do so
    /// and waiting until the user confirms.
    /// </summary>
    public void causeFailure()
    {
        waitForUser("Cause a broker failure now, then press Return.");
    }

    /// <summary>
    /// Outputs a prompt to the console and waits for the user to press return.
    /// </summary>
    /// <param name="prompt">The prompt to display on the console.</param>
    private void waitForUser(string prompt)
    {
        System.Console.WriteLine(prompt);

        try
        {
            // Consume the whole line. Reading a single byte would leave the rest of the line
            // (including the line terminator) buffered, so a later prompt would return at once.
            System.Console.ReadLine();
        }
        catch (System.IO.IOException)
        {
            // Deliberately ignored: if the console cannot be read there is nothing to wait for,
            // so the test simply carries on.
        }

        System.Console.WriteLine("Continuing.");
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System.Collections.Generic.IList;

namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// A Circuit is the basic test unit against which test cases are to be written. A circuit consists of two 'ends', an
/// instigating 'publisher' end and a more passive 'receivers' end.
/// </summary>

/// Once created, the life-cycle of a circuit may be controlled by starting it (start), or closing it (close).
/// Once started, the circuit is ready to send messages over. Once closed the circuit can no longer be used.
///
/// The state of the circuit may be taken with the check method, and asserted against by the
/// applyAssertions method.
///
/// There is a default test procedure which may be performed against the circuit. The outline of this procedure is:
///
///     Start the circuit.
///     Send test messages.
///     Request a status report.
///     Assert conditions on the publishing end of the circuit.
///     Assert conditions on the receiving end of the circuit.
///     Close the circuit.
///     Pass with no failed assertions or fail with a list of failed assertions.
///
/// CRC Card
/// Responsibilities:
///   - Supply the publishing and receiving ends of a test messaging circuit.
///   - Start the circuit running.
///   - Close the circuit down.
///   - Take a reading of the circuit's state.
///   - Apply assertions against the circuit's state.
///   - Send test messages over the circuit.
///   - Perform the default test procedure on the circuit.
///

public interface Circuit
{
    // No access modifiers are written on the members: interface members are implicitly
    // public, and C# compilers before 8.0 reject an explicit modifier (CS0106).

    /// <summary>
    /// Gets the interface on the publishing end of the circuit.
    /// </summary>
    /// <returns>The publishing end of the circuit.</returns>
    Publisher getPublisher();

    /// <summary>
    /// Gets the interface on the receiving end of the circuit.
    /// </summary>
    /// <returns>The receiving end of the circuit.</returns>
    Receiver getReceiver();

    /// <summary>
    /// Connects and starts the circuit. After this method is called the circuit is ready to send messages.
    /// </summary>
    void start();

    /// <summary>
    /// Checks the test circuit. The effect of this is to gather the circuit's state, for both ends of the circuit,
    /// into a report, against which assertions may be checked.
    /// </summary>
    void check();

    /// <summary>
    /// Closes the circuit. All associated resources are closed.
    /// </summary>
    void close();

    /// <summary>
    /// Applies a list of assertions against the test circuit. The <c>check</c> method should be called before doing
    /// this, to ensure that the circuit has gathered its state into a report to assert against.
    /// </summary>
    /// <param name="assertions">The list of assertions to apply to the circuit.</param>
    /// <returns>Any assertions that failed.</returns>
    // NOTE(review): the element type Assertion is taken from the parameter documentation; the
    // ported signature used the non-generic names 'IList' and 'List', which do not resolve in C#.
    System.Collections.Generic.IList<Assertion> applyAssertions(System.Collections.Generic.IList<Assertion> assertions);

    /// <summary>
    /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold.
    /// </summary>
    /// <param name="numMessages">The number of messages to send using the default test procedure.</param>
    /// <param name="assertions">The list of assertions to apply.</param>
    /// <returns>Any assertions that failed.</returns>
    System.Collections.Generic.IList<Assertion> test(int numMessages, System.Collections.Generic.IList<Assertion> assertions);
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using javax.jms.*;

namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// A CircuitEnd is a pair consisting of one message producer and one message consumer, that represents one end of a
/// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both
/// the consumer and producer are instantiated and configured.
/// </summary>

///
/// CRC Card
/// Responsibilities:
///   - Provide a message producer for sending messages.
///   - Provide a message consumer for receiving messages.
///

/// <remarks>
/// TODO: Update the conversation factory so that it accepts these as the basic conversation
/// connection units. (NOTE(review): the type reference was lost from the original comment -
/// confirm which factory class is meant.)
/// </remarks>
public interface CircuitEnd
{
    // No access modifiers or 'throws' clauses are written on the members: interface members are
    // implicitly public, and C# has no checked exceptions. Exceptions that may propagate are
    // documented with <exception> tags instead.

    /// <summary>
    /// Gets the message producer at this circuit end point.
    /// </summary>
    /// <returns>The message producer at this circuit end point.</returns>
    MessageProducer getProducer();

    /// <summary>
    /// Gets the message consumer at this circuit end point.
    /// </summary>
    /// <returns>The message consumer at this circuit end point.</returns>
    MessageConsumer getConsumer();

    /// <summary>
    /// Send the specified message over the producer at this end point.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <exception cref="JMSException">Any JMS exception occurring during the send is allowed to fall through.</exception>
    void send(Message message);

    /// <summary>
    /// Gets the JMS Session associated with this circuit end point.
    /// </summary>
    /// <returns>The JMS Session associated with this circuit end point.</returns>
    Session getSession();

    /// <summary>
    /// Closes the message producers and consumers and the sessions, associated with this circuit end point.
    /// </summary>
    /// <exception cref="JMSException">Any JMSExceptions occurring during the close are allowed to fall through.</exception>
    void close();

    /// <summary>
    /// Returns the message monitor for reporting on received messages on this circuit end.
    /// </summary>
    /// <returns>The message monitor for this circuit end.</returns>
    MessageMonitor getMessageMonitor();

    /// <summary>
    /// Returns the exception monitor for reporting on exceptions received on this circuit end.
    /// </summary>
    /// <returns>The exception monitor for this circuit end.</returns>
    ExceptionMonitor getExceptionMonitor();
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using javax.jms.*;

namespace Apache.Qpid.Integration.Tests.framework
{
/// <summary>
/// A CircuitEndBase is a pair consisting of one message producer and one message consumer, that represents one end of a
/// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both
/// the consumer and producer are instantiated and configured.
/// </summary>

///
/// CRC Card
/// Responsibilities:
///   - Provide a message producer for sending messages.
///   - Provide a message consumer for receiving messages.
///

public class CircuitEndBase : CircuitEnd
{
    /// <summary>Holds the single message producer.</summary>
    MessageProducer producer;

    /// <summary>Holds the single message consumer.</summary>
    MessageConsumer consumer;

    /// <summary>Holds the controlSession for the circuit end.</summary>
    Session session;

    /// <summary>Holds the message monitor for the circuit end.</summary>
    MessageMonitor messageMonitor;

    /// <summary>Holds the exception monitor for the circuit end.</summary>
    ExceptionMonitor exceptionMonitor;

    /// <summary>
    /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also configured
    /// for messages and exceptions received by the circuit end. The arguments are stored as given; none is validated.
    /// </summary>
    /// <param name="producer">The message producer for the circuit end point.</param>
    /// <param name="consumer">The message consumer for the circuit end point.</param>
    /// <param name="session">The controlSession for the circuit end point.</param>
    /// <param name="messageMonitor">The monitor to notify of all messages received by the circuit end.</param>
    /// <param name="exceptionMonitor">The monitor to notify of all exceptions received by the circuit end.</param>
    public CircuitEndBase(MessageProducer producer, MessageConsumer consumer, Session session,
        MessageMonitor messageMonitor, ExceptionMonitor exceptionMonitor)
    {
        this.producer = producer;
        this.consumer = consumer;
        this.session = session;
        this.messageMonitor = messageMonitor;
        this.exceptionMonitor = exceptionMonitor;
    }

    /// <summary>
    /// Gets the message producer at this circuit end point.
    /// </summary>
    /// <returns>The message producer at this circuit end point.</returns>
    public MessageProducer getProducer()
    {
        return producer;
    }

    /// <summary>
    /// Gets the message consumer at this circuit end point.
    /// </summary>
    /// <returns>The message consumer at this circuit end point.</returns>
    public MessageConsumer getConsumer()
    {
        return consumer;
    }

    /// <summary>
    /// Send the specified message over the producer at this end point.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <exception cref="JMSException">Any JMS exception occurring during the send is allowed to fall through.</exception>
    public void send(Message message)
    {
        producer.send(message);
    }

    /// <summary>
    /// Gets the JMS Session associated with this circuit end point.
    /// </summary>
    /// <returns>The JMS Session associated with this circuit end point.</returns>
    public Session getSession()
    {
        return session;
    }

    /// <summary>
    /// Closes the message producer and the message consumer of this circuit end point, skipping whichever is null.
    /// The session is not closed by this method.
    /// </summary>
    /// <exception cref="JMSException">Any JMSExceptions occurring during the close are allowed to fall through.</exception>
    public void close()
    {
        try
        {
            if (producer != null)
            {
                producer.close();
            }
        }
        finally
        {
            // Runs even when closing the producer throws, so the consumer is not leaked;
            // the producer's exception still propagates to the caller.
            if (consumer != null)
            {
                consumer.close();
            }
        }
    }

    /// <summary>
    /// Returns the message monitor for reporting on received messages on this circuit end.
    /// </summary>
    /// <returns>The message monitor for this circuit end.</returns>
    public MessageMonitor getMessageMonitor()
    {
        return messageMonitor;
    }

    /// <summary>
    /// Returns the exception monitor for reporting on exceptions received on this circuit end.
    /// </summary>
    /// <returns>The exception monitor for this circuit end.</returns>
    public ExceptionMonitor getExceptionMonitor()
    {
        return exceptionMonitor;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
/// <summary>
/// ClockSynchFailureException represents failure of a ClockSynchronizer to achieve synchronization. For example,
/// this could be because a reference signal is not available, or because a desired accuracy cannot be attained.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Represent failure to achieve synchronization.
///

public class ClockSynchFailureException : System.Exception
{
    /// <summary>
    /// Creates a clock synch failure exception.
    /// </summary>
    /// <param name="message">The detail message (which is saved for later retrieval through the Message property).</param>
    /// <param name="cause">The cause (which is saved for later retrieval through the InnerException property). A null
    /// value is permitted, and indicates that the cause is nonexistent or unknown.</param>
    public ClockSynchFailureException(string message, System.Exception cause)
        : base(message, cause)
    {
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
/// <summary>
/// ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. It is expected that one
/// node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave,
/// and which must apply a delta to its local clock to get a clock synchronized with the reference.
/// </summary>

/// The slave side will initiate the computation of a clock delta by calling the synch method. This method
/// will not return until the delta has been computed, at which point there is a method to return its value, as well as
/// an estimate of the likely error (usually one standard deviation), in the synchronization. For convenience there is a
/// nanoTime method to return the value of the local nanosecond clock with the delta added in.
///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Trigger a clock synchronization.
///   - Compute a clock delta to apply to the local clock.
///   - Estimate the error in the synchronization.
///

public interface ClockSynchronizer
{
    // No access modifiers or 'throws' clauses are written on the members: interface members are
    // implicitly public, and C# has no checked exceptions.

    /// <summary>
    /// The slave side should call this to compute a clock delta with the reference.
    /// </summary>
    /// <exception cref="ClockSynchFailureException">If synchronization cannot be achieved.</exception>
    void synch();

    /// <summary>
    /// Gets the clock delta in nanoseconds.
    /// </summary>
    /// <returns>The clock delta in nanoseconds.</returns>
    long getDelta();

    /// <summary>
    /// Gets an estimate of the clock error in nanoseconds.
    /// </summary>
    /// <returns>An estimate of the clock error in nanoseconds.</returns>
    long getEpsilon();

    /// <summary>
    /// Gets the local clock time with any computed delta added in.
    /// </summary>
    /// <returns>The local clock time with any computed delta added in.</returns>
    long nanoTime();
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;
using uk.co.thebadgerset.junit.extensions.ShutdownHookable;
using uk.co.thebadgerset.junit.extensions.Throttle;

namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
/// <summary>
/// ClockSynchThread is a convenient utility for running a thread that periodically synchronizes the clock against
/// a reference. Supply it with a ClockSynchronizer and a Throttle and it will continually keep the
/// clock up-to-date at a rate determined by the throttle.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Continually synchronize the clock at a throttled rate.
///

// NOTE(review): 'Thread' and 'Runnable' are the names the original port referenced and are kept as-is;
// they must resolve to Java-style types (an inheritable Thread with a Runnable constructor), because
// System.Threading.Thread is sealed and cannot be a base class - confirm which library supplies them.
public class ClockSynchThread : Thread, ShutdownHookable
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(ClockSynchThread));

    /// <summary>Holds the clock syncher for the synch thread.</summary>
    private ClockSynchronizer clockSyncher;

    /// <summary>Holds the throttle to limit the synch rate.</summary>
    private Throttle throttle;

    /// <summary>
    /// Flag to indicate that the periodic clock syncher should keep running. It is volatile because it is
    /// written by the terminating/shutdown thread and read by the synch loop on another thread.
    /// </summary>
    volatile bool doSynch = true;

    /// <summary>
    /// Creates a clock synchronizer thread from a clock synchronizer and a throttle.
    /// </summary>
    /// <param name="syncher">The clock synchronizer.</param>
    /// <param name="throttle">The throttle.</param>
    public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle)
    {
        this.clockSyncher = syncher;
        this.throttle = throttle;
    }

    /// <summary>
    /// Terminates the synchronization thread. The loop notices the request the next time it tests the flag,
    /// that is after the current throttle wait and synch attempt have finished.
    /// </summary>
    public void terminate()
    {
        doSynch = false;
    }

    /// <summary>
    /// Continually updates the clock, until <c>terminate</c> is called or a synchronization attempt fails.
    /// </summary>
    public void run()
    {
        while (doSynch)
        {
            // Perform a clock synch.
            try
            {
                // Wait controlled by the throttle before doing the next synch.
                throttle.throttle();
                clockSyncher.synch();
                log.Debug("Clock synched, delta = " + clockSyncher.getDelta() + ", epsilon = "
                    + clockSyncher.getEpsilon() + ".");
            }
            // Terminate the synch thread if the synchronization cannot be achieved.
            catch (ClockSynchFailureException e)
            {
                // The exception is passed to the logger so that the reason for giving up is recorded.
                log.Debug("Cannot synchronize the clock (reference service may be down). Terminating the synch thread.", e);
                doSynch = false;
            }
        }
    }

    /// <summary>
    /// Gets the clock synchronizer that is kept continually up to date.
    /// </summary>
    /// <returns>The clock synchronizer that is kept continually up to date.</returns>
    public ClockSynchronizer getClockSyncher()
    {
        return clockSyncher;
    }

    /// <summary>
    /// Supplies a shutdown hook, that terminates the synching thread.
    /// </summary>
    /// <returns>The shut down hook; a new, unstarted thread that clears the keep-running flag when run.</returns>
    public Thread getShutdownHook()
    {
        return new Thread(new ShutdownTask(this));
    }

    /// <summary>
    /// Task run by the shutdown hook. C# has no anonymous classes, so the hook body is a named nested class.
    /// </summary>
    private sealed class ShutdownTask : Runnable
    {
        private readonly ClockSynchThread owner;

        public ShutdownTask(ClockSynchThread owner)
        {
            this.owner = owner;
        }

        /// <summary>Clears the owner's keep-running flag.</summary>
        public void run()
        {
            owner.doSynch = false;
        }
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
/// <summary>
/// LocalClockSynchronizer is a fake ClockSynchronizer that simply reports the local nanosecond clock with no delta.
/// It exists so that the same tests can be run distributed or locally, taking timings against the ClockSynchronizer
/// interface without being aware of how they are being run.
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Supply the local clock with no delta.
///

public class LocalClockSynchronizer : ClockSynchronizer
{
    /// <summary>Number of nanoseconds in one second, used to scale stopwatch ticks.</summary>
    private const long NanosPerSecond = 1000000000L;

    /// <summary>
    /// The slave side should call this to compute a clock delta with the reference. For the local clock there
    /// is nothing to compute, so this does nothing and never fails.
    /// </summary>
    public void synch()
    {
    }

    /// <summary>
    /// Gets the clock delta in nanoseconds.
    /// </summary>
    /// <returns>Always zero.</returns>
    public long getDelta()
    {
        return 0L;
    }

    /// <summary>
    /// Gets an estimate of the clock error in nanoseconds.
    /// </summary>
    /// <returns>Always zero.</returns>
    public long getEpsilon()
    {
        return 0L;
    }

    /// <summary>
    /// Gets the local clock time with any computed delta added in. The delta is always zero here, so this is
    /// the high-resolution stopwatch timestamp converted to nanoseconds. The value has an arbitrary origin
    /// and is only meaningful when compared with other readings taken in the same process.
    /// </summary>
    /// <returns>The local clock time in nanoseconds.</returns>
    public long nanoTime()
    {
        long ticks = System.Diagnostics.Stopwatch.GetTimestamp();
        long frequency = System.Diagnostics.Stopwatch.Frequency;

        // Scale whole seconds and the sub-second remainder separately so that the
        // multiplication by 1e9 cannot overflow a long for large tick counts.
        return ((ticks / frequency) * NanosPerSecond) + (((ticks % frequency) * NanosPerSecond) / frequency);
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;
using uk.co.thebadgerset.junit.extensions.ShutdownHookable;

using java.io.IOException;
using java.net.*;
using java.nio.ByteBuffer;

namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
/// <summary>
/// UDPClockReference supplies a reference clock signal (generated from the local nanosecond clock).
/// </summary>

///
/// CRC Card
/// Responsibilities (Collaborations):
///   - Supply a reference clock signal.
///

/// <remarks>
/// TODO: Port hard coded. Make configurable.
/// TODO: Errors rethrown as runtimes, or silently terminate the service. Could add better error handling if needed.
/// </remarks>
public class UDPClockReference : Runnable, ShutdownHookable
{
    // Used for debugging.
    // private static ILog log = LogManager.GetLogger(typeof(UDPClockReference));

    /// <summary>Defines the timeout to use when polling the socket for time requests (passed to setSoTimeout).</summary>
    private static final int TIMEOUT = 200;

    /// <summary>Defines the port to run the clock reference on.</summary>
    public static final int REFERENCE_PORT = 4444;

    /// <summary>Holds the socket to receive clock reference requests on.</summary>
    protected DatagramSocket socket = null;

    /// <summary>
    /// Flag used to indicate that the time server should keep running. Set to false to terminate.
    /// Declared volatile because it is cleared from the shutdown hook thread while the run loop reads it.
    /// </summary>
    protected volatile bool publish = true;

    /// <summary>
    /// Creates a clock reference service on the standard port.
    /// </summary>
    /// <exception cref="RuntimeException">Wraps any SocketException raised while opening or configuring the socket.</exception>
    public UDPClockReference()
    {
        try
        {
            socket = new DatagramSocket(REFERENCE_PORT);
            socket.setSoTimeout(TIMEOUT);
        }
        catch (SocketException e)
        {
            // If the socket was bound but could not be configured, release the port before failing.
            if (socket != null)
            {
                socket.close();
            }

            throw new RuntimeException(e);
        }
    }

    /// <summary>
    /// Implements the run loop for this reference time server. This waits for incoming time requests, and replies to
    /// any, with a message with the local time stamp in it. Each time the receive times out (controlled by
    /// <see cref="TIMEOUT"/>), the loop re-checks the <see cref="publish"/> flag and terminates the service once it
    /// has been cleared. An IOException also terminates the service. The socket is always closed on exit.
    /// </summary>
    public void run()
    {
        // The same backing array is used to receive requests and to build replies.
        byte[] buf = new byte[256];
        ByteBuffer bbuf = ByteBuffer.wrap(buf);

        try
        {
            while (publish)
            {
                try
                {
                    // Wait for a reference time request.
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);

                    try
                    {
                        socket.receive(packet);
                    }
                    catch (SocketTimeoutException e)
                    {
                        // Nothing arrived within the timeout; go round again to re-check the publish flag.
                        continue;
                    }

                    // Work out from the received packet, where to reply to.
                    InetAddress address = packet.getAddress();
                    int port = packet.getPort();

                    // Respond to the time request by sending back the local clock as the reference time.
                    // The buffer is reset first so that the time stamp is always written at offset zero.
                    bbuf.clear();
                    bbuf.putLong(System.nanoTime());
                    bbuf.flip();

                    packet = new DatagramPacket(bbuf.array(), bbuf.capacity(), address, port);
                    socket.send(packet);
                }
                catch (IOException e)
                {
                    publish = false;
                }
            }
        }
        finally
        {
            // Release the port even if the loop is left through an unexpected runtime failure.
            socket.close();
        }
    }

    /// <summary>
    /// Supplies a shutdown hook.
    /// </summary>
    /// <returns>A thread that clears the <see cref="publish"/> flag when run.</returns>
    public Thread getShutdownHook()
    {
        return new Thread(new Runnable()
            {
                public void run()
                {
                    publish = false;
                }
            });
    }

    /// <summary>
    /// For testing purposes. Runs a reference clock on the default port.
    /// </summary>
    /// <param name="args">None.</param>
    public static void main(String[] args)
    {
        try
        {
            // Create the clock reference service.
            UDPClockReference clock = new UDPClockReference();

            // Set up a shutdown hook for it.
            Runtime.getRuntime().addShutdownHook(clock.getShutdownHook());

            // Start the service on the calling thread.
            clock.run();
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
* */ using log4net; using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using java.io.IOException; using java.net.*; using java.nio.ByteBuffer; using java.util.Arrays; namespace Apache.Qpid.Integration.Tests.framework.clocksynch { /// /// UDPClockSynchronizer is a that sends pings as UDP datagrams, and uses the following simple /// algorithm to perform clock synchronization: /// ///
    ///
  1. Slave initiates synchronization with a Reference clock.
  2. ///
  3. Slave stamps current local time on a "time request" message and sends to the Reference.
  4. ///
  5. Upon receipt by Reference, Reference stamps Reference-time and returns.
  6. ///
  7. Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It /// subtracts current time from Reference time to determine Slave-Reference time delta and adds in the /// half-latency to get the correct clock delta.
  8. ///
  9. The first result is immediately used to update the clock since it will get the local clock into at least /// the right ballpark.
  10. ///
  11. The Slave repeats steps 2 through 4, 15 more times.
  12. ///
  13. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The /// median latency is determined by picking the mid-point sample from this ordered list.
  14. ///
  15. All samples outside 1 standard-deviation from the median are discarded and the remaining samples /// are averaged using an arithmetic mean.
  16. ///
/// ///

The use of UDP datagrams, instead of TCP-based communication, eliminates the hidden delays that TCP can introduce, /// as it can transparently re-order or re-send packets, or introduce delays as packets are coalesced by Nagle's algorithm. /// ///

///
CRC Card
Responsibilities Collaborations ///
Trigger a clock synchronization. ///
Compute a clock delta to apply to the local clock. ///
Estimate the error in the synchronization. ///
///

public class UDPClockSynchronizer : ClockSynchronizer
{
    // Used for debugging.
    // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer));

    /// <summary>Defines the timeout to use when waiting for responses to time requests (passed to setSoTimeout).</summary>
    private static final int TIMEOUT = 50;

    /// <summary>The clock delta, accumulated over successive synchronization passes.</summary>
    private long delta = 0L;

    /// <summary>Holds an estimate of the clock error relative to the reference clock.</summary>
    private long epsilon = 0L;

    /// <summary>Holds the address of the reference clock.</summary>
    private InetAddress referenceAddress;

    /// <summary>Holds the socket to communicate with the reference service over.</summary>
    private DatagramSocket socket;

    /// <summary>
    /// Used to control the shutdown in the main test loop. Volatile because the shutdown hook thread clears it
    /// while the main loop reads it.
    /// </summary>
    private static volatile bool doSynch = true;

    /// <summary>
    /// Creates a clock synchronizer against the specified address for the reference.
    /// </summary>
    /// <param name="address">The address of the reference service.</param>
    /// <exception cref="RuntimeException">Wraps the UnknownHostException if the address cannot be resolved.</exception>
    public UDPClockSynchronizer(string address)
    {
        try
        {
            referenceAddress = InetAddress.getByName(address);
        }
        catch (UnknownHostException e)
        {
            throw new RuntimeException(e);
        }
    }

    /// <summary>
    /// The slave side should call this to compute a clock delta with the reference. Three passes are made, of
    /// 1, 15 and 31 pings, each refining the delta left by the previous one. The socket opened for the passes is
    /// closed again whether or not synchronization succeeds.
    /// </summary>
    /// <exception cref="ClockSynchFailureException">If synchronization cannot be achieved, due to unavailability of
    /// the reference time service.</exception>
    public void synch() throws ClockSynchFailureException
    {
        try
        {
            socket = new DatagramSocket();

            try
            {
                socket.setSoTimeout(TIMEOUT);

                // Synchronize on a single ping, to get the clock into the right ball-park.
                synch(1);

                // Synchronize on 15 pings.
                synch(15);

                // And again, for greater accuracy, on 31.
                synch(31);
            }
            finally
            {
                // Previously the socket was leaked whenever a pass failed; always release it.
                socket.close();
            }
        }
        catch (SocketException e)
        {
            throw new RuntimeException(e);
        }
    }

    /// <summary>
    /// Updates the synchronization delta by performing the specified number of reference clock requests. Samples
    /// further than one standard deviation from the median (on either side) are rejected; the mean of the retained
    /// samples is added to the delta and their standard deviation becomes the error estimate.
    /// </summary>
    /// <param name="n">The number of reference clock request cycles to perform. Must be at least one.</param>
    /// <exception cref="ClockSynchFailureException">If synchronization cannot be achieved, due to unavailability of
    /// the reference time service.</exception>
    protected void synch(int n) throws ClockSynchFailureException
    {
        // Create an array of deltas by performing n reference pings.
        long[] samples = new long[n];

        for (int i = 0; i < n; i++)
        {
            samples[i] = ping();
        }

        // Reject any deltas that are more than 1 s.d. away from the median.
        long median = median(samples);
        long sd = standardDeviation(samples);

        long[] candidates = new long[n];
        int count = 0;

        for (int i = 0; i < n; i++)
        {
            if ((samples[i] <= (median + sd)) && (samples[i] >= (median - sd)))
            {
                candidates[count] = samples[i];
                count++;
            }
        }

        // Only the retained samples may contribute to the statistics. The statistics used to be taken over the
        // whole n-length array, which still held the rejected values in its tail. If integer truncation has
        // caused every sample to be rejected, fall back to the full sample set rather than divide by zero.
        long[] retained = samples;

        if (count > 0)
        {
            retained = new long[count];
            System.arraycopy(candidates, 0, retained, 0, count);
        }

        // Estimate the delta as the mean of the remaining deltas.
        this.delta += mean(retained);

        // Estimate the error as the standard deviation of the remaining deltas.
        this.epsilon = standardDeviation(retained);
    }

    /// <summary>
    /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock.
    /// This is computed as the half-latency of the request cycle, plus the reference clock, minus the local clock.
    /// The request is retried each time the receive times out, up to ten attempts in total.
    /// </summary>
    /// <returns>The estimated clock delta.</returns>
    /// <exception cref="ClockSynchFailureException">If the reference service is not responding.</exception>
    protected long ping() throws ClockSynchFailureException
    {
        try
        {
            byte[] buf = new byte[256];
            int failCount = 0;

            // Keep trying the ping until it gets a response, or 10 tries in a row all time out.
            while (failCount < 10)
            {
                // Start timing the request latency.
                long start = nanoTime();

                // Get the reference time.
                DatagramPacket packet =
                    new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT);
                socket.send(packet);
                packet = new DatagramPacket(buf, buf.length);

                try
                {
                    socket.receive(packet);
                }
                catch (SocketTimeoutException e)
                {
                    failCount++;

                    continue;
                }

                long refTime = ByteBuffer.wrap(packet.getData()).getLong();

                // Stop timing the request latency.
                long localTime = nanoTime();
                long latency = localTime - start;

                // Estimate delta as (ref clock + half-latency) - local clock.
                return (latency / 2) + (refTime - localTime);
            }

            // Fail completely if the fail count is too high.
            throw new ClockSynchFailureException("Clock reference not responding.", null);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /// <summary>
    /// Gets the clock delta in nanoseconds.
    /// </summary>
    /// <returns>The clock delta in nanoseconds.</returns>
    public long getDelta()
    {
        return delta;
    }

    /// <summary>
    /// Gets an estimate of the clock error in nanoseconds.
    /// </summary>
    /// <returns>An estimate of the clock error in nanoseconds.</returns>
    public long getEpsilon()
    {
        return epsilon;
    }

    /// <summary>
    /// Gets the local clock time with any computed delta added in.
    /// </summary>
    /// <returns>The local clock time with any computed delta added in.</returns>
    public long nanoTime()
    {
        return System.nanoTime() + delta;
    }

    /// <summary>
    /// Computes the median of a series of values. The input array is left unmodified; a sorted copy is used.
    /// For an even number of values the two middle values are averaged using integer division.
    /// </summary>
    /// <param name="values">The values. Must not be empty.</param>
    /// <returns>The median.</returns>
    public static long median(long[] values)
    {
        // Order the list of values.
        long[] orderedValues = new long[values.length];
        System.arraycopy(values, 0, orderedValues, 0, values.length);
        Arrays.sort(orderedValues);

        int middle = orderedValues.length / 2;

        // Check if the median is computed from a pair of middle values.
        if ((orderedValues.length % 2) == 0)
        {
            return (orderedValues[middle] + orderedValues[middle - 1]) / 2;
        }

        // The median is computed from a single middle value.
        return orderedValues[middle];
    }

    /// <summary>
    /// Computes the mean of a series of values, using integer division.
    /// </summary>
    /// <param name="values">The values. Must not be empty.</param>
    /// <returns>The mean.</returns>
    public static long mean(long[] values)
    {
        long total = 0L;

        for (long value : values)
        {
            total += value;
        }

        return total / values.length;
    }

    /// <summary>
    /// Computes the variance of a series of values: the sum of squared differences from the mean, divided by the
    /// number of values, in integer arithmetic.
    /// </summary>
    /// <param name="values">The values. Must not be empty.</param>
    /// <returns>The variance of the values.</returns>
    public static long variance(long[] values)
    {
        long mean = mean(values);

        long totalVariance = 0;

        for (long value : values)
        {
            long diff = (value - mean);

            // The multiplication had been mangled into a comment marker ("diff/// diff").
            totalVariance += diff * diff;
        }

        return totalVariance / values.length;
    }

    /// <summary>
    /// Computes the standard deviation of a series of values, as the square root of <see cref="variance"/>
    /// truncated to a whole number.
    /// </summary>
    /// <param name="values">The values. Must not be empty.</param>
    /// <returns>The standard deviation.</returns>
    public static long standardDeviation(long[] values)
    {
        return (long) Math.sqrt(variance(values));
    }

    /// <summary>
    /// For testing purposes. Supply address of reference clock as arg 1. Repeatedly synchronizes against the
    /// reference and prints the delta and error estimate until shut down or until the reference stops responding.
    /// </summary>
    /// <param name="args">Address of reference clock as arg 1.</param>
    public static void main(String[] args)
    {
        ParsedProperties options =
            new ParsedProperties(CommandLineParser.processCommandLine(args,
                    new CommandLineParser(
                        new String[][] { { "1", "Address of clock reference service.", "address", "true" } }),
                    System.getProperties()));

        string address = options.getProperty("1");

        // Create a clock synchronizer.
        UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);

        // Set up a shutdown hook for it.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
                {
                    public void run()
                    {
                        doSynch = false;
                    }
                }));

        // Repeat the clock synching until the user kills the program.
        while (doSynch)
        {
            // Perform a clock synch.
            try
            {
                clockSyncher.synch();

                // Print out the clock delta and estimate of the error.
                System.out.println("Delta = " + clockSyncher.getDelta());
                System.out.println("Epsilon = " + clockSyncher.getEpsilon());

                try
                {
                    Thread.sleep(250);
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupted status and terminate the loop.
                    Thread.currentThread().interrupt();
                    doSynch = false;
                }
            }
            // Terminate if the reference time service is unavailable.
            catch (ClockSynchFailureException e)
            {
                doSynch = false;
            }
        }
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
* */ using log4net; using Apache.Qpid.Integration.Tests.framework.*; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.TimingController; using uk.co.thebadgerset.junit.extensions.TimingControllerAware; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using javax.jms.Destination; using javax.jms.JMSException; using javax.jms.Message; using javax.jms.Session; using System.Collections.Generic.LinkedList; using System.Collections.Generic.IList; namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit { /// /// DistributedCircuitImpl is a distributed implementation of the test . Many publishers and receivers /// accross multiple machines may be combined to form a single test circuit. The test circuit extracts reports from /// all of its publishers and receivers, and applies its assertions to these reports. /// ///

///
CRC Card
Responsibilities Collaborations ///
Supply the publishing and receiving ends of a test messaging circuit. ///
Start the circuit running. ///
Close the circuit down. ///
Take a reading of the circuit's state. ///
Apply assertions against the circuit's state. ///
Send test messages over the circuit. ///
Perform the default test procedure on the circuit. ///
///

/// <remarks>
/// TODO: There is a short pause after receiving sender reports before asking for receiver reports, because receivers
/// may not have finished receiving all their test messages before the report request arrives. This is going to be a
/// problem for taking test timings and needs to be eliminated. Suggested solution: have receiver send back reports
/// asynchronously, on test batch size boundaries, and do so automatically rather than having to have the report
/// request sent to them. Number each test run, or otherwise uniquely identify it, when a receiver does not get
/// any more messages on a test run for more than a timeout, it can assume the test is complete and send a final
/// report. On the coordinator end a future will need to be created to wait for all final reports to come in, and
/// to register results and timings for the test. This must work in such a way that a new test cycle can be started
/// without waiting for the results of the old one to come in.
///
/// TODO: Add in setting of timing controller, from timing aware test cases.
/// </remarks>
public class DistributedCircuitImpl : Circuit, TimingControllerAware
{
    /// <summary>Used for debugging purposes.</summary>
    private static ILog log = LogManager.GetLogger(typeof(DistributedCircuitImpl));

    /// <summary>Holds the conversation factory over which to coordinate the test.</summary>
    protected ConversationFactory conversationFactory;

    /// <summary>Holds the controlSession over which to hold the control conversation.</summary>
    protected Session controlSession;

    /// <summary>Holds the sender nodes in the test circuit.</summary>
    protected IList senders;

    /// <summary>Holds the receiver nodes in the test circuit.</summary>
    protected IList receivers;

    /// <summary>Holds the sender control conversations, one per sender, in sender order.</summary>
    protected ConversationFactory.Conversation[] senderConversation;

    /// <summary>Holds the receiver control conversations, one per receiver, in receiver order.</summary>
    protected ConversationFactory.Conversation[] receiverConversation;

    /// <summary>Holds the control topics for the senders in the test circuit.</summary>
    protected Destination[] senderControlTopic;

    /// <summary>Holds the control topics for the receivers in the test circuit.</summary>
    protected Destination[] receiverControlTopic;

    /// <summary>Holds the number of messages to send per test run.</summary>
    protected int numMessages;

    /// <summary>
    /// Holds the timing controller for the circuit. This is used to log test times asynchronously, when receiver
    /// nodes return their reports after senders have completed a test case.
    /// </summary>
    TimingController timingController;

    /// <summary>
    /// Creates a distributed test circuit on the specified senders and receivers.
    /// </summary>
    /// <param name="session">The controlSession for all control conversations.</param>
    /// <param name="senders">The senders.</param>
    /// <param name="receivers">The receivers.</param>
    /// <param name="senderConversation">A control conversation with the senders.</param>
    /// <param name="receiverConversation">A control conversation with the receivers.</param>
    /// <param name="senderControlTopic">The senders control topic.</param>
    /// <param name="receiverControlTopic">The receivers control topic.</param>
    protected DistributedCircuitImpl(Session session, IList senders, IList receivers,
        ConversationFactory.Conversation[] senderConversation, ConversationFactory.Conversation[] receiverConversation,
        Destination[] senderControlTopic, Destination[] receiverControlTopic)
    {
        this.controlSession = session;
        this.senders = senders;
        this.receivers = receivers;
        this.senderConversation = senderConversation;
        this.receiverConversation = receiverConversation;
        this.senderControlTopic = senderControlTopic;
        this.receiverControlTopic = receiverControlTopic;
    }

    /// <summary>
    /// Creates a distributed test circuit from the specified test parameters, on the senders and receivers
    /// given. A control conversation is opened with every client, each client is sent an ASSIGN_ROLE message
    /// carrying the test properties, and the call blocks until every client has replied.
    /// </summary>
    /// <param name="testProps">The test parameters.</param>
    /// <param name="senders">The sender ends in the test circuit.</param>
    /// <param name="receivers">The receiver ends in the test circuit.</param>
    /// <param name="conversationFactory">A conversation factory for creating the control conversations with senders
    /// and receivers.</param>
    /// <returns>A connected and ready to start, test circuit.</returns>
    /// <exception cref="RuntimeException">Wraps any JMSException raised while setting up the circuit.</exception>
    public static Circuit createCircuit(ParsedProperties testProps, IList senders,
        IList receivers, ConversationFactory conversationFactory)
    {
        log.debug("public static Circuit createCircuit(ParsedProperties testProps, IList senders, "
            + " IList receivers, ConversationFactory conversationFactory)");

        try
        {
            Session session = conversationFactory.getSession();

            // Create control conversations with each of the senders.
            ConversationFactory.Conversation[] senderConversation = new ConversationFactory.Conversation[senders.size()];
            Destination[] senderControlTopic = new Destination[senders.size()];

            for (int i = 0; i < senders.size(); i++)
            {
                TestClientDetails sender = senders.get(i);

                senderControlTopic[i] = session.createTopic(sender.privateControlKey);
                senderConversation[i] = conversationFactory.startConversation();
            }

            log.debug("Sender conversations created.");

            // Create control conversations with each of the receivers.
            ConversationFactory.Conversation[] receiverConversation =
                new ConversationFactory.Conversation[receivers.size()];
            Destination[] receiverControlTopic = new Destination[receivers.size()];

            for (int i = 0; i < receivers.size(); i++)
            {
                TestClientDetails receiver = receivers.get(i);

                receiverControlTopic[i] = session.createTopic(receiver.privateControlKey);
                receiverConversation[i] = conversationFactory.startConversation();
            }

            log.debug("Receiver conversations created.");

            // Assign the sender role to each of the sending test clients.
            for (int i = 0; i < senders.size(); i++)
            {
                Message assignSender = session.createMessage();
                TestUtils.setPropertiesOnMessage(assignSender, testProps);
                assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
                assignSender.setStringProperty("ROLE", "SENDER");

                senderConversation[i].send(senderControlTopic[i], assignSender);
            }

            log.debug("Sender role assignments sent.");

            // Assign the receivers role to each of the receiving test clients.
            for (int i = 0; i < receivers.size(); i++)
            {
                Message assignReceiver = session.createMessage();
                TestUtils.setPropertiesOnMessage(assignReceiver, testProps);
                assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
                assignReceiver.setStringProperty("ROLE", "RECEIVER");

                receiverConversation[i].send(receiverControlTopic[i], assignReceiver);
            }

            log.debug("Receiver role assignments sent.");

            // Wait for the senders and receivers to confirm their roles.
            for (int i = 0; i < senders.size(); i++)
            {
                senderConversation[i].receive();
            }

            log.debug("Got all sender role confirmations");

            for (int i = 0; i < receivers.size(); i++)
            {
                receiverConversation[i].receive();
            }

            log.debug("Got all receiver role confirmations");

            // Package everything up as a circuit, keeping hold of the factory that coordinates it; the
            // conversationFactory field was previously never assigned.
            DistributedCircuitImpl circuit =
                new DistributedCircuitImpl(session, senders, receivers, senderConversation, receiverConversation,
                    senderControlTopic, receiverControlTopic);
            circuit.conversationFactory = conversationFactory;

            return circuit;
        }
        catch (JMSException e)
        {
            // Keep the underlying cause so that the failure can be diagnosed.
            throw new RuntimeException("JMSException not handled.", e);
        }
    }

    /// <summary>
    /// Used by test cases that can supply a timing controller, to set the controller on an aware test.
    /// </summary>
    /// <param name="controller">The timing controller.</param>
    public void setTimingController(TimingController controller)
    {
        this.timingController = controller;
    }

    /// <summary>
    /// Gets the interface on the publishing end of the circuit. Not implemented for the distributed circuit.
    /// </summary>
    /// <returns>Never returns normally.</returns>
    /// <exception cref="RuntimeException">Always.</exception>
    public Publisher getPublisher()
    {
        throw new RuntimeException("Not Implemented.");
    }

    /// <summary>
    /// Gets the interface on the receiving end of the circuit. Not implemented for the distributed circuit.
    /// </summary>
    /// <returns>Never returns normally.</returns>
    /// <exception cref="RuntimeException">Always.</exception>
    public Receiver getReceiver()
    {
        throw new RuntimeException("Not Implemented.");
    }

    /// <summary>
    /// Connects and starts the circuit, by sending a START control message carrying the current message count to
    /// every sender.
    /// </summary>
    /// <exception cref="RuntimeException">Wraps any JMSException raised while sending the start messages.</exception>
    public void start()
    {
        log.debug("public void start(): called");

        try
        {
            // Start the test on each of the senders.
            Message start = controlSession.createMessage();
            start.setStringProperty("CONTROL_TYPE", "START");
            start.setIntProperty("MESSAGE_COUNT", numMessages);

            for (int i = 0; i < senders.size(); i++)
            {
                senderConversation[i].send(senderControlTopic[i], start);
            }

            log.debug("All senders told to start their tests.");
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Unhandled JMSException.", e);
        }
    }

    /// <summary>
    /// Checks the test circuit. The effect of this is to gather the circuit's state, for both ends of the circuit,
    /// into a report, against which assertions may be checked. Sender reports are waited for on the calling thread;
    /// receiver reports are requested and then collected on a separate thread, so this method returns before they
    /// have arrived.
    /// </summary>
    /// <remarks>
    /// TODO: Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch
    /// or synch logging of test timings is optional. Also need to provide an onMessage method that is capable
    /// of receiving timing reports that receivers will generate during an ongoing test, on the test sample
    /// size boundaries. The message timing logging code should be factored out as a common method that can
    /// be called in response to the final report responses, or the onMessage method. Another alternative is
    /// to abandon the final report request altogether and just use the onMessage method? I think the two
    /// differ though, as the final report is used to apply assertions, and the ongoing report is just for
    /// periodic timing results... In which case, maybe there needs to be a way for the onMessage method
    /// to process just some of the incoming messages, and forward the rest on to the conversation helper, as
    /// a sort of pre-conversation helper filter? Make conversation expose its onMessage method (it should
    /// already) and allow another delivery thread to filter the incoming messages to the conversation.
    /// </remarks>
    /// <exception cref="RuntimeException">Wraps any JMSException raised on the calling thread.</exception>
    public void check()
    {
        log.debug("public void check(): called");

        try
        {
            // Wait for all the test senders to return their reports.
            for (int i = 0; i < senders.size(); i++)
            {
                Message senderReport = senderConversation[i].receive();

                log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message count: "
                    + senderReport.getIntProperty("MESSAGE_COUNT"));
                log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message time: "
                    + senderReport.getLongProperty("TEST_TIME"));
            }

            log.debug("Got all sender test reports.");

            // Apply sender assertions to pass/fail the tests.

            // Inject a short pause to give the receivers time to finish receiving their test messages.
            // NOTE(review): the argument is presumably in milliseconds - confirm against TestUtils.pause.
            TestUtils.pause(500);

            // Ask the receivers for their reports.
            Message statusRequest = controlSession.createMessage();
            statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");

            for (int i = 0; i < receivers.size(); i++)
            {
                receiverConversation[i].send(receiverControlTopic[i], statusRequest);
            }

            log.debug("All receiver test reports requested.");

            // Wait for all receiver reports to come in, but do so asynchronously.
            Runnable gatherAllReceiverReports =
                new Runnable()
                {
                    public void run()
                    {
                        try
                        {
                            // Wait for all the receivers to send their reports.
                            for (int i = 0; i < receivers.size(); i++)
                            {
                                Message receiverReport = receiverConversation[i].receive();

                                string clientName = receiverReport.getStringProperty("CLIENT_NAME");
                                int messageCount = receiverReport.getIntProperty("MESSAGE_COUNT");
                                long testTime = receiverReport.getLongProperty("TEST_TIME");

                                log.debug("Receiver " + clientName + " reports message count: " + messageCount);
                                log.debug("Receiver " + clientName + " reports message time: " + testTime);

                                // Apply receiver assertions to pass/fail the tests.

                                // Log the test timings on the asynchronous test timing controller.
                                /*try
                                {
                                    timingController.completeTest(true, messageCount, testTime);
                                }
                                // The timing controller can throw InterruptedException if the current test is to be
                                // interrupted.
                                catch (InterruptedException e)
                                {
                                    e.printStackTrace();
                                }*/
                            }

                            log.debug("All receiver test reports received.");
                        }
                        catch (JMSException e)
                        {
                            throw new RuntimeException(e);
                        }
                    }
                };

            Thread receiverReportsThread = new Thread(gatherAllReceiverReports);
            receiverReportsThread.start();

            // return new Message[] { senderReport, receiverReport };
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Unhandled JMSException.", e);
        }
    }

    /// <summary>
    /// Closes the circuit. Currently this only logs the call; no resources are released.
    /// </summary>
    public void close()
    {
        log.debug("public void close(): called");

        // End the current test on all senders and receivers.
    }

    /// <summary>
    /// Applies a list of assertions against the test circuit. The <see cref="check"/> method should be called
    /// before doing this, to ensure that the circuit has gathered its state into a report to assert against.
    /// </summary>
    /// <param name="assertions">The list of assertions to apply.</param>
    /// <returns>Any assertions that failed, in the order they were applied.</returns>
    public IList applyAssertions(List assertions)
    {
        log.debug("public IList applyAssertions(List assertions = " + assertions + "): called");

        IList failures = new LinkedList();

        for (Assertion assertion : assertions)
        {
            if (!assertion.apply())
            {
                failures.add(assertion);
            }
        }

        return failures;
    }

    /// <summary>
    /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold.
    /// The procedure is: start, check, apply assertions, close.
    /// </summary>
    /// <param name="numMessages">The number of messages to send using the default test procedure.</param>
    /// <param name="assertions">The list of assertions to apply.</param>
    /// <returns>Any assertions that failed.</returns>
    /// <remarks>
    /// TODO: From check onwards needs to be handled as a future. The future must call back onto the test case to
    /// report results asynchronously.
    /// </remarks>
    public IList test(int numMessages, List assertions)
    {
        log.debug("public IList test(int numMessages = " + numMessages + ", List assertions = "
            + assertions + "): called");

        // Keep the number of messages to send per test run, where the start method can reference it.
        this.numMessages = numMessages;

        // Start the test running on all sender circuit ends.
        start();

        // Request status reports to be handed in.
        check();

        // Assert conditions on the publishing end of the circuit.
        // Assert conditions on the receiving end of the circuit.
        IList failures = applyAssertions(assertions);

        // Close the circuit ending the current test case.
        close();

        // Pass with no failed assertions or fail with a list of failed assertions.
        return failures;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using Apache.Qpid.Integration.Tests.framework.Assertion;
using Apache.Qpid.Integration.Tests.framework.Publisher;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;

namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit
{
    /// <summary>
    /// DistributedPublisherImpl represents the status of the publishing side of a test circuit. Its main purpose is to
    /// provide assertions that can be applied to verify the behaviour of a non-local publisher.
    /// </summary>
    ///

///
CRC Card
Responsibilities Collaborations ///
Provide assertion that the publishers received no exceptions. ///
Provide assertion that the publishers received a no consumers error code on every message. ///
Provide assertion that the publishers received a no route error code on every message. ///
///

public class DistributedPublisherImpl : Publisher
{
    /// <summary>
    /// Intended to supply an assertion that the publisher encountered no exceptions.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the publisher got a no consumers exception on every message.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <returns>Never returns normally.</returns>
    public Assertion noConsumersAssertion()
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the publisher got a no route exception on every message.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <returns>Never returns normally.</returns>
    public Assertion noRouteAssertion()
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the AMQP channel was forcibly closed by an error condition.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the publisher got a given exception during the test.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Builds the exception raised by every assertion factory on this class.
    /// </summary>
    /// <returns>A RuntimeException carrying the message "Not implemented.".</returns>
    private static RuntimeException notImplemented()
    {
        return new RuntimeException("Not implemented.");
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using Apache.Qpid.Integration.Tests.framework.Assertion; using Apache.Qpid.Integration.Tests.framework.Receiver; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit { /// /// DistributedReceiverImpl represents the status of the receiving side of a test circuit. Its main purpose is to /// provide assertions that can be applied to verify the behaviour of a non-local receiver. /// ///

///
/// CRC Card
/// Responsibilities / Collaborations
/// - Provide assertion that the receivers received no exceptions.
/// - Provide assertion that the receivers received all test messages sent to it.
///

public class DistributedReceiverImpl : Receiver
{
    /// <summary>
    /// Intended to supply an assertion that the receivers encountered no exceptions.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the receivers got all messages that were sent to it.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion allMessagesReceivedAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the receivers got none of the messages that were sent to it.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion noMessagesReceivedAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the AMQP channel was forcibly closed by an error condition.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Intended to supply an assertion that the receiver got a given exception during the test.
    /// Not implemented: the call always throws.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    /// <returns>Never returns normally.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass)
    {
        throw notImplemented();
    }

    /// <summary>
    /// Builds the exception raised by every assertion factory on this class.
    /// </summary>
    /// <returns>A RuntimeException carrying the message "Not implemented.".</returns>
    private static RuntimeException notImplemented()
    {
        return new RuntimeException("Not implemented.");
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. 
The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using log4net; using Apache.Qpid.Integration.Tests.framework.*; using Apache.Qpid.Integration.Tests.framework.distributedtesting.TestClientControlledTest; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; using javax.jms.*; namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit { /// /// A TestClientCircuitEnd is a that may be controlled from a /// , and that forms a single publishing or /// receiving end point in a distributed test . /// ///

/// When operating in the SENDER role, this circuit end is capable of acting as part of the default circuit test
/// procedure (described in the class comment for the test circuit). That is, it will
/// send the number of test messages required, using the test configuration parameters given in the test invite, and
/// return a report on its activities to the circuit controller.
///
/// When operating in the RECEIVER role, this circuit end acts as part of the default circuit test procedure. It will
/// receive test messages, on the setup specified in the test configuration parameters, and keep count of the messages
/// received, and time taken to receive them. When requested by the circuit controller to provide a report, it will
/// return this report of its activities.
///
/// CRC Card
/// Responsibilities / Collaborations
/// - Provide a message producer for sending messages.
/// - Provide a message consumer for receiving messages.
/// - Supply the name of the test case that this implements.
/// - Accept/Reject invites based on test parameters.
/// - Adapt to assigned roles.
/// - Perform test case actions.
/// - Generate test reports.
///

public class TestClientCircuitEnd : CircuitEnd, TestClientControlledTest
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(TestClientCircuitEnd));

    /// <summary>Holds the test parameters.</summary>
    ParsedProperties testProps;

    /// <summary>The number of test messages to send.</summary>
    private int numMessages;

    /// <summary>The role to be played by the test.</summary>
    private Roles role;

    /// <summary>The connection to send the test messages on.</summary>
    private Connection connection;

    /// <summary>Holds the circuit end for this test.</summary>
    CircuitEnd circuitEnd;

    /// <summary>
    /// Holds a message monitor for this circuit end: the consumer's monitor when in the RECEIVER role, or a
    /// monitor updated on every message sent when acting as a SENDER.
    /// </summary>
    MessageMonitor messageMonitor;

    /// <summary>
    /// Provides the name of the test case that this class implements.
    /// </summary>
    /// <returns>The fixed name "DEFAULT_CIRCUIT_TEST".</returns>
    public string getName()
    {
        return "DEFAULT_CIRCUIT_TEST";
    }

    /// <summary>
    /// Decides whether the test invite that matched this test case is acceptable. The test parameters are
    /// re-populated from the defaults, and every property that the invitation carries a value for is overridden
    /// with the invitation's value. The invitation is always accepted.
    /// </summary>
    /// <param name="inviteMessage">The invitation to accept or reject.</param>
    /// <returns>Always true.</returns>
    /// <exception cref="JMSException">Any JMSException resulting from reading the message is allowed to fall through.</exception>
    public bool acceptInvite(Message inviteMessage) throws JMSException
    {
        log.debug("public bool acceptInvite(Message inviteMessage): called");

        // Start from the default test parameters.
        testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);

        for (Object key : testProps.keySet())
        {
            string name = (String) key;
            string overrideValue = inviteMessage.getStringProperty(name);

            // Properties the invitation says nothing about keep their current value.
            if (overrideValue == null)
            {
                continue;
            }

            testProps.setProperty(name, overrideValue);
            log.debug("Test invite supplied override to " + name + " of " + overrideValue);
        }

        return true;
    }

    /// <summary>
    /// Assigns the role to be played by this test case. A connection is created from the test parameters, a
    /// circuit end matching the role is built on it, the message monitor is reset and the connection is started.
    /// </summary>
    /// <param name="role">The role to be played; sender or receivers.</param>
    /// <param name="assignRoleMessage">The role assignment message. It is not read by the current implementation.</param>
    /// <exception cref="JMSException">Any JMSException raised while setting up is allowed to fall through.</exception>
    public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
    {
        log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");

        this.role = role;

        // The message count is fixed at one; reading NUM_MESSAGES from the assignment message is disabled:
        // assignRoleMessage.getIntProperty("NUM_MESSAGES");
        numMessages = 1;

        connection = TestUtils.createConnection(testProps);

        LocalCircuitFactory factory = new LocalCircuitFactory();

        switch (role)
        {
        case SENDER:
            // Publishing end, with a dedicated monitor that is updated on every message sent.
            circuitEnd = factory.createPublisherCircuitEnd(connection, testProps, 0L);
            messageMonitor = new MessageMonitor();
            break;

        case RECEIVER:
            // Receiving end, re-using the monitor attached to the consumer for stats.
            circuitEnd = factory.createReceiverCircuitEnd(connection, testProps, 0L);
            messageMonitor = getMessageMonitor();
            break;
        }

        // Clear all messaging stats before the report is built up.
        messageMonitor.reset();

        connection.start();
    }

    /// <summary>
    /// Performs the test case actions. In the SENDER role one message is created and sent the requested number
    /// of times, updating the message monitor after each send. In any other role nothing is sent.
    /// </summary>
    /// <param name="numMessages">The number of test messages to send.</param>
    /// <exception cref="JMSException">Any JMSException raised while sending is allowed to fall through.</exception>
    /// <remarks>
    /// Outstanding work noted by the original author: round robin over multiple destinations, rate limiting on
    /// publishers, max pending message size protection (requiring acks back from receivers), and commits on every
    /// commit batch size boundary.
    /// </remarks>
    public void start(int numMessages) throws JMSException
    {
        log.debug("public void start(): called");

        // Only the sending role has anything to do here.
        if (!role.equals(Roles.SENDER))
        {
            return;
        }

        Message testMessage = getSession().createMessage();

        for (int remaining = numMessages; remaining > 0; remaining--)
        {
            getProducer().send(testMessage);

            // Record the send in the message count and timings.
            messageMonitor.onMessage(testMessage);
        }
    }

    /// <summary>
    /// Builds a report on the actions performed by the test case in its assigned role, carrying the message
    /// count and time taken from the message monitor.
    /// </summary>
    /// <param name="session">The session to create the report message in.</param>
    /// <returns>The report message.</returns>
    /// <exception cref="JMSException">Any JMSException raised while creating the report is allowed to fall through.</exception>
    public Message getReport(Session session) throws JMSException
    {
        Message report = session.createMessage();
        report.setStringProperty("CONTROL_TYPE", "REPORT");

        int messageCount = messageMonitor.getNumMessage();
        report.setIntProperty("MESSAGE_COUNT", messageCount);

        long elapsed = messageMonitor.getTime();
        report.setLongProperty("TEST_TIME", elapsed);

        // Detected exceptions are not yet added to the report.
        return report;
    }

    /// <summary>Gets the message producer at this circuit end point.</summary>
    /// <returns>The message producer of the wrapped circuit end.</returns>
    public MessageProducer getProducer()
    {
        return circuitEnd.getProducer();
    }

    /// <summary>Gets the message consumer at this circuit end point.</summary>
    /// <returns>The message consumer of the wrapped circuit end.</returns>
    public MessageConsumer getConsumer()
    {
        return circuitEnd.getConsumer();
    }

    /// <summary>Sends the specified message through the wrapped circuit end.</summary>
    /// <param name="message">The message to send.</param>
    /// <exception cref="JMSException">Any JMSException occurring during the send is allowed to fall through.</exception>
    public void send(Message message) throws JMSException
    {
        circuitEnd.send(message);
    }

    /// <summary>Gets the JMS Session associated with this circuit end point.</summary>
    /// <returns>The session of the wrapped circuit end.</returns>
    public Session getSession()
    {
        return circuitEnd.getSession();
    }

    /// <summary>
    /// Closes the wrapped circuit end. The connection held by this class is not touched here.
    /// </summary>
    /// <exception cref="JMSException">Any JMSException occurring during the close is allowed to fall through.</exception>
    public void close() throws JMSException
    {
        circuitEnd.close();
    }

    /// <summary>Returns the message monitor for reporting on received messages on this circuit end.</summary>
    /// <returns>The message monitor of the wrapped circuit end.</returns>
    public MessageMonitor getMessageMonitor()
    {
        return circuitEnd.getMessageMonitor();
    }

    /// <summary>Returns the exception monitor for reporting on exceptions received on this circuit end.</summary>
    /// <returns>The exception monitor of the wrapped circuit end.</returns>
    public ExceptionMonitor getExceptionMonitor()
    {
        return circuitEnd.getExceptionMonitor();
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using junit.framework.Test; using junit.framework.TestResult; using junit.framework.TestSuite; using log4net; using org.apache.log4j.NDC; using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.TestUtils; using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockReference; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.TKTestRunner; using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; using uk.co.thebadgerset.junit.extensions.util.MathUtils; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; using javax.jms.*; using java.net.InetAddress; using java.util.*; using java.util.concurrent.LinkedBlockingQueue; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// ///

/// Implements the coordinator client described in the interop testing specification
/// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on
/// top of the JUnit testing framework.
///
/// CRC Card
/// Responsibilities / Collaborations
/// - Find out what test clients are available.
/// - Decorate available tests to run on all available clients.
/// - Attach XML test result logger.
/// - Terminate the interop testing framework.
///

/// /// Should accumulate failures over all tests, and return with success or fail code based on all results. May need /// to write a special TestResult to do this properly. At the moment only the last one used will be tested for /// errors, as the start method creates a fresh one for each test case run. public class Coordinator extends TKTestRunner { /// Used for debugging. private static ILog log = LogManager.GetLogger(typeof(Coordinator)); /// Used for reporting to the console. private static ILog console = LogManager.GetLogger("CONSOLE"); /// Defines the possible distributed test engines available to run coordinated test cases with. public enum TestEngine { /// Specifies the interop test engine. This tests all available clients in pairs. INTEROP, /// Specifies the fanout test engine. This sets up one publisher role, and many reciever roles. FANOUT } /// /// Holds the test context properties that provides the default test parameters, plus command line overrides. /// This is initialized with the default test parameters, to which command line overrides may be applied. protected static ParsedProperties testContextProperties = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); /// Holds the URL of the broker to coordinate the tests on. protected string brokerUrl; /// Holds the virtual host to coordinate the tests on. If null, then the default virtual host is used. protected string virtualHost; /// Holds the list of all clients that enlisted, when the compulsory invite was issued. protected Set enlistedClients = new HashSet(); /// Holds the conversation helper for the control conversation. protected ConversationFactory conversationFactory; /// Holds the connection that the coordinating messages are sent over. protected Connection connection; /// Holds the path of the directory to output test results too, if one is defined. protected string reportDir; /// Holds the coordinating test engine type to run the tests through. 
protected TestEngine engine; /// Flag that indicates that all test clients should be terminated upon completion of the test cases. protected bool terminate; /// /// Creates an interop test coordinator on the specified broker and virtual host. /// /// The number of times to repeat the test, or test batch size. /// The length of time to run the tests for. -1 means no duration has been set. /// The concurrency levels to ramp up to. /// A delay in milliseconds between test runs. /// The sets of 'size' parameters to pass to test. /// The name of the test case to run. /// The directory to output the test results to. /// The name of the test run; used to name the output file. /// Whether to print comments during test run. /// The URL of the broker to connect to. /// The virtual host to run all tests on. Optional, may be null. /// The distributed test engine type to run the tests with. /// true if test client nodes should be terminated at the end of the tests. /// true if the CSV results listener should be attached. /// true if the XML results listener should be attached. /// List of factories for user specified decorators. 
public Coordinator(Integer repetitions, Long duration, int[] threads, int delay, int[] params, string testCaseName, string reportDir, string runName, bool verbose, string brokerUrl, string virtualHost, TestEngine engine, bool terminate, bool csv, bool xml, IList decoratorFactories) { super(repetitions, duration, threads, delay, params, testCaseName, reportDir, runName, csv, xml, verbose, decoratorFactories); log.debug("public Coordinator(Integer repetitions = " + repetitions + " , Long duration = " + duration + ", int[] threads = " + Arrays.ToString(threads) + ", int delay = " + delay + ", int[] params = " + Arrays.ToString(params) + ", string testCaseName = " + testCaseName + ", string reportDir = " + reportDir + ", string runName = " + runName + ", bool verbose = " + verbose + ", string brokerUrl = " + brokerUrl + ", string virtualHost =" + virtualHost + ", TestEngine engine = " + engine + ", bool terminate = " + terminate + ", bool csv = " + csv + ", bool xml = " + xml + "): called"); // Retain the connection parameters. this.brokerUrl = brokerUrl; this.virtualHost = virtualHost; this.reportDir = reportDir; this.engine = engine; this.terminate = terminate; } /// /// The entry point for the interop test coordinator. This client accepts the following command line arguments: /// ///

///
-b The broker URL. Mandatory. ///
-h The virtual host. Optional. ///
-o The directory to output test results to. Optional. ///
-e The type of test distribution engine to use. Optional. One of: interop, fanout. ///
... Free arguments. The distributed test cases to run. /// Mandatory. At least one must be defined. ///
name=value Trailing argument define name/value pairs. Added to the test contenxt properties. /// Optional. ///
///

/// The command line arguments. public static void main(String[] args) { NDC.push("coordinator"); log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called"); console.info("Qpid Distributed Test Coordinator."); // Override the default broker url to be localhost:5672. testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); try { // Use the command line parser to evaluate the command line with standard handling behaviour (print errors // and usage then exist if there are errors). // Any options and trailing name=value pairs are also injected into the test context properties object, // to override any defaults that may have been set up. ParsedProperties options = new ParsedProperties(CommandLineParser.processCommandLine(args, new CommandLineParser( new String[][] { { "b", "The broker URL.", "broker", "false" }, { "h", "The virtual host to use.", "virtual host", "false" }, { "o", "The name of the directory to output test timings to.", "dir", "false" }, { "e", "The test execution engine to use. 
Default is interop.", "engine", "interop", "^interop$|^fanout$", "true" }, { "t", "Terminate test clients on completion of tests.", null, "false" }, { "-csv", "Output test results in CSV format.", null, "false" }, { "-xml", "Output test results in XML format.", null, "false" }, { "-trefaddr", "To specify an alternative to hostname for time singal reference.", "address", "false" }, { "c", "The number of tests to run concurrently.", "num", "false", MathUtils.SEQUENCE_REGEXP }, { "r", "The number of times to repeat each test.", "num", "false" }, { "d", "The length of time to run the tests for.", "duration", "false", MathUtils.DURATION_REGEXP }, { "f", "The maximum rate to call the tests at.", "frequency", "false", "^([1-9][0-9]*)/([1-9][0-9]*)$" }, { "s", "The size parameter to run tests with.", "size", "false", MathUtils.SEQUENCE_REGEXP }, { "v", "Verbose mode.", null, "false" }, { "n", "A name for this test run, used to name the output file.", "name", "true" }, { "X:decorators", "A list of additional test decorators to wrap the tests in.", "\"class.name[:class.name]*\"", "false" } }), testContextProperties)); // Extract the command line options. string brokerUrl = options.getProperty("b"); string virtualHost = options.getProperty("h"); string reportDir = options.getProperty("o"); reportDir = (reportDir == null) ? "." : reportDir; string testEngine = options.getProperty("e"); TestEngine engine = "fanout".equals(testEngine) ? 
TestEngine.FANOUT : TestEngine.INTEROP; bool terminate = options.getPropertyAsBoolean("t"); bool csvResults = options.getPropertyAsBoolean("-csv"); bool xmlResults = options.getPropertyAsBoolean("-xml"); string threadsstring = options.getProperty("c"); Integer repetitions = options.getPropertyAsInteger("r"); string durationstring = options.getProperty("d"); string paramsstring = options.getProperty("s"); bool verbose = options.getPropertyAsBoolean("v"); string testRunName = options.getProperty("n"); string decorators = options.getProperty("X:decorators"); int[] threads = (threadsstring == null) ? null : MathUtils.parseSequence(threadsString); int[] params = (paramsstring == null) ? null : MathUtils.parseSequence(paramsString); Long duration = (durationstring == null) ? null : MathUtils.parseDuration(durationString); // If broker or virtual host settings were specified as command line options, override the defaults in the // test context properties with them. // Collection all of the test cases to be run. Collection> testCaseClasses = new ArrayList>(); // Create a list of test decorator factories for use specified decorators to be applied. IList decoratorFactories = parseDecorators(decorators); // Scan for available test cases using a classpath scanner. // ClasspathScanner.getMatches(DistributedTestCase.class, "^Test.*", true); // Hard code the test classes till the classpath scanner is fixed. // Collections.addAll(testCaseClasses, InteropTestCase1DummyRun.class, InteropTestCase2BasicP2P.class, // InteropTestCase3BasicPubSub.class); // Parse all of the free arguments as test cases to run. for (int i = 1; true; i++) { string nextFreeArg = options.getProperty(Integer.ToString(i)); // Terminate the loop once all free arguments have been consumed. 
if (nextFreeArg == null) { break; } try { Class nextClass = Class.forName(nextFreeArg); if (FrameworkBaseCase.class.isAssignableFrom(nextClass)) { testCaseClasses.add(nextClass); console.info("Found distributed test case: " + nextFreeArg); } } catch (ClassNotFoundException e) { console.info("Unable to instantiate the test case: " + nextFreeArg + "."); } } // Check that some test classes were actually found. if (testCaseClasses.isEmpty()) { throw new RuntimeException( "No test cases implementing FrameworkBaseCase were specified on the command line."); } // Extract the names of all the test classes, to pass to the start method. int i = 0; String[] testClassNames = new String[testCaseClasses.size()]; for (Class testClass : testCaseClasses) { testClassNames[i++] = testClass.getName(); } // Create a coordinator and begin its test procedure. Coordinator coordinator = new Coordinator(repetitions, duration, threads, 0, params, null, reportDir, testRunName, verbose, brokerUrl, virtualHost, engine, terminate, csvResults, xmlResults, decoratorFactories); TestResult testResult = coordinator.start(testClassNames); // Return different error codes, depending on whether or not there were test failures. if (testResult.failureCount() > 0) { System.exit(FAILURE_EXIT); } else { System.exit(SUCCESS_EXIT); } } catch (Exception e) { log.debug("Top level handler caught execption.", e); console.info(e.getMessage()); e.printStackTrace(); System.exit(EXCEPTION_EXIT); } } /// /// Starts all of the test classes to be run by this coordinator. /// /// An array of all the coordinating test case implementations. /// /// A JUnit TestResult to run the tests with. /// /// Any underlying exceptions are allowed to fall through, and fail the test process. public TestResult start(String[] testClassNames) throws Exception { log.debug("public TestResult start(String[] testClassNames = " + Arrays.ToString(testClassNames) + ": called"); // Connect to the broker. 
connection = TestUtils.createConnection(TestContextProperties.getInstance()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination controlTopic = session.createTopic("iop.control"); Destination responseQueue = session.createQueue("coordinator"); conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class); ConversationFactory.Conversation conversation = conversationFactory.startConversation(); connection.start(); // Broadcast the compulsory invitation to find out what clients are available to test. Message invite = session.createMessage(); invite.setStringProperty("CONTROL_TYPE", "INVITE"); invite.setJMSReplyTo(responseQueue); conversation.send(controlTopic, invite); // Wait for a short time, to give test clients an opportunity to reply to the invitation. Collection enlists = conversation.receiveAll(0, 500); enlistedClients = extractEnlists(enlists); for (TestClientDetails client : enlistedClients) { log.debug("Got enlisted test client: " + client); console.info("Test node " + client.clientName + " available."); } // Start the clock reference service running. UDPClockReference clockReference = new UDPClockReference(); Thread clockRefThread = new Thread(clockReference); registerShutdownHook(clockReference); clockRefThread.start(); // Broadcast to all clients to synchronize their clocks against the coordinators clock reference. Message clockSynchRequest = session.createMessage(); clockSynchRequest.setStringProperty("CONTROL_TYPE", "CLOCK_SYNCH"); string localAddress = InetAddress.getByName(InetAddress.getLocalHost().getHostName()).getHostAddress(); clockSynchRequest.setStringProperty("ADDRESS", localAddress); conversation.send(controlTopic, clockSynchRequest); // Run the test in the suite using JUnit. TestResult result = null; for (string testClassName : testClassNames) { // Record the current test class, so that the test results can be output to a file incorporating this name. 
this.currentTestClassName = testClassName; result = super.start(new String[] { testClassName }); } // At this point in time, all tests have completed. Broadcast the shutdown message, if the termination option // was set on the command line. if (terminate) { Message terminate = session.createMessage(); terminate.setStringProperty("CONTROL_TYPE", "TERMINATE"); conversation.send(controlTopic, terminate); } return result; } /// /// For a collection of enlist messages, this method pulls out of the client details for the enlisting clients. /// /// The enlist messages. /// /// A set of enlisting clients, extracted from the enlist messages. /// /// Any underlying JMSException is allowed to fall through. public static Set extractEnlists(Collection enlists) throws JMSException { log.debug("public static Set extractEnlists(Collection enlists = " + enlists + "): called"); Set enlistedClients = new HashSet(); // Retain the list of all available clients. for (Message enlist : enlists) { TestClientDetails clientDetails = new TestClientDetails(); clientDetails.clientName = enlist.getStringProperty("CLIENT_NAME"); clientDetails.privateControlKey = enlist.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); string replyType = enlist.getStringProperty("CONTROL_TYPE"); if ("ENLIST".equals(replyType)) { enlistedClients.add(clientDetails); } else if ("DECLINE".equals(replyType)) { log.debug("Test client " + clientDetails.clientName + " declined the invite."); } else { log.warn("Got an unknown reply type, " + replyType + ", to the invite."); } } return enlistedClients; } /// /// Runs a test or suite of tests, using the super class implemenation. This method wraps the test to be run /// in any test decorators needed to add in the coordinators ability to invite test clients to participate in /// tests. /// /// The test to run. /// Undocumented. Nothing in the JUnit javadocs to say what this is for. /// /// The results of the test run. 
public TestResult doRun(Test test, bool wait)
{
    log.debug("public TestResult doRun(Test \"" + test + "\", bool " + wait + "): called");

    // Wrap all tests in the test suite with WrappedSuiteTestDecorators. This is quite ugly and a bit baffling,
    // but the reason it is done is because the JUnit implementation of TestDecorator has some bugs in it.
    TestSuite suite;

    if (test instanceof TestSuite)
    {
        log.debug("targetTest is a TestSuite");

        suite = (TestSuite) test;

        // testCount() is the number of direct children, which is the valid index range for testAt().
        // countTestCases() counts recursively through nested suites and can exceed that range.
        int numTests = suite.testCount();
        log.debug("There are " + numTests + " in the suite.");

        for (int i = 0; i < numTests; i++)
        {
            Test nextTest = suite.testAt(i);
            log.debug("suite.testAt(" + i + ") = " + nextTest);

            if (nextTest instanceof FrameworkBaseCase)
            {
                log.debug("nextTest is a FrameworkBaseCase");
            }
        }
    }
    else
    {
        // A test that is not a suite is placed in a suite of its own, so that the decorators below are
        // never handed a null test to wrap.
        suite = new TestSuite();
        suite.addTest(test);
    }

    WrappedSuiteTestDecorator targetTest = new WrappedSuiteTestDecorator(suite);
    log.debug("Wrapped with a WrappedSuiteTestDecorator.");

    // Apply any optional user specified decorators.
    targetTest = applyOptionalUserDecorators(targetTest);

    // Wrap the tests in a suitable distributed test decorator, to perform the invite/test cycle.
    targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);

    return super.doRun(targetTest, wait);
}

/// <summary>
/// Creates a wrapped test decorator, that is capable of inviting enlisted clients to participate in a specified
/// test. This is the test engine that sets up the roles and sequences a distributed test case.
/// </summary>
/// <param name="targetTest">The test decorator to wrap.</param>
/// <param name="enlistedClients">The enlisted clients available to run the test.</param>
/// <param name="conversationFactory">The conversation factory used to build conversation helper over the specified connection.</param>
/// <param name="connection">The connection to talk to the enlisted clients over.</param>
/// <returns>
/// A FanOutTestDecorator when the configured engine is FANOUT; an InteropTestDecorator when it is INTEROP or
/// any other value.
/// </returns>
protected DistributedTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set enlistedClients,
    ConversationFactory conversationFactory, Connection connection)
{
    DistributedTestDecorator decorator;

    switch (engine)
    {
    case FANOUT:
        decorator = new FanOutTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
        break;

    case INTEROP:
    default:
        // The interop engine doubles as the fallback for unrecognised engine values.
        decorator = new InteropTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
        break;
    }

    return decorator;
}
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
* */ using junit.framework.TestResult; using log4net; using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; using javax.jms.Connection; using javax.jms.Destination; using javax.jms.JMSException; using javax.jms.Message; using java.util.*; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// /// DistributedTestDecorator is a base class for writing test decorators that invite test clients to participate in /// distributed test cases. It provides a helper method, , that broadcasts an invitation and /// returns the set of test clients that are available to particiapte in the test. /// ///

/// When used to wrap a test, it replaces the default implementations
/// with a suitable circuit factory for distributed tests. Concrete implementations can use this to configure the sending
/// and receiving roles on the test.
///
/// CRC Card
/// Responsibilities | Collaborations
/// Broadcast test invitations and collect enlists.
///

public abstract class DistributedTestDecorator extends WrappedSuiteTestDecorator
{
    /// Used for debugging.
    private static ILog log = LogManager.GetLogger(typeof(DistributedTestDecorator));

    /// Holds the contact information for all test clients that are available and that may take part in the test.
    Set allClients;

    /// Holds the conversation helper for the control level conversation for coordinating the test through.
    ConversationFactory conversationFactory;

    /// Holds the connection that the control conversation is held over.
    Connection connection;

    /// Holds the underlying test suite that this decorator wraps.
    WrappedSuiteTestDecorator testSuite;

    /// Holds the control topic, on which test invitations are broadcast.
    protected Destination controlTopic;

    /// <summary>
    /// Wraps an existing suite decorator, records the coordination resources, and creates the "iop.control"
    /// topic on the control conversation's session.
    /// </summary>
    /// <param name="suite">The test suite.</param>
    /// <param name="availableClients">The list of all clients that responded to the compulsory invite.</param>
    /// <param name="controlConversation">The conversation helper for the control level, test coordination conversation.</param>
    /// <param name="controlConnection">The connection that the coordination messages are sent over.</param>
    public DistributedTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients,
        ConversationFactory controlConversation, Connection controlConnection)
    {
        super(suite);

        log.debug("public DistributedTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");

        connection = controlConnection;
        conversationFactory = controlConversation;
        allClients = availableClients;
        testSuite = suite;

        // The control topic is created eagerly; a failure is rethrown as a RuntimeException.
        try
        {
            controlTopic = conversationFactory.getSession().createTopic("iop.control");
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Unable to create the coordinating control topic to broadcast test invites on.", e);
        }
    }

    /// <summary>
    /// Should run all of the tests in the wrapped test suite.
    /// </summary>
    /// <param name="testResult">The results object to monitor the test results with.</param>
    public abstract void run(TestResult testResult);

    /// <summary>
    /// Should provide the distributed test sequencer (circuit factory) to pass to the wrapped tests.
    /// </summary>
    /// <returns>A distributed test sequencer.</returns>
    public abstract CircuitFactory getTestSequencer();

    /// <summary>
    /// Broadcasts an invitation for one coordinating test case on the control topic, collects replies for up to
    /// 500 (as passed to receiveAll), and extracts the clients that enlisted. A JMSException at any step is
    /// rethrown as a RuntimeException.
    /// </summary>
    /// <param name="coordTest">The coordinating test case to broadcast an invite for.</param>
    /// <returns>A set of test clients that accepted the invitation.</returns>
    protected Set signupClients(FrameworkBaseCase coordTest)
    {
        try
        {
            Message invitation = conversationFactory.getSession().createMessage();
            ConversationFactory.Conversation conversation = conversationFactory.startConversation();

            invitation.setStringProperty("CONTROL_TYPE", "INVITE");
            invitation.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));

            conversation.send(controlTopic, invitation);

            // Wait for a short time, to give test clients an opportunity to reply to the invitation.
            Collection responses = conversation.receiveAll(allClients.size(), 500);

            return Coordinator.extractEnlists(responses);
        }
        catch (JMSException e)
        {
            throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
        }
    }

    /// <summary>
    /// Prints a string summarizing this test decorator, mainly for debugging purposes.
    /// </summary>
    /// <returns>string representation for debugging purposes.</returns>
    public string ToString()
    {
        string summary = "DistributedTestDecorator: [ testSuite = " + testSuite + " ]";

        return summary;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using junit.framework.Test; using junit.framework.TestResult; using log4net; using Apache.Qpid.Integration.Tests.framework.DropInTest; using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; using Apache.Qpid.Integration.Tests.framework.sequencers.FanOutCircuitFactory; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; using javax.jms.Connection; using javax.jms.JMSException; using javax.jms.Message; using javax.jms.MessageListener; using System.Collections.Generic.Collection; using java.util.Iterator; using java.util.Set; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// /// FanOutTestDecorator is an that runs one test client in the sender role, and the remainder /// in the receivers role. It also has the capability to listen for new test cases joining the test beyond the initial start /// point. This feature can be usefull when experimenting with adding more load, in the form of more test clients, to assess /// its impact on a running test. /// ///

///
/// CRC Card
/// Responsibilities | Collaborations
/// Execute coordinated test cases.
/// Accept test clients joining a running test.
///

public class FanOutTestDecorator extends DistributedTestDecorator : MessageListener
{
    /// Used for debugging.
    private static ILog log = LogManager.GetLogger(typeof(FanOutTestDecorator));

    /// Holds the currently running test case, when it accepts late joiners; null otherwise.
    FrameworkBaseCase currentTest = null;

    /// <summary>
    /// Creates a wrapped suite test decorator from another one, and signs the available clients up to every
    /// test in the suite. For each test the first enlisted client becomes the sender and all remaining enlisted
    /// clients become receivers.
    /// </summary>
    /// <param name="suite">The test suite.</param>
    /// <param name="availableClients">The list of all clients that responded to the compulsory invite.</param>
    /// <param name="controlConversation">The conversation helper for the control level, test coordination conversation.</param>
    /// <param name="controlConnection">The connection that the coordination messages are sent over.</param>
    public FanOutTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients,
        ConversationFactory controlConversation, Connection controlConnection)
    {
        // The super constructor stores the suite, clients, conversation factory and connection.
        super(suite, availableClients, controlConversation, controlConnection);

        log.debug("public FanOutTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");

        // Sign available clients up to the test.
        for (Test test : getAllUnderlyingTests())
        {
            FrameworkBaseCase coordTest = (FrameworkBaseCase) test;

            // Get all of the clients able to participate in the test.
            Set<TestClientDetails> enlists = signupClients(coordTest);

            // Check that there were some clients available.
            if (enlists.isEmpty())
            {
                throw new RuntimeException("No clients to test with");
            }

            // Create a distributed test circuit factory for the test.
            CircuitFactory circuitFactory = getTestSequencer();

            // Set up the first client in the sender role, and the remainder in the receivers role.
            Iterator<TestClientDetails> clients = enlists.iterator();
            circuitFactory.setSender(clients.next());

            while (clients.hasNext())
            {
                circuitFactory.setReceiver(clients.next());
            }

            // Pass down the connection to hold the coordinating conversation over.
            circuitFactory.setConversationFactory(conversationFactory);

            // Hand the configured circuit factory to the test case; the test itself is executed by run().
            coordTest.setCircuitFactory(circuitFactory);
        }
    }

    /// <summary>
    /// Starts listening for late joiners on the control topic, then runs every test case in the wrapped suite.
    /// While a drop-in test is running it is recorded as the current test, so that late joining clients can be
    /// added to it; for any other test late joins are ignored.
    ///
    /// A JMSException while registering the listener is rethrown as a RuntimeException, resulting in the
    /// non-completion of the test run.
    /// </summary>
    /// <param name="testResult">The results object to monitor the test results with.</param>
    public void run(TestResult testResult)
    {
        log.debug("public void run(TestResult testResult): called");

        // Listen for late joiners on the control topic.
        try
        {
            conversationFactory.getSession().createConsumer(controlTopic).setMessageListener(this);
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Unable to set up the message listener on the control topic.", e);
        }

        // Run all of the test cases in the test suite.
        for (Test test : getAllUnderlyingTests())
        {
            FrameworkBaseCase coordTest = (FrameworkBaseCase) test;

            // Only a drop-in test is exposed to late joiners, and only for as long as it is actually running.
            currentTest = (coordTest instanceof DropInTest) ? coordTest : null;

            try
            {
                coordTest.run(testResult);
            }
            finally
            {
                currentTest = null;
            }
        }
    }

    /// <summary>
    /// Provides the distributed test sequencer to pass to the wrapped tests.
    /// </summary>
    /// <returns>A new FanOutCircuitFactory.</returns>
    public CircuitFactory getTestSequencer()
    {
        return new FanOutCircuitFactory();
    }

    /// <summary>
    /// Listens to incoming messages on the control topic. If the message is a 'JOIN' message, signalling a new
    /// test client wishing to join the current test, and a drop-in test is currently running, the message is
    /// passed to that test's lateJoin method. All other messages are ignored.
    /// </summary>
    /// <param name="message">The incoming control message.</param>
    public void onMessage(Message message)
    {
        try
        {
            // Take a single snapshot, so that the null check and the call below see the same test even if
            // run() clears the field in between.
            FrameworkBaseCase joinTarget = currentTest;

            // Comparing from the literal side tolerates messages that carry no CONTROL_TYPE property.
            if ("JOIN".equals(message.getStringProperty("CONTROL_TYPE")) && (joinTarget != null))
            {
                ((DropInTest) joinTarget).lateJoin(message);
            }
        }
        // There is not a lot can be done with this error, so it is logged and otherwise ignored.
        catch (JMSException e)
        {
            log.debug("Unable to process message:" + message, e);
        }
    }

    /// <summary>
    /// Prints a string summarizing this test decorator, mainly for debugging purposes.
    /// </summary>
    /// <returns>string representation for debugging purposes.</returns>
    public string ToString()
    {
        return "FanOutTestDecorator: [ testSuite = " + testSuite + " ]";
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using junit.framework.Test; using junit.framework.TestResult; using log4net; using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; using Apache.Qpid.Integration.Tests.framework.sequencers.InteropCircuitFactory; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; using javax.jms.Connection; using java.util.*; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// /// DistributedTestDecorator is a test decorator, written to implement the interop test specification. Given a list /// of enlisted test clients, that are available to run interop tests, this decorator invites them to participate /// in each test in the wrapped test suite. Amongst all the clients that respond to the invite, all pairs are formed, /// and each pairing (in both directions, but excluding the reflexive pairings) is split into a sender and receivers /// role and a test case run between them. Any enlisted combinations that do not accept a test invite are automatically /// failed. /// ///

///
/// CRC Card
/// Responsibilities | Collaborations
/// Broadcast test invitations and collect enlists.
/// Output test failures for clients unwilling to run the test case.
/// Execute distributed test cases.
/// Fail non-participating pairings.
///

public class InteropTestDecorator extends DistributedTestDecorator
{
    /// Used for debugging.
    private static ILog log = LogManager.GetLogger(typeof(InteropTestDecorator));

    /// <summary>
    /// Creates a wrapped suite test decorator from another one.
    /// </summary>
    /// <param name="suite">The test suite.</param>
    /// <param name="availableClients">The list of all clients that responded to the compulsory invite.</param>
    /// <param name="controlConversation">The conversation helper for the control level, test coordination conversation.</param>
    /// <param name="controlConnection">The connection that the coordination messages are sent over.</param>
    public InteropTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients,
        ConversationFactory controlConversation, Connection controlConnection)
    {
        super(suite, availableClients, controlConversation, controlConnection);
    }

    /// <summary>
    /// Broadcasts a test invitation and accepts enlisting from participating clients. Each wrapped test case
    /// (which is cast to FrameworkBaseCase) is then repeated for every ordered pair of distinct enlisted
    /// clients. For every pairing that involves a client which did not enlist, an automatically failing
    /// OptOutTestCase is run instead.
    ///
    /// Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime
    /// exceptions, resulting in the non-completion of the test run.
    /// </summary>
    /// <param name="testResult">The results object to monitor the test results with.</param>
    public void run(TestResult testResult)
    {
        log.debug("public void run(TestResult testResult): called");

        for (Test test : getAllUnderlyingTests())
        {
            FrameworkBaseCase coordTest = (FrameworkBaseCase) test;

            // Broadcast the invitation to find out what clients are available to test.
            Set<TestClientDetails> enlists = signupClients(coordTest);

            // Compare the list of willing clients to the list of all available.
            Set<TestClientDetails> optOuts = new HashSet<TestClientDetails>(allClients);
            optOuts.removeAll(enlists);

            // Output test failures for clients that will not participate in the test.
            Set<List<TestClientDetails>> failPairs = allPairs(optOuts, allClients);

            for (List<TestClientDetails> failPair : failPairs)
            {
                // Create a distributed test circuit factory for the test.
                CircuitFactory circuitFactory = getTestSequencer();

                // Create an automatic failure test for the opted out test pair.
                FrameworkBaseCase failTest = new OptOutTestCase("testOptOut");
                circuitFactory.setSender(failPair.get(0));
                circuitFactory.setReceiver(failPair.get(1));
                failTest.setCircuitFactory(circuitFactory);

                failTest.run(testResult);
            }

            // Loop over all combinations of clients, willing to run the test.
            Set<List<TestClientDetails>> enlistedPairs = allPairs(enlists, enlists);

            for (List<TestClientDetails> enlistedPair : enlistedPairs)
            {
                // Create a distributed test circuit factory for the test.
                CircuitFactory circuitFactory = getTestSequencer();

                // Set the sending and receiving client details on the test circuitFactory.
                circuitFactory.setSender(enlistedPair.get(0));
                circuitFactory.setReceiver(enlistedPair.get(1));

                // Pass down the connection to hold the coordination conversation over.
                circuitFactory.setConversationFactory(conversationFactory);

                // Execute the test case.
                coordTest.setCircuitFactory(circuitFactory);
                coordTest.run(testResult);
            }
        }
    }

    /// <summary>
    /// Provides the distributed test sequencer to pass to the wrapped tests.
    /// </summary>
    /// <returns>A new InteropCircuitFactory.</returns>
    public CircuitFactory getTestSequencer()
    {
        return new InteropCircuitFactory();
    }

    /// <summary>
    /// Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is
    /// important, that is the pair (x, y) is distinct from (y, x); both pairs are generated. For any element, i,
    /// in both the left and right sets, the reflexive pair (i, i) is not generated.
    /// </summary>
    /// <param name="left">The left set.</param>
    /// <param name="right">The right set.</param>
    /// <returns>All pairs formed from the permutations of all elements of the left and right sets.</returns>
    private <E> Set<List<E>> allPairs(Set<E> left, Set<E> right)
    {
        log.debug("private Set> allPairs(Set left = " + left + ", Set right = " + right + "): called");

        Set<List<E>> results = new HashSet<List<E>>();

        for (E le : left)
        {
            for (E re : right)
            {
                if (!le.equals(re))
                {
                    // Add the pair in both directions; the result set absorbs any duplicates.
                    results.add(new Pair<E>(le, re));
                    results.add(new Pair<E>(re, le));
                }
            }
        }

        log.debug("results = " + results);

        return results;
    }

    /// A simple implementation of a pair, using a list.
    private class Pair<T> extends ArrayList<T>
    {
        /// <summary>
        /// Creates a new pair of elements.
        /// </summary>
        /// <param name="first">The first element.</param>
        /// <param name="second">The second element.</param>
        public Pair(T first, T second)
        {
            super();
            super.add(first);
            super.add(second);
        }
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory;
using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase;

namespace Apache.Qpid.Integration.Tests.framework.distributedtesting
{
///
/// An OptOutTestCase is a test case that automatically fails.
/// It is used when a list of test clients has been generated
/// from a compulsory invite, but only some of those clients have responded to a specific test case invite. The clients
/// that did not respond may automatically be given a fail for some tests.
///
/// CRC Card
/// Responsibilities | Collaborations
/// Fail the test with a suitable reason.
///

public class OptOutTestCase extends FrameworkBaseCase
{
    /// <summary>
    /// Creates a new coordinating test case with the specified name.
    /// </summary>
    /// <param name="name">The test case name.</param>
    public OptOutTestCase(string name)
    {
        super(name);
    }

    /// <summary>
    /// Always fails, reporting the sender and receivers held by this test's circuit factory as the clients of
    /// which one opted out.
    /// </summary>
    public void testOptOut()
    {
        CircuitFactory factory = getCircuitFactory();

        string reason =
            "One of " + factory.getSender() + " and " + getCircuitFactory().getReceivers() + " opted out of the test.";

        fail(reason);
    }

    /// <summary>
    /// Should provide a translation from the junit method name of a test to its test case name as defined in the
    /// interop testing specification. This implementation ignores the method name and always answers
    /// "OptOutTest".
    /// </summary>
    /// <param name="methodName">The name of the JUnit test method.</param>
    /// <returns>The fixed test case name "OptOutTest".</returns>
    public string getTestCaseNameForTestMethod(string methodName)
    {
        return "OptOutTest";
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
* */ using log4net; using org.apache.log4j.NDC; using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties; using Apache.Qpid.Integration.Tests.framework.TestUtils; using Apache.Qpid.Integration.Tests.framework.clocksynch.ClockSynchThread; using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockSynchronizer; using org.apache.qpid.util.ReflectionUtils; using org.apache.qpid.util.ReflectionUtilsException; using uk.co.thebadgerset.junit.extensions.SleepThrottle; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; using javax.jms.*; using java.util.*; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// /// Implements a test client as described in the interop testing spec /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that /// reacts to control message sequences send by the test . /// ///

///
/// Messages Handled by TestClient
/// Message | Action
/// Invite(compulsory) | Reply with Enlist.
/// Invite(test case) | Reply with Enlist if test case available.
/// AssignRole(test case) | Reply with Accept Role if matches an enlisted test. Keep test parameters.
/// Start | Send test messages defined by test parameters. Send report on messages sent.
/// Status Request | Send report on messages received.
/// Terminate | Terminate the test client.
/// ClockSynch | Synch clock against the supplied UDP address.
///
/// CRC Card
/// Responsibilities | Collaborations
/// Handle all incoming control messages.
/// Configure and look up test cases by name.
///

public class TestClient : MessageListener { /// Used for debugging. private static ILog log = LogManager.GetLogger(typeof(TestClient)); /// Used for reporting to the console. private static ILog console = LogManager.GetLogger("CONSOLE"); /// Holds the default identifying name of the test client. public static final string CLIENT_NAME = "java"; /// Holds the URL of the broker to run the tests on. public static string brokerUrl; /// Holds the virtual host to run the tests on. If null, then the default virtual host is used. public static string virtualHost; /// /// Holds the test context properties that provides the default test parameters, plus command line overrides. /// This is initialized with the default test parameters, to which command line overrides may be applied. /// public static ParsedProperties testContextProperties = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); /// Holds all the test cases loaded from the classpath. Map testCases = new HashMap(); /// Holds the test case currently being run by this client. protected TestClientControlledTest currentTestCase; /// Holds the connection to the broker that the test is being coordinated on. protected Connection connection; /// Holds the message producer to hold the test coordination over. protected MessageProducer producer; /// Holds the JMS controlSession for the test coordination. protected Session session; /// Holds the name of this client, with a default value. protected string clientName = CLIENT_NAME; /// This flag indicates that the test client should attempt to join the currently running test case on start up. protected bool join; /// Holds the clock synchronizer for the test node. ClockSynchThread clockSynchThread; /// /// Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client /// identifying name. /// /// The url of the broker to connect to. /// The virtual host to conect to. /// The client name to use. 
/// <param name="join">Flag to indicate that this client should attempt to join running tests.</param>
public TestClient(string pBrokerUrl, string pVirtualHost, string clientName, bool join)
{
    log.debug("public TestClient(string pBrokerUrl = " + pBrokerUrl + ", string pVirtualHost = " + pVirtualHost + ", string clientName = " + clientName + ", bool join = " + join + "): called");

    // Per-instance settings.
    this.join = join;
    this.clientName = clientName;

    // The connection parameters are retained in the static fields of the class.
    virtualHost = pVirtualHost;
    brokerUrl = pBrokerUrl;
}

///
/// The entry point for the interop test client. This client accepts the following command line arguments:
///
///

///
/// -b | The broker URL. Optional.
/// -h | The virtual host. Optional.
/// -n | The test client name. Optional.
/// name=value | Trailing arguments define name/value pairs. Added to system properties. Optional.
///

/// The command line arguments. public static void main(String[] args) { log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called"); console.info("Qpid Distributed Test Client."); // Override the default broker url to be localhost:5672. testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); // Use the command line parser to evaluate the command line with standard handling behaviour (print errors // and usage then exist if there are errors). // Any options and trailing name=value pairs are also injected into the test context properties object, // to override any defaults that may have been set up. ParsedProperties options = new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args, new uk.co.thebadgerset.junit.extensions.util.CommandLineParser( new String[][] { { "b", "The broker URL.", "broker", "false" }, { "h", "The virtual host to use.", "virtual host", "false" }, { "o", "The name of the directory to output test timings to.", "dir", "false" }, { "n", "The name of the test client.", "name", "false" }, { "j", "Join this test client to running test.", "false" } }), testContextProperties)); // Extract the command line options. string brokerUrl = options.getProperty("b"); string virtualHost = options.getProperty("h"); string clientName = options.getProperty("n"); clientName = (clientName == null) ? CLIENT_NAME : clientName; bool join = options.getPropertyAsBoolean("j"); // To distinguish logging output set up an NDC on the client name. NDC.push(clientName); // Create a test client and start it running. TestClient client = new TestClient(brokerUrl, virtualHost, clientName, join); // Use a class path scanner to find all the interop test case implementations. // Hard code the test classes till the classpath scanner is fixed. 
Collection> testCaseClasses = new ArrayList>(); // ClasspathScanner.getMatches(TestClientControlledTest.class, "^TestCase.*", true); testCaseClasses.addAll(loadTestCases("org.apache.qpid.interop.clienttestcases.TestCase1DummyRun", "org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P", "org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub", "org.apache.qpid.interop.clienttestcases.TestCase4P2PMessageSize", "org.apache.qpid.interop.clienttestcases.TestCase5PubSubMessageSize", "Apache.Qpid.Integration.Tests.framework.distributedcircuit.TestClientCircuitEnd")); try { client.start(testCaseClasses); } catch (Exception e) { log.error("The test client was unable to start.", e); console.info(e.getMessage()); System.exit(1); } } /// /// Parses a list of class names, and loads them if they are available on the class path. /// /// The names of the classes to load. /// /// A list of the loaded test case classes. public static IList> loadTestCases(String... classNames) { IList> testCases = new LinkedList>(); for (string className : classNames) { try { Class cls = ReflectionUtils.forName(className); testCases.add((Class) cls); } catch (ReflectionUtilsException e) { // Ignore, class could not be found, so test not available. console.warn("Requested class " + className + " cannot be found, ignoring it."); } catch (ClassCastException e) { // Ignore, class was not of correct type to be a test case. console.warn("Requested class " + className + " is not an instance of TestClientControlledTest."); } } return testCases; } /// /// Starts the interop test client running. This causes it to start listening for incoming test invites. /// /// The classes of the available test cases. The test case names from these are used to /// matchin incoming test invites against. /// /// Any underlying JMSExceptions are allowed to fall through. 
protected void start(Collection> testCaseClasses) throws JMSException { log.debug("protected void start(Collection> testCaseClasses = " + testCaseClasses + "): called"); // Create all the test case implementations and index them by the test names. for (Class nextClass : testCaseClasses) { try { TestClientControlledTest testCase = nextClass.newInstance(); testCases.put(testCase.getName(), testCase); } catch (InstantiationException e) { log.warn("Could not instantiate test case class: " + nextClass.getName(), e); // Ignored. } catch (IllegalAccessException e) { log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); // Ignored. } } // Open a connection to communicate with the coordinator on. connection = TestUtils.createConnection(testContextProperties); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Set this up to listen for control messages. Topic privateControlTopic = session.createTopic("iop.control." + clientName); MessageConsumer consumer = session.createConsumer(privateControlTopic); consumer.setMessageListener(this); Topic controlTopic = session.createTopic("iop.control"); MessageConsumer consumer2 = session.createConsumer(controlTopic); consumer2.setMessageListener(this); // Create a producer to send replies with. producer = session.createProducer(null); // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client // is available to join the current test case, if it supports it. This message may be ignored, or it may result // in this test client receiving a test invite. if (join) { Message joinMessage = session.createMessage(); joinMessage.setStringProperty("CONTROL_TYPE", "JOIN"); joinMessage.setStringProperty("CLIENT_NAME", clientName); joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); producer.send(controlTopic, joinMessage); } // Start listening for incoming control messages. 
connection.start(); } /// /// Handles all incoming control messages. /// /// The incoming message. public void onMessage(Message message) { NDC.push(clientName); log.debug("public void onMessage(Message message = " + message + "): called"); try { string controlType = message.getStringProperty("CONTROL_TYPE"); string testName = message.getStringProperty("TEST_NAME"); log.debug("Received control of type '" + controlType + "' for the test '" + testName + "'"); // Check if the message is a test invite. if ("INVITE".equals(controlType)) { // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites // for which test cases exist. bool enlist = false; if (testName != null) { log.debug("Got an invite to test: " + testName); // Check if the requested test case is available. TestClientControlledTest testCase = testCases.get(testName); if (testCase != null) { log.debug("Found implementing class for test '" + testName + "', enlisting for it."); // Check if the test case will accept the invitation. enlist = testCase.acceptInvite(message); log.debug("The test case " + (enlist ? " accepted the invite, enlisting for it." : " did not accept the invite, not enlisting.")); // Make the requested test case the current test case. currentTestCase = testCase; } else { log.debug("Received an invite to the test '" + testName + "' but this test is not known."); } } else { log.debug("Got a compulsory invite, enlisting for it."); enlist = true; } if (enlist) { // Reply with the client name in an Enlist message. Message enlistMessage = session.createMessage(); enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); enlistMessage.setStringProperty("CLIENT_NAME", clientName); enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." 
+ clientName); enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); log.debug("Sending enlist message '" + enlistMessage + "' to " + message.getJMSReplyTo()); producer.send(message.getJMSReplyTo(), enlistMessage); } else { // Reply with the client name in an Decline message. Message enlistMessage = session.createMessage(); enlistMessage.setStringProperty("CONTROL_TYPE", "DECLINE"); enlistMessage.setStringProperty("CLIENT_NAME", clientName); enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); log.debug("Sending decline message '" + enlistMessage + "' to " + message.getJMSReplyTo()); producer.send(message.getJMSReplyTo(), enlistMessage); } } else if ("ASSIGN_ROLE".equals(controlType)) { // Assign the role to the current test case. string roleName = message.getStringProperty("ROLE"); log.debug("Got a role assignment to role: " + roleName); TestClientControlledTest.Roles role = Enum.valueOf(TestClientControlledTest.Roles.class, roleName); currentTestCase.assignRole(role, message); // Reply by accepting the role in an Accept Role message. Message acceptRoleMessage = session.createMessage(); acceptRoleMessage.setStringProperty("CLIENT_NAME", clientName); acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); log.debug("Sending accept role message '" + acceptRoleMessage + "' to " + message.getJMSReplyTo()); producer.send(message.getJMSReplyTo(), acceptRoleMessage); } else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) { if ("START".equals(controlType)) { log.debug("Got a start notification."); // Extract the number of test messages to send from the start notification. 
int numMessages; try { numMessages = message.getIntProperty("MESSAGE_COUNT"); } catch (NumberFormatException e) { // If the number of messages is not specified, use the default of one. numMessages = 1; } // Start the current test case. currentTestCase.start(numMessages); } else { log.debug("Got a status request."); } // Generate the report from the test case and reply with it as a Report message. Message reportMessage = currentTestCase.getReport(session); reportMessage.setStringProperty("CLIENT_NAME", clientName); reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); log.debug("Sending report message '" + reportMessage + "' to " + message.getJMSReplyTo()); producer.send(message.getJMSReplyTo(), reportMessage); } else if ("TERMINATE".equals(controlType)) { console.info("Received termination instruction from coordinator."); // Is a cleaner shutdown needed? connection.close(); System.exit(0); } else if ("CLOCK_SYNCH".equals(controlType)) { log.debug("Received clock synch command."); string address = message.getStringProperty("ADDRESS"); log.debug("address = " + address); // Re-create (if necessary) and start the clock synch thread to synch the clock every ten seconds. if (clockSynchThread != null) { clockSynchThread.terminate(); } SleepThrottle throttle = new SleepThrottle(); throttle.setRate(0.1f); clockSynchThread = new ClockSynchThread(new UDPClockSynchronizer(address), throttle); clockSynchThread.start(); } else { // Log a warning about this but otherwise ignore it. log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); } } catch (JMSException e) { // Log a warning about this, but otherwise ignore it. log.warn("Got JMSException whilst handling message: " + message, e); } // Log any runtimes that fall through this message handler. These are fatal errors for the test client. 
catch (RuntimeException e) { log.error("The test client message handler got an unhandled exception: ", e); console.info("The message handler got an unhandled exception, terminating the test client."); System.exit(1); } finally { NDC.pop(); } } } } /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using javax.jms.JMSException; using javax.jms.Message; using javax.jms.MessageListener; using javax.jms.Session; namespace Apache.Qpid.Integration.Tests.framework.distributedtesting { /// /// TestClientControlledTest provides an interface that classes implementing test cases to run on a /// node can use. Implementations must be Java beans, that is, to provide a default constructor and to implement the /// method. /// ///

/// The methods specified in this interface are called when the TestClient receives control instructions to
/// apply to the test. There are control instructions to present the test case with the test invite, so that it may
/// choose whether or not to participate in the test, assign the test to play the sender or receiver role, start the
/// test and obtain the test status report.
///
/// CRC Card
/// Responsibilities:
///  - Supply the name of the test case that this implements.
///  - Accept/Reject invites based on test parameters.
///  - Adapt to assigned roles.
///  - Perform test case actions.
///  - Generate test reports.
///

public interface TestClientControlledTest
{
    /// <summary>Enumerates the roles that an interop test case can be asked to play.</summary>
    enum Roles
    {
        /// <summary>The sending side of the test.</summary>
        SENDER,

        /// <summary>The receiving side of the test.</summary>
        RECEIVER
    }

    /// <summary>
    /// Supplies the name of the test case that the implementing class provides. The exact names are defined in
    /// the interop testing spec.
    /// </summary>
    /// <returns>The name of the test case that this implements.</returns>
    string getName();

    /// <summary>
    /// Decides whether the test invite that matched this test case is acceptable.
    /// </summary>
    /// <param name="inviteMessage">The invitation to accept or reject.</param>
    /// <returns>true to accept the invitation, false to reject it.</returns>
    /// <exception cref="JMSException">Any JMSException resulting from reading the message is allowed to fall
    /// through.</exception>
    bool acceptInvite(Message inviteMessage) throws JMSException;

    /// <summary>
    /// Assigns the role to be played by this test case. The test parameters are fully specified in the
    /// assignment message. When this method returns the test case will be ready to execute.
    /// </summary>
    /// <param name="role">The role to be played; sender or receiver.</param>
    /// <param name="assignRoleMessage">The role assignment message, contains the full test parameters.</param>
    /// <exception cref="JMSException">Any JMSException resulting from reading the message is allowed to fall
    /// through.</exception>
    void assignRole(Roles role, Message assignRoleMessage) throws JMSException;

    /// <summary>
    /// Performs the test case actions. Returning from here indicates that the sending role has completed its test.
    /// </summary>
    /// <param name="numMessages">The number of test messages to send.</param>
    /// <exception cref="JMSException">Any JMSException raised while running the test is allowed to fall
    /// through.</exception>
    void start(int numMessages) throws JMSException;

    /// <summary>
    /// Gets a report on the actions performed by the test case in its assigned role.
    /// </summary>
    /// <param name="session">The control session to create the report message in.</param>
    /// <returns>The report message.</returns>
    /// <exception cref="JMSException">Any JMSException resulting from creating the report is allowed to fall
    /// through.</exception>
    Message getReport(Session session) throws JMSException;
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using javax.jms.JMSException;
using javax.jms.Message;

namespace Apache.Qpid.Integration.Tests.framework
{
///
/// A DropIn test is a test case that can accept late joining test clients into a running test. This can be useful
/// for interactive experimentation.
///
///

///
/// CRC Card
/// Responsibilities:
///  - Accept late joining test clients.
///

public interface DropInTest
{
    /// <summary>
    /// Should accept a late joining client into a running test case. The client will be enlisted with a control
    /// message with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the
    /// fields:
    ///
    /// CLIENT_NAME                 A unique name for the new client.
    /// CLIENT_PRIVATE_CONTROL_KEY  The key for the route on which the client receives its control messages.
    /// </summary>
    /// <param name="message">The late joiner's join message.</param>
    /// <exception cref="JMSException">Any JMSException is allowed to fall through, indicating that the join
    /// failed.</exception>
    void lateJoin(Message message) throws JMSException;
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;

using javax.jms.ExceptionListener;
using javax.jms.JMSException;

using java.io.PrintWriter;
using java.io.StringWriter;
using java.util.ArrayList;
using System.Collections.Generic.IList;

namespace Apache.Qpid.Integration.Tests.framework
{
///
/// An exception monitor listens for JMS exceptions on a connection or consumer. It records all exceptions that it
/// receives and provides methods to test the number and type of exceptions received.
///
///

///
/// CRC Card
/// Responsibilities:
///  - Record all exceptions received.
///

public class ExceptionMonitor : ExceptionListener
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(ExceptionMonitor));

    /// <summary>Holds the received exceptions, in the order they were reported.</summary>
    IList exceptions = new ArrayList();

    /// <summary>
    /// Records an incoming exception and logs it at debug level.
    /// </summary>
    /// <param name="e">The exception to record.</param>
    public synchronized void onException(JMSException e)
    {
        log.debug("public void onException(JMSException e): called", e);

        exceptions.add(e);
    }

    /// <summary>
    /// Checks that no exceptions have been received.
    /// </summary>
    /// <returns>true if no exceptions have been received, false otherwise.</returns>
    public synchronized bool assertNoExceptions()
    {
        return exceptions.isEmpty();
    }

    /// <summary>
    /// Checks that exactly one exception has been received.
    /// </summary>
    /// <returns>true if exactly one exception has been received, false otherwise.</returns>
    public synchronized bool assertOneJMSException()
    {
        return exceptions.size() == 1;
    }

    /// <summary>
    /// Checks that exactly one exception has been received, and that it is a JMSException whose linked exception
    /// is non-null and an instance of the specified type.
    /// </summary>
    /// <param name="aClass">The type of the linked cause.</param>
    /// <returns>true if exactly one exception, with a linked cause of the specified type, has been received,
    /// false otherwise.</returns>
    public synchronized bool assertOneJMSExceptionWithLinkedCause(Class aClass)
    {
        // Anything other than exactly one recorded exception fails the check.
        if (exceptions.size() != 1)
        {
            return false;
        }

        Exception recorded = exceptions.get(0);

        // Only a JMSException can carry a linked exception.
        if (!(recorded instanceof JMSException))
        {
            return false;
        }

        Exception linked = ((JMSException) recorded).getLinkedException();

        return (linked != null) && aClass.isInstance(linked);
    }

    /// <summary>
    /// Checks that at least one exception of the specified type has been received.
    /// </summary>
    /// <param name="exceptionClass">The type of the exception.</param>
    /// <returns>true if at least one exception of the specified type has been received, false otherwise.</returns>
    public synchronized bool assertExceptionOfType(Class exceptionClass)
    {
        // Stop at the first recorded exception that matches the requested type.
        for (Exception recorded : exceptions)
        {
            if (exceptionClass.isInstance(recorded))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Reports the number of exceptions held by this monitor.
    /// </summary>
    /// <returns>The number of exceptions held by this monitor.</returns>
    public synchronized int size()
    {
        return exceptions.size();
    }

    /// <summary>
    /// Clears the record of received exceptions by replacing it with a new, empty list.
    /// </summary>
    public synchronized void reset()
    {
        exceptions = new ArrayList();
    }

    /// <summary>
    /// Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly
    /// of use for debugging and test failure reporting.
    /// </summary>
    /// <returns>A string containing a count header followed by the stack trace of every recorded exception.</returns>
    public synchronized string ToString()
    {
        string dump = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n";

        for (Exception recorded : exceptions)
        {
            dump = dump + getStackTrace(recorded) + "\n";
        }

        return dump;
    }

    /// <summary>
    /// Prints an exception stack trace into a string.
    /// </summary>
    /// <param name="t">The throwable to get the stack trace from.</param>
    /// <returns>A string containing the throwable's stack trace.</returns>
    public static string getStackTrace(Throwable t)
    {
        StringWriter buffer = new StringWriter();
        PrintWriter printer = new PrintWriter(buffer, true);

        t.printStackTrace(printer);
        printer.flush();
        buffer.flush();

        return buffer.ToString();
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;
using org.apache.log4j.NDC;

using Apache.Qpid.Integration.Tests.framework.BrokerLifecycleAware;
using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory;

using uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
using uk.co.thebadgerset.junit.extensions.SetupTaskAware;
using uk.co.thebadgerset.junit.extensions.SetupTaskHandler;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using uk.co.thebadgerset.junit.extensions.util.TestContextProperties;

using java.util.ArrayList;
using System.Collections.Generic.IList;

namespace Apache.Qpid.Integration.Tests.framework
{
///
/// FrameworkBaseCase provides a starting point for writing test cases against the test framework. Its main purpose is
/// to provide some convenience methods for testing.
///
///

///
/// CRC Card
/// Responsibilities:
///  - Create and clean up in-vm brokers on every test case.
///  - Produce lists of assertions from assertion creation calls.
///  - Produce JUnit failures from assertion failures.
///  - Convert failed assertions to error messages.
///

public class FrameworkBaseCase extends AsymptoticTestCase : FrameworkTestContext, SetupTaskAware, BrokerLifecycleAware
{
    /// <summary>Used for debugging purposes.</summary>
    private static ILog log = LogManager.GetLogger(typeof(FrameworkBaseCase));

    /// <summary>Holds the circuit factory used to create and run test circuits; a local factory by default.</summary>
    protected CircuitFactory circuitFactory = new LocalCircuitFactory();

    /// <summary>Used to read the test's configurable properties through. Assigned in <c>setUp</c>.</summary>
    protected ParsedProperties testProps;

    /// <summary>A default setup task processor to delegate setup and tear down tasks to.</summary>
    protected SetupTaskHandler taskHandler = new SetupTaskHandler();

    /// <summary>Flag used to track whether the test is in-vm or not.</summary>
    protected bool isUsingInVM;

    /// <summary>Holds the failure mechanism; a user-prompt mechanism by default.</summary>
    protected CauseFailure failureMechanism = new CauseFailureUserPrompt();

    /// <summary>
    /// Creates a new test case with the specified name.
    /// </summary>
    /// <param name="name">The test case name.</param>
    public FrameworkBaseCase(string name)
    {
        super(name);
    }

    /// <summary>
    /// Returns the circuit factory that provides test circuit and test sequence implementations. Unless it has
    /// been overridden through <c>setCircuitFactory</c>, this is the local circuit factory created with this
    /// test case.
    /// </summary>
    /// <returns>The circuit factory currently in use.</returns>
    protected CircuitFactory getCircuitFactory()
    {
        return circuitFactory;
    }

    /// <summary>
    /// Overrides the default test circuit factory. Test decorators can use this to supply distributed test
    /// sequencers or other test circuit factory specializations.
    /// </summary>
    /// <param name="circuitFactory">The new test circuit factory.</param>
    public void setCircuitFactory(CircuitFactory circuitFactory)
    {
        this.circuitFactory = circuitFactory;
    }

    /// <summary>
    /// Reports the current test case as a test case vector built from this test's name and the value 0.
    /// </summary>
    /// <returns>The current test case vector.</returns>
    public TestCaseVector getTestCaseVector()
    {
        return new TestCaseVector(this.getName(), 0);
    }

    /// <summary>
    /// Reports the current test case parameters, wrapping the test properties in a typed view.
    /// </summary>
    /// <returns>The current test case parameters.</returns>
    public MessagingTestConfigProperties getTestParameters()
    {
        return new MessagingTestConfigProperties(testProps);
    }

    /// <summary>
    /// Collects the supplied assertions into a new list, preserving their order.
    /// </summary>
    /// <param name="asserts">The assertions to compile in a list.</param>
    /// <returns>A list of assertions.</returns>
    protected IList assertionList(Assertion... asserts)
    {
        IList compiled = new ArrayList();

        for (int i = 0; i < asserts.length; i++)
        {
            compiled.add(asserts[i]);
        }

        return compiled;
    }

    /// <summary>
    /// Fails the test if any assertions are passed into this method, using the concatenation of their string
    /// forms as the failure message. A null or empty list means there is nothing to report.
    /// </summary>
    /// <param name="asserts">The list of failed assertions.</param>
    protected static void assertNoFailures(List asserts)
    {
        log.debug("protected void assertNoFailures(List asserts = " + asserts + "): called");

        bool anyFailed = (asserts != null) && !asserts.isEmpty();

        if (anyFailed)
        {
            // Fail with the error messages from all of the assertions compiled together.
            fail(assertionsToString(asserts));
        }
    }

    /// <summary>
    /// Converts a list of failed assertions into an error message, one assertion per line.
    /// </summary>
    /// <param name="asserts">The failed assertions.</param>
    /// <returns>The error message.</returns>
    protected static string assertionsToString(List asserts)
    {
        string combined = "";

        for (Assertion failed : asserts)
        {
            combined = combined + failed.ToString() + "\n";
        }

        return combined;
    }

    /// <summary>
    /// Prepares the test: pushes the test name as the logging NDC, loads the test properties from the default
    /// messaging test configuration, and runs all chained setup tasks.
    /// </summary>
    /// <exception cref="Exception">Any exceptions are allowed to fall through and fail the test.</exception>
    protected void setUp() throws Exception
    {
        NDC.push(getName());

        testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);

        // Process all optional setup tasks. This may include in-vm broker creation, if a decorator has added it.
        taskHandler.runSetupTasks();
    }

    /// <summary>
    /// Cleans up after the test: pops the logging NDC, then runs all chained tear down tasks.
    /// </summary>
    protected void tearDown()
    {
        NDC.pop();

        // Process all optional tear down tasks. This may include in-vm broker clean up, if a decorator has added it.
        taskHandler.runTearDownTasks();
    }

    /// <summary>
    /// Adds the specified task to the test's setup.
    /// </summary>
    /// <param name="task">The task to add to the test's setup.</param>
    public void chainSetupTask(Runnable task)
    {
        taskHandler.chainSetupTask(task);
    }

    /// <summary>
    /// Adds the specified task to the test's tear down.
    /// </summary>
    /// <param name="task">The task to add to the test's tear down.</param>
    public void chainTearDownTask(Runnable task)
    {
        taskHandler.chainTearDownTask(task);
    }

    /// <summary>
    /// Should provide a translation from the JUnit method name of a test to its test case name as known to the
    /// test clients that will run the test. For example the method "testP2P" might map onto the interop test
    /// case name "TC2_BasicP2P". This base implementation returns the method name unchanged.
    /// </summary>
    /// <param name="methodName">The name of the JUnit test method.</param>
    /// <returns>The name of the corresponding interop test case.</returns>
    public string getTestCaseNameForTestMethod(string methodName)
    {
        return methodName;
    }

    /// <summary>
    /// Marks this test case as using in-vm brokers.
    /// </summary>
    public void setInVmBrokers()
    {
        isUsingInVM = true;
    }

    /// <summary>
    /// Indicates whether or not a test case is using in-vm brokers.
    /// </summary>
    /// <returns>true if the test is using in-vm brokers, false otherwise.</returns>
    public bool usingInVmBroker()
    {
        return isUsingInVM;
    }

    /// <summary>
    /// Sets the currently live in-vm broker. This base implementation does nothing.
    /// </summary>
    /// <param name="i">The currently live in-vm broker.</param>
    public void setLiveBroker(int i)
    { }

    /// <summary>
    /// Reports the currently live in-vm broker. This base implementation always returns 0.
    /// </summary>
    /// <returns>The currently live in-vm broker.</returns>
    public int getLiveBroker()
    {
        return 0;
    }

    /// <summary>
    /// Accepts a failure mechanism, replacing the default one.
    /// </summary>
    /// <param name="failureMechanism">The failure mechanism.</param>
    public void setFailureMechanism(CauseFailure failureMechanism)
    {
        this.failureMechanism = failureMechanism;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework
{
///
/// A FrameworkTestContext provides context information to test code about the current test case being run; its name,
/// its parameters.
///
///

///
/// CRC Card
/// Responsibilities:
///  - Provide the name of the current test case.
///  - Provide the test parameters.
///

public interface FrameworkTestContext
{
    /// <summary>
    /// Reports the test case that is currently being run.
    /// </summary>
    /// <returns>The current test case vector.</returns>
    TestCaseVector getTestCaseVector();

    /// <summary>
    /// Reports the parameters that the current test case is being run with.
    /// </summary>
    /// <returns>The current test case parameters.</returns>
    MessagingTestConfigProperties getTestParameters();
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;

using Apache.Qpid.Integration.Tests.framework.*;

using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;

using javax.jms.*;

using System.Collections.Generic.LinkedList;
using System.Collections.Generic.IList;

namespace Apache.Qpid.Integration.Tests.framework.localcircuit
{
///
/// LocalCircuitImpl provides an implementation of the test circuit. This is a local only circuit implementation that
/// supports a single producer/consumer on each end of the circuit, with both ends of the circuit on the same JVM.
///
///

///
/// CRC Card
/// Responsibilities (collaborations in brackets):
///  - Supply the publishing and receiving ends of a test messaging circuit.
///    [LocalPublisherImpl, LocalReceiverImpl]
///  - Start the circuit running.
///  - Close the circuit down.
///  - Take a reading of the circuits state.
///  - Apply assertions against the circuits state.
///  - Send test messages over the circuit.
///  - Perform the default test procedure on the circuit.
///  - Provide access to connection and controlSession exception monitors.
///

public class LocalCircuitImpl : Circuit
{
    // NOTE(review): this class is a partial Java-to-C# port. Constructs such as 'throws', 'for (x : y)',
    // RuntimeException and the javax.jms types are still Java and need porting before this compiles as C#.

    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(LocalCircuitImpl));

    /// <summary>Holds the test configuration for the circuit.</summary>
    private ParsedProperties testProps;

    /// <summary>Holds the publishing end of the circuit.</summary>
    private LocalPublisherImpl publisher;

    /// <summary>Holds the receiving end of the circuit.</summary>
    private LocalReceiverImpl receiver;

    /// <summary>Holds the connection for the publishing end of the circuit.</summary>
    private Connection connection;

    /// <summary>Holds the exception listener for the connection on the publishing end of the circuit.</summary>
    private ExceptionMonitor connectionExceptionMonitor;

    /// <summary>Holds the exception listener for the controlSession on the publishing end of the circuit.</summary>
    private ExceptionMonitor exceptionMonitor;

    /// <summary>
    /// Creates a test circuit using the specified test parameters. The publisher, receiver, connection and
    /// connection monitor must already have been created, to assemble the circuit. A fresh exception monitor
    /// is created for exceptions raised while sending, and this circuit registers itself as the parent
    /// circuit of both the publisher and the receiver.
    /// </summary>
    ///
    /// <param name="testProps">The test parameters.</param>
    /// <param name="publisher">The test publisher.</param>
    /// <param name="receiver">The test receiver.</param>
    /// <param name="connection">The connection.</param>
    /// <param name="connectionExceptionMonitor">The connection exception monitor.</param>
    public LocalCircuitImpl(ParsedProperties testProps, LocalPublisherImpl publisher, LocalReceiverImpl receiver,
        Connection connection, ExceptionMonitor connectionExceptionMonitor)
    {
        this.testProps = testProps;
        this.publisher = publisher;
        this.receiver = receiver;
        this.connection = connection;
        this.connectionExceptionMonitor = connectionExceptionMonitor;
        this.exceptionMonitor = new ExceptionMonitor();

        // Set this as the parent circuit on the publisher and receiver.
        publisher.setCircuit(this);
        receiver.setCircuit(this);
    }

    /// <summary>
    /// Gets the interface on the publishing end of the circuit.
    /// </summary>
    ///
    /// <returns>The publishing end of the circuit.</returns>
    public Publisher getPublisher()
    {
        return publisher;
    }

    /// <summary>
    /// Gets the local publishing circuit end, for direct manipulation.
    /// </summary>
    ///
    /// <returns>The local publishing circuit end.</returns>
    public CircuitEnd getLocalPublisherCircuitEnd()
    {
        return publisher;
    }

    /// <summary>
    /// Gets the interface on the receiving end of the circuit.
    /// </summary>
    ///
    /// <returns>The receiving end of the circuit.</returns>
    public Receiver getReceiver()
    {
        return receiver;
    }

    /// <summary>
    /// Gets the local receiving circuit end, for direct manipulation.
    /// </summary>
    ///
    /// <returns>The local receiving circuit end.</returns>
    public CircuitEnd getLocalReceiverCircuitEnd()
    {
        return receiver;
    }

    /// <summary>
    /// Checks the test circuit. The intended effect is to gather the circuit's state, for both ends of the
    /// circuit, into a report against which assertions may be checked. This implementation has an empty body.
    /// </summary>
    public void check()
    { }

    /// <summary>
    /// Applies a list of assertions against the test circuit. The check method should be called before doing
    /// this, to ensure that the circuit has gathered its state into a report to assert against.
    /// </summary>
    ///
    /// <param name="assertions">The list of assertions to apply.</param>
    ///
    /// <returns>Any assertions that failed, in the order they were applied; empty when all passed.</returns>
    public IList applyAssertions(List assertions)
    {
        IList failures = new LinkedList();

        for (Assertion assertion : assertions)
        {
            if (!assertion.apply())
            {
                failures.add(assertion);
            }
        }

        return failures;
    }

    /// <summary>
    /// Connects and starts the circuit. After this method is called the circuit is ready to send messages.
    /// This implementation has an empty body.
    /// </summary>
    public void start()
    { }

    /// <summary>
    /// Closes the circuit. The publisher, the receiver and the connection are each given a chance to close,
    /// even when an earlier close fails, so that a failure on one end does not leave the others open.
    /// </summary>
    ///
    /// <remarks>
    /// Any JMSException raised while closing is rethrown wrapped in a RuntimeException. When more than one
    /// close fails, the exception from the last failing close is the one that is reported.
    /// </remarks>
    public void close()
    {
        try
        {
            try
            {
                publisher.close();
            }
            finally
            {
                // Always attempt the remaining closes, whatever happened to the publisher.
                try
                {
                    receiver.close();
                }
                finally
                {
                    connection.close();
                }
            }
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Got JMSException during close:" + e.getMessage(), e);
        }
    }

    /// <summary>
    /// Sends a message on the test circuit. The exact nature of the message sent is controlled by the test
    /// parameters. Any JMSException raised is passed to this circuit's exception monitor rather than thrown.
    /// </summary>
    protected void send()
    {
        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps);

        bool transactional = props.getPublisherTransacted();
        bool rollback = props.getRollbackPublisher();

        // Send a message through the publisher and record any exceptions raised.
        try
        {
            CircuitEnd end = getLocalPublisherCircuitEnd();

            end.send(createTestMessage(end));

            // Rollback takes precedence over commit when both flags are set.
            if (rollback)
            {
                end.getSession().rollback();
            }
            else if (transactional)
            {
                end.getSession().commit();
            }
        }
        catch (JMSException e)
        {
            exceptionMonitor.onException(e);
        }
    }

    /// <summary>
    /// Runs the default test procedure against the circuit, and reports which of the specified assertions
    /// failed. As implemented here, the procedure is:
    ///
    /// <list type="number">
    /// <item>Start the circuit.</item>
    /// <item>Send the requested number of test messages.</item>
    /// <item>Pause briefly, to allow time for exceptions to come back asynchronously.</item>
    /// <item>Request a status report.</item>
    /// <item>Close the circuit.</item>
    /// <item>Apply the assertions, collecting any that fail.</item>
    /// </list>
    /// </summary>
    ///
    /// <param name="numMessages">The number of messages to send using the default test procedure.</param>
    /// <param name="assertions">The list of assertions to apply.</param>
    ///
    /// <returns>Any assertions that failed.</returns>
    public IList test(int numMessages, List assertions)
    {
        // Start the test circuit.
        start();

        // Send the requested number of test messages.
        for (int i = 0; i < numMessages; i++)
        {
            send();
        }

        // Inject a short pause to allow time for exceptions to come back asynchronously.
        TestUtils.pause(500L);

        // Request a status report.
        check();

        // Clean up the publisher/receiver/controlSession/connection.
        close();

        // Apply all of the requested assertions, keeping record of any that fail.
        IList failures = applyAssertions(assertions);

        // Return any failed assertions to the caller.
        return failures;
    }

    /// <summary>
    /// Creates a message whose size is taken from the test parameters, on the session of the given circuit end.
    /// </summary>
    ///
    /// <param name="client">The circuit end to create the message on.</param>
    ///
    /// <returns>The test message.</returns>
    ///
    /// <remarks>Any JMSException occurring during creation of the message is allowed to fall through.</remarks>
    private Message createTestMessage(CircuitEnd client) throws JMSException
    {
        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps);

        return TestUtils.createTestMessageOfSize(client.getSession(), props.getMessageSize());
    }

    /// <summary>
    /// Gets the exception monitor for the publishing end's connection.
    /// </summary>
    ///
    /// <returns>The exception monitor for the publishing end's connection.</returns>
    public ExceptionMonitor getConnectionExceptionMonitor()
    {
        return connectionExceptionMonitor;
    }

    /// <summary>
    /// Gets the exception monitor for the publishing end's controlSession. This is the monitor that records
    /// exceptions raised while sending.
    /// </summary>
    ///
    /// <returns>The exception monitor for the publishing end's controlSession.</returns>
    public ExceptionMonitor getExceptionMonitor()
    {
        return exceptionMonitor;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using Apache.Qpid.Integration.Tests.framework.*;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using javax.jms.MessageConsumer;
using javax.jms.MessageProducer;
using javax.jms.Session;
namespace Apache.Qpid.Integration.Tests.framework.localcircuit
{
///
/// Provides an implementation of the Publisher interface and wraps a single message producer and consumer on
/// a single controlSession, as a CircuitEnd. A local publisher also acts as a circuit end, because for a locally
/// located circuit the assertions may be applied directly, there does not need to be any inter-process messaging
/// between the publisher and its single circuit end, in order to ascertain its status.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th>Responsibilities</th><th>Collaborations</th></tr>
/// <tr><td>Provide a message producer for sending messages.</td></tr>
/// <tr><td>Provide a message consumer for receiving messages.</td></tr>
/// <tr><td>Provide assertion that the publisher received no exceptions.</td></tr>
/// <tr><td>Provide assertion that the publisher received a no consumers error code.</td></tr>
/// <tr><td>Provide assertion that the publisher received a no route error code.</td></tr>
/// </table>

public class LocalPublisherImpl extends CircuitEndBase : Publisher
{
    /// <summary>Holds a reference to the containing circuit.</summary>
    protected LocalCircuitImpl circuit;

    /// <summary>
    /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also
    /// configured for messages and exceptions received by the circuit end.
    /// </summary>
    ///
    /// <param name="producer">The message producer for the circuit end point.</param>
    /// <param name="consumer">The message consumer for the circuit end point.</param>
    /// <param name="session">The controlSession for the circuit end point.</param>
    /// <param name="messageMonitor">The monitor to notify of all messages received by the circuit end.</param>
    /// <param name="exceptionMonitor">The monitor to notify of all exceptions received by the circuit end.</param>
    public LocalPublisherImpl(MessageProducer producer, MessageConsumer consumer, Session session,
        MessageMonitor messageMonitor, ExceptionMonitor exceptionMonitor)
    {
        super(producer, consumer, session, messageMonitor, exceptionMonitor);
    }

    /// <summary>
    /// Creates a circuit end point by copying the producer, consumer, controlSession and monitors out of an
    /// existing circuit end base implementation.
    /// </summary>
    ///
    /// <param name="end">The circuit end base implementation to take producers and consumers from.</param>
    public LocalPublisherImpl(CircuitEndBase end)
    {
        super(end.getProducer(), end.getConsumer(), end.getSession(), end.getMessageMonitor(),
            end.getExceptionMonitor());
    }

    /// <summary>
    /// Provides an assertion that the publisher encountered no exceptions, on either the containing circuit's
    /// connection monitor or its controlSession monitor.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties. Not consulted by this implementation.</param>
    ///
    /// <returns>An assertion that the publisher encountered no exceptions.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps)
    {
        return new AssertionBase()
            {
                public bool apply()
                {
                    ExceptionMonitor sessionMonitor = circuit.getExceptionMonitor();
                    ExceptionMonitor connectionMonitor = circuit.getConnectionExceptionMonitor();

                    // The connection is checked first, then the session; both are always checked so that
                    // every problem is reported.
                    bool connectionClean = connectionMonitor.assertNoExceptions();

                    if (!connectionClean)
                    {
                        addError("Was expecting no exceptions.\n");
                        addError("Got the following exceptions on the connection, " + connectionMonitor);
                    }

                    bool sessionClean = sessionMonitor.assertNoExceptions();

                    if (!sessionClean)
                    {
                        addError("Was expecting no exceptions.\n");
                        addError("Got the following exceptions on the producer, " + sessionMonitor);
                    }

                    return connectionClean && sessionClean;
                }
            };
    }

    /// <summary>
    /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. This
    /// implementation always supplies a NotApplicableAssertion.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    ///
    /// <returns>An assertion that the AMQP channel was forcibly closed by an error condition.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps)
    {
        return new NotApplicableAssertion(testProps);
    }

    /// <summary>
    /// Provides an assertion that the publisher got a given exception during the test. The check is made
    /// against the containing circuit's connection exception monitor.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties. Not consulted by this implementation.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    ///
    /// <returns>An assertion that the publisher got a given exception during the test.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, final Class exceptionClass)
    {
        return new AssertionBase()
            {
                public bool apply()
                {
                    ExceptionMonitor connectionMonitor = circuit.getConnectionExceptionMonitor();

                    if (connectionMonitor.assertExceptionOfType(exceptionClass))
                    {
                        return true;
                    }

                    addError("Was expecting linked exception type " + exceptionClass.getName()
                        + " on the connection.\n");

                    // Report what was seen instead, if anything.
                    if (connectionMonitor.size() > 0)
                    {
                        addError("Actually got the following exceptions on the connection, " + connectionMonitor);
                    }
                    else
                    {
                        addError("Got no exceptions on the connection.");
                    }

                    return false;
                }
            };
    }

    /// <summary>
    /// Sets the containing circuit.
    /// </summary>
    ///
    /// <param name="circuit">The containing circuit.</param>
    public void setCircuit(LocalCircuitImpl circuit)
    {
        this.circuit = circuit;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using Apache.Qpid.Integration.Tests.framework.*;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using javax.jms.MessageConsumer;
using javax.jms.MessageProducer;
using javax.jms.Session;
namespace Apache.Qpid.Integration.Tests.framework.localcircuit
{
///
/// Provides an implementation of the Receiver interface that wraps a single message producer and consumer on
/// a single controlSession, as a CircuitEnd. A local receiver also acts as a circuit end, because for a locally
/// located circuit the assertions may be applied directly, there does not need to be any inter process messaging
/// between the publisher and its single circuit end, in order to ascertain its status.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th>Responsibilities</th><th>Collaborations</th></tr>
/// <tr><td>Provide a message producer for sending messages.</td></tr>
/// <tr><td>Provide a message consumer for receiving messages.</td></tr>
/// <tr><td>Provide assertion that the receivers received no exceptions.</td></tr>
/// <tr><td>Provide assertion that the receivers received all test messages sent to it.</td></tr>
/// </table>

public class LocalReceiverImpl extends CircuitEndBase : Receiver
{
    /// <summary>Holds a reference to the containing circuit.</summary>
    private LocalCircuitImpl circuit;

    /// <summary>
    /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also
    /// configured for messages and exceptions received by the circuit end.
    /// </summary>
    ///
    /// <param name="producer">The message producer for the circuit end point.</param>
    /// <param name="consumer">The message consumer for the circuit end point.</param>
    /// <param name="session">The controlSession for the circuit end point.</param>
    /// <param name="messageMonitor">The monitor to notify of all messages received by the circuit end.</param>
    /// <param name="exceptionMonitor">The monitor to notify of all exceptions received by the circuit end.</param>
    public LocalReceiverImpl(MessageProducer producer, MessageConsumer consumer, Session session,
        MessageMonitor messageMonitor, ExceptionMonitor exceptionMonitor)
    {
        super(producer, consumer, session, messageMonitor, exceptionMonitor);
    }

    /// <summary>
    /// Creates a circuit end point by copying the producer, consumer, controlSession and monitors out of an
    /// existing circuit end base implementation.
    /// </summary>
    ///
    /// <param name="end">The circuit end base implementation to take producers and consumers from.</param>
    public LocalReceiverImpl(CircuitEndBase end)
    {
        super(end.getProducer(), end.getConsumer(), end.getSession(), end.getMessageMonitor(),
            end.getExceptionMonitor());
    }

    /// <summary>
    /// Provides an assertion that the receivers encountered no exceptions. This implementation always
    /// supplies a NotApplicableAssertion.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    ///
    /// <returns>An assertion that the receivers encountered no exceptions.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps)
    {
        return notApplicable(testProps);
    }

    /// <summary>
    /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. This
    /// implementation always supplies a NotApplicableAssertion.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    ///
    /// <returns>An assertion that the AMQP channel was forcibly closed by an error condition.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps)
    {
        return notApplicable(testProps);
    }

    /// <summary>
    /// Provides an assertion that the receivers got all messages that were sent to it. This implementation
    /// always supplies a NotApplicableAssertion.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    ///
    /// <returns>An assertion that the receivers got all messages that were sent to it.</returns>
    public Assertion allMessagesReceivedAssertion(ParsedProperties testProps)
    {
        return notApplicable(testProps);
    }

    /// <summary>
    /// Provides an assertion that the receivers got none of the messages that were sent to it. This
    /// implementation always supplies a NotApplicableAssertion.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    ///
    /// <returns>An assertion that the receivers got none of the messages that were sent to it.</returns>
    public Assertion noMessagesReceivedAssertion(ParsedProperties testProps)
    {
        return notApplicable(testProps);
    }

    /// <summary>
    /// Provides an assertion that the receiver got a given exception during the test. This implementation
    /// always supplies a NotApplicableAssertion and does not consult the exception class.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    ///
    /// <returns>An assertion that the receiver got a given exception during the test.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass)
    {
        return notApplicable(testProps);
    }

    /// <summary>
    /// Sets the containing circuit.
    /// </summary>
    ///
    /// <param name="circuit">The containing circuit.</param>
    public void setCircuit(LocalCircuitImpl circuit)
    {
        this.circuit = circuit;
    }

    /// <summary>
    /// Builds the placeholder assertion shared by every assertion factory method on this receiver.
    /// </summary>
    ///
    /// <param name="testProps">The test configuration properties to hand to the placeholder.</param>
    ///
    /// <returns>A new NotApplicableAssertion built from the test properties.</returns>
    private Assertion notApplicable(ParsedProperties testProps)
    {
        return new NotApplicableAssertion(testProps);
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;
using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalCircuitImpl;
using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalPublisherImpl;
using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalReceiverImpl;
using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory;
using org.apache.qpid.util.ConversationFactory;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using javax.jms.*;
using System.Collections.Generic.IList;
using java.util.Properties;
using java.util.concurrent.atomic.AtomicLong;
namespace Apache.Qpid.Integration.Tests.framework
{
///
/// LocalCircuitFactory is a circuit factory that creates test circuits with publishing and receiving ends rooted
/// on the same JVM. The ends of the circuit are presented as Publisher and Receiver interfaces, which
/// in turn provide methods to apply assertions to the circuit. The creation of the circuit ends, and the presentation
/// of the ends as publisher/receiver interfaces, are designed to be overridden, so that circuits and assertions that
/// use messaging features not available in JMS can be written. This provides an extension point for writing tests
/// against proprietary features of JMS implementations.
///
///

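/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th>Responsibilities</th><th>Collaborations</th></tr>
/// <tr><td>Provide a standard test procedure over a test circuit.</td></tr>
/// <tr><td>Construct test circuits appropriate to a test's context.</td></tr>
/// </table>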

public class LocalCircuitFactory : CircuitFactory
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(LocalCircuitFactory));

    /// <summary>Used to create unique destination names for each test.</summary>
    protected static AtomicLong uniqueDestsId = new AtomicLong();

    /// <summary>
    /// Runs the circuit's default test procedure with a single message, and asserts that none of the supplied
    /// assertions failed.
    /// </summary>
    ///
    /// <param name="testCircuit">The test circuit.</param>
    /// <param name="assertions">The list of assertions to apply to the test circuit.</param>
    /// <param name="testProperties">The test case definition. Not consulted by this implementation.</param>
    public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties)
    {
        FrameworkBaseCase.assertNoFailures(testCircuit.test(1, assertions));
    }

    /// <summary>
    /// Creates a test circuit for the test, configured by the test parameters specified. A publisher end and
    /// a receiver end are built on one shared connection, each on its own session, and the connection is
    /// started before the circuit is assembled.
    /// </summary>
    ///
    /// <param name="testProperties">The test parameters.</param>
    ///
    /// <returns>A test circuit.</returns>
    ///
    /// <remarks>Any JMSException raised during construction is rethrown wrapped in a RuntimeException.</remarks>
    public Circuit createCircuit(ParsedProperties testProperties)
    {
        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties typedProps = new MessagingTestConfigProperties(testProperties);

        // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
        try
        {
            // Get a unique offset to append to destination names to make them unique to the connection.
            long destinationSuffix = uniqueDestsId.incrementAndGet();

            // NOTE(review): if anything below throws, the connection created here is not closed - confirm
            // whether that leak matters to callers.
            Connection connection = TestUtils.createConnection(testProperties);

            // The connection's exception listener is installed while building the publisher end.
            CircuitEndBase publisherEnd = createPublisherCircuitEnd(connection, typedProps, destinationSuffix);
            CircuitEndBase receiverEnd = createReceiverCircuitEnd(connection, typedProps, destinationSuffix);

            // Start listening for incoming messages.
            connection.start();

            // Present the raw circuit ends as publisher/receiver implementations and assemble the circuit.
            LocalPublisherImpl publisher = createPublisherFromCircuitEnd(publisherEnd);
            LocalReceiverImpl receiver = createReceiverFromCircuitEnd(receiverEnd);

            return new LocalCircuitImpl(testProperties, publisher, receiver, connection,
                publisher.getExceptionMonitor());
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Could not create publisher/receivers pair due to a JMSException.", e);
        }
    }

    /// <summary>
    /// Creates a local receiver from a circuit end. Sub-classes may override this to provide more
    /// specialized receivers if necessary.
    /// </summary>
    ///
    /// <param name="receiverEnd">The receiving circuit end.</param>
    ///
    /// <returns>A LocalReceiverImpl built from the circuit end.</returns>
    protected LocalReceiverImpl createReceiverFromCircuitEnd(CircuitEndBase receiverEnd)
    {
        return new LocalReceiverImpl(receiverEnd);
    }

    /// <summary>
    /// Creates a local publisher from a circuit end. Sub-classes may override this to provide more
    /// specialized publishers if necessary.
    /// </summary>
    ///
    /// <param name="publisherEnd">The publishing circuit end.</param>
    ///
    /// <returns>A LocalPublisherImpl built from the circuit end.</returns>
    protected LocalPublisherImpl createPublisherFromCircuitEnd(CircuitEndBase publisherEnd)
    {
        return new LocalPublisherImpl(publisherEnd);
    }

    /// <summary>
    /// Builds a circuit end suitable for the publishing side of a test circuit, from standard test parameters.
    /// A new exception monitor is created and installed as the connection's exception listener.
    /// </summary>
    ///
    /// <param name="connection">The connection to build the circuit end on.</param>
    /// <param name="testProps">The test parameters to configure the circuit end construction.</param>
    /// <param name="uniqueId">A unique number appended to destination names, to make this circuit unique.</param>
    ///
    /// <returns>A circuit end suitable for the publishing side of a test circuit.</returns>
    ///
    /// <remarks>
    /// Any underlying JMSExceptions are allowed to fall through and fail the creation. A RuntimeException is
    /// thrown when the test parameters ask for the immediate or mandatory options.
    /// </remarks>
    public CircuitEndBase createPublisherCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId)
        throws JMSException
    {
        log.debug(
            "public CircuitEndBase createPublisherCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId = "
            + uniqueId + "): called");

        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps);

        // Check that the test properties do not contain AMQP/Qpid specific settings, and fail if they do.
        if (props.getImmediate() || props.getMandatory())
        {
            throw new RuntimeException(
                "Cannot create a pure JMS circuit as the test properties require AMQP specific options.");
        }

        Session session = connection.createSession(props.getPublisherTransacted(), props.getAckMode());

        // The send destination is a topic for pub/sub tests and a queue otherwise.
        Destination destination;

        if (props.getPubsub())
        {
            destination = session.createTopic(props.getSendDestinationNameRoot() + "_" + uniqueId);
        }
        else
        {
            destination = session.createQueue(props.getSendDestinationNameRoot() + "_" + uniqueId);
        }

        // The producer and consumer are optional; each is left null unless its bind flag is set.
        MessageProducer producer = null;

        if (props.getPublisherProducerBind())
        {
            producer = session.createProducer(destination);
        }

        MessageConsumer consumer = null;

        if (props.getPublisherConsumerBind())
        {
            consumer = session.createConsumer(session.createQueue(props.getReceiveDestinationNameRoot() + "_" + uniqueId));
        }

        MessageMonitor messageMonitor = new MessageMonitor();

        if (consumer != null)
        {
            consumer.setMessageListener(messageMonitor);
        }

        ExceptionMonitor exceptionMonitor = new ExceptionMonitor();
        connection.setExceptionListener(exceptionMonitor);

        // A bound but inactive consumer is closed straight away.
        if (!props.getPublisherConsumerActive() && (consumer != null))
        {
            consumer.close();
        }

        return new CircuitEndBase(producer, consumer, session, messageMonitor, exceptionMonitor);
    }

    /// <summary>
    /// Builds a circuit end suitable for the receiving side of a test circuit, from standard test parameters.
    /// The returned circuit end carries no exception monitor.
    /// </summary>
    ///
    /// <param name="connection">The connection to build the circuit end on.</param>
    /// <param name="testProps">The test parameters to configure the circuit end construction.</param>
    /// <param name="uniqueId">A unique number appended to destination names, to make this circuit unique.</param>
    ///
    /// <returns>A circuit end suitable for the receiving side of a test circuit.</returns>
    ///
    /// <remarks>
    /// Any underlying JMSExceptions are allowed to fall through and fail the creation. A RuntimeException is
    /// thrown when the test parameters ask for the immediate or mandatory options.
    /// </remarks>
    public CircuitEndBase createReceiverCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId)
        throws JMSException
    {
        log.debug(
            "public CircuitEndBase createReceiverCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId = "
            + uniqueId + "): called");

        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps);

        // Check that the test properties do not contain AMQP/Qpid specific settings, and fail if they do.
        if (props.getImmediate() || props.getMandatory())
        {
            throw new RuntimeException(
                "Cannot create a pure JMS circuit as the test properties require AMQP specific options.");
        }

        // NOTE(review): the receiver's session takes its transacted flag from the publisher setting -
        // confirm whether a receiver-specific setting was intended.
        Session session = connection.createSession(props.getPublisherTransacted(), props.getAckMode());

        // The producer and consumer are optional; each is left null unless its bind flag is set.
        MessageProducer producer = null;

        if (props.getReceiverProducerBind())
        {
            producer = session.createProducer(session.createQueue(props.getReceiveDestinationNameRoot() + "_" + uniqueId));
        }

        // The receiver listens on the destination the publisher sends to: a topic for pub/sub, else a queue.
        Destination destination;

        if (props.getPubsub())
        {
            destination = session.createTopic(props.getSendDestinationNameRoot() + "_" + uniqueId);
        }
        else
        {
            destination = session.createQueue(props.getSendDestinationNameRoot() + "_" + uniqueId);
        }

        MessageConsumer consumer = null;

        if (props.getReceiverConsumerBind())
        {
            // Durable subscriptions only apply to topics, and use a fixed subscription name.
            if (props.getDurableSubscription() && props.getPubsub())
            {
                consumer = session.createDurableSubscriber((Topic) destination, "testsub");
            }
            else
            {
                consumer = session.createConsumer(destination);
            }
        }

        MessageMonitor messageMonitor = new MessageMonitor();

        if (consumer != null)
        {
            consumer.setMessageListener(messageMonitor);
        }

        // A bound but inactive consumer is closed straight away.
        if (!props.getReceiverConsumerActive() && (consumer != null))
        {
            consumer.close();
        }

        return new CircuitEndBase(producer, consumer, session, messageMonitor, null);
    }

    /// <summary>
    /// Sets the sender test client to coordinate the test with. Not implemented; always throws.
    /// </summary>
    ///
    /// <param name="sender">The contact details of the sending client in the test.</param>
    public void setSender(TestClientDetails sender)
    {
        throw new RuntimeException("Not implemented.");
    }

    /// <summary>
    /// Sets the receiving test client to coordinate the test with. Not implemented; always throws.
    /// </summary>
    ///
    /// <param name="receiver">The contact details of the receiving client in the test.</param>
    public void setReceiver(TestClientDetails receiver)
    {
        throw new RuntimeException("Not implemented.");
    }

    /// <summary>
    /// Supplies the sending test client. Not implemented; always throws.
    /// </summary>
    ///
    /// <returns>The sending test client.</returns>
    public TestClientDetails getSender()
    {
        throw new RuntimeException("Not implemented.");
    }

    /// <summary>
    /// Supplies the receiving test clients. Not implemented; always throws.
    /// </summary>
    ///
    /// <returns>The receiving test clients.</returns>
    public IList getReceivers()
    {
        throw new RuntimeException("Not implemented.");
    }

    /// <summary>
    /// Accepts the conversation factory over which to hold the test coordinating conversation.
    /// Not implemented; always throws.
    /// </summary>
    ///
    /// <param name="conversationFactory">The conversation factory to coordinate the test over.</param>
    public void setConversationFactory(ConversationFactory conversationFactory)
    {
        throw new RuntimeException("Not implemented.");
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
namespace Apache.Qpid.Integration.Tests.framework
{
///
/// MessageIdentityVector provides a message identification scheme, that matches individual messages with test cases.
/// Test messages are being sent by a number of test clients, sending messages over a set of routes, and being received
/// by another set of test clients. Each test is itself, being run within a test cycle, of which there could be many. It
/// is the job of the test coordinator to request and receive reports from the available test clients, on what has been
/// sent, what has been received, and what errors may have occurred, and to reconcile this information against the
/// assertions being applied by the test case. In order to be able to figure out which messages belong to which test,
/// there needs to be an identification scheme, that the coordinator can use to correlate messages in senders and
/// receiver reports. Every message sent in a test can be associated with this information.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th>Responsibilities</th><th>Collaborations</th></tr>
/// <tr><td>Identify a test case, a handling client id, a circuit end within the client, and a test cycle number.</td></tr>
/// </table>

public class MessageIdentityVector
{
    /// <summary>Holds the test case vector component of the message identity vector.</summary>
    private readonly TestCaseVector testCaseVector;

    /// <summary>The unique client id.</summary>
    private readonly string clientId;

    /// <summary>The unique circuit end number within the client id.</summary>
    private readonly int circuitEndId;

    /// <summary>
    /// Creates a new identity vector for test messages.
    /// </summary>
    /// <param name="testCase">The name of the test case generating the messages.</param>
    /// <param name="clientId">The unique id of the client implementing a circuit end that is handling the messages.</param>
    /// <param name="circuitEndId">The unique id number of the circuit end within the client.</param>
    /// <param name="testCycleNumber">The cycle iteration number of the test case.</param>
    public MessageIdentityVector(string testCase, string clientId, int circuitEndId, int testCycleNumber)
    {
        this.testCaseVector = new TestCaseVector(testCase, testCycleNumber);
        this.clientId = clientId;
        this.circuitEndId = circuitEndId;
    }

    /// <summary>
    /// Reports the test case vector component of the message identity vector.
    /// </summary>
    /// <returns>The test case vector component of the message identity vector.</returns>
    public TestCaseVector getTestCaseVector()
    {
        return testCaseVector;
    }

    /// <summary>
    /// Reports the name of the test case.
    /// </summary>
    /// <returns>The name of the test case.</returns>
    public string getTestCase()
    {
        return testCaseVector.getTestCase();
    }

    /// <summary>
    /// Reports the test iteration cycle number within the test case.
    /// </summary>
    /// <returns>The test iteration cycle number within the test case.</returns>
    public int getTestCycleNumber()
    {
        return testCaseVector.getTestCycleNumber();
    }

    /// <summary>
    /// Reports the client id.
    /// </summary>
    /// <returns>The client id.</returns>
    public string getClientId()
    {
        return clientId;
    }

    /// <summary>
    /// Reports the circuit end number within the test client.
    /// </summary>
    /// <returns>The circuit end number within the test client.</returns>
    public int getCircuitEndId()
    {
        return circuitEndId;
    }

    /// <summary>
    /// Compares this identity vector with another for equality. All fields must match.
    /// </summary>
    /// <param name="o">The identity vector to compare with.</param>
    /// <returns>true if the identity vector is identical to this one by all fields, false otherwise.</returns>
    public bool equals(object o)
    {
        if (object.ReferenceEquals(this, o))
        {
            return true;
        }

        // Only instances of exactly the same runtime type can be equal.
        if ((o == null) || (GetType() != o.GetType()))
        {
            return false;
        }

        MessageIdentityVector that = (MessageIdentityVector) o;

        if (circuitEndId != that.circuitEndId)
        {
            return false;
        }

        // Static string.Equals is null-safe and compares by value.
        if (!string.Equals(clientId, that.clientId))
        {
            return false;
        }

        // TestCaseVector is compared through its own Java-style equals method, as before.
        if ((testCaseVector != null) ? (!testCaseVector.equals(that.testCaseVector)) : (that.testCaseVector != null))
        {
            return false;
        }

        return true;
    }

    /// <summary>
    /// Computes a hash code for this identity vector based on all fields.
    /// </summary>
    /// <returns>A hash code for this identity vector based on all fields.</returns>
    public int hashCode()
    {
        // Wrap-around on overflow is intended, so never trap even in a checked build.
        unchecked
        {
            int result = (testCaseVector != null) ? testCaseVector.hashCode() : 0;
            result = (31 * result) + ((clientId != null) ? clientId.GetHashCode() : 0);
            result = (31 * result) + circuitEndId;

            return result;
        }
    }

    /// <summary>
    /// Standard .NET equality; delegates to <see cref="equals"/> so that collections and
    /// framework code see the same field-by-field comparison.
    /// </summary>
    /// <param name="obj">The object to compare with.</param>
    /// <returns>true if the object is an identity vector equal to this one, false otherwise.</returns>
    public override bool Equals(object obj)
    {
        return equals(obj);
    }

    /// <summary>
    /// Standard .NET hash code; delegates to <see cref="hashCode"/> so it stays consistent with Equals.
    /// </summary>
    /// <returns>A hash code for this identity vector based on all fields.</returns>
    public override int GetHashCode()
    {
        return hashCode();
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using log4net;
using javax.jms.Message;
using javax.jms.MessageListener;
using java.util.concurrent.atomic.AtomicInteger;

namespace Apache.Qpid.Integration.Tests.framework
{
///
/// MessageMonitor is used to record information about messages received.
This will provide methods to check various /// properties, such as the type, number and content of messages received in order to verify the correct behaviour of /// tests. /// ///

///
CRC Card
Responsibilities Collaborations ///
Count incoming messages. ///
Record time elapsed since the arrival of the first message. ///
Reset all counts and timings. ///
///

public class MessageMonitor : MessageListener
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(MessageMonitor));

    /// <summary>Holds the count of messages received since the last reset.</summary>
    protected AtomicInteger numMessages = new AtomicInteger();

    /// <summary>Holds the time of arrival of the first message, or null if none has arrived since the last reset.</summary>
    protected Long firstMessageTime = null;

    // NOTE(review): AtomicInteger, Long and System.nanoTime() are Java APIs carried over by the port;
    // confirm the project supplies equivalents.

    /// <summary>
    /// Handles received messages. Records the arrival time of the first message and counts every message.
    /// </summary>
    /// <param name="message">The message. Its content is ignored.</param>
    public void onMessage(Message message)
    {
        // The timer starts on the arrival of the first message, not on the first count query.
        if (firstMessageTime == null)
        {
            firstMessageTime = System.nanoTime();
        }

        numMessages.getAndIncrement();
    }

    /// <summary>
    /// Gets the count of messages.
    /// </summary>
    /// <returns>The count of messages received since the last reset.</returns>
    public int getNumMessage()
    {
        return numMessages.get();
    }

    /// <summary>
    /// Gets the time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet.
    /// </summary>
    /// <returns>The time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet.</returns>
    public long getTime()
    {
        // Read the field once so a reset between the null check and the subtraction cannot be observed.
        Long first = firstMessageTime;

        if (first != null)
        {
            return System.nanoTime() - first;
        }
        else
        {
            return 0L;
        }
    }

    /// <summary>Resets the message count and timer to zero.</summary>
    public void reset()
    {
        numMessages.set(0);
        firstMessageTime = null;
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
See the License for the * specific language governing permissions and limitations * under the License. * */ using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using javax.jms.Session; using java.util.Properties; namespace Apache.Qpid.Integration.Tests.framework { /// /// MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology, /// and test parameters for running a messaging test over that topology. A Properties object holding some of these /// properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour. /// ///

A complete list of the parameters, default values and comments on their usage is provided here: /// ///

///
Parameters
Parameter Default Comments ///
messageSize 0 Message size in bytes. Not including any headers. ///
destinationName ping The root name to use to generate destination names to ping. ///
persistent false Determines whether persistent delivery is used. ///
transacted false Determines whether messages are sent/received in transactions. ///
broker tcp://localhost:5672 Determines the broker to connect to. ///
virtualHost test Determines the virtual host to send all ping over. ///
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. ///
verbose false The verbose flag for debugging. Prints to console on every message. ///
pubsub false Whether to ping topics or queues. Uses p2p by default. ///
username guest The username to access the broker with. ///
password guest The password to access the broker with. ///
selector null Not used. Defines a message selector to filter pings with. ///
destinationCount 1 The number of receivers listening to the pings. ///
timeout 30000 In milliseconds. The timeout to stop waiting for replies. ///
commitBatchSize 1 The number of messages per transaction in transactional mode. ///
uniqueDests true Whether each receiver only listens to one ping destination or all. ///
durableDests false Whether or not durable destinations are used. ///
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: /// 0 - SESSION_TRANSACTED /// 1 - AUTO_ACKNOWLEDGE /// 2 - CLIENT_ACKNOWLEDGE /// 3 - DUPS_OK_ACKNOWLEDGE /// 257 - NO_ACKNOWLEDGE /// 258 - PRE_ACKNOWLEDGE ///
maxPending 0 The maximum size in bytes, of messages sent but not yet received. /// Limits the volume of messages currently buffered on the client /// or broker. Can help scale test clients by limiting amount of buffered /// data to avoid out of memory errors. ///
/// ///

///
CRC Card
Responsibilities Collaborations ///
Provide the names and defaults of all test parameters. ///
///

///
/// Put a type-safe wrapper around these properties, but continue to store the parameters as properties. This is
/// simply to ensure that it is a simple matter to serialize/deserialize string/string pairs onto messages.
public class MessagingTestConfigProperties : ParsedProperties
{
    // ====================== Connection Properties ==================================

    /// <summary>Holds the name of the default connection configuration.</summary>
    public const string CONNECTION_NAME = "broker";

    /// <summary>Holds the name of the property to get the initial context factory name from.</summary>
    public const string INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial";

    /// <summary>Defines the class to use as the initial context factory by default.</summary>
    public const string INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    /// <summary>Holds the name of the property to get the test broker url from.</summary>
    public const string BROKER_PROPNAME = "qpid.test.broker";

    /// <summary>Holds the default broker url for the test.</summary>
    public const string BROKER_DEFAULT = "vm://:1";

    /// <summary>Holds the name of the property to get the test broker virtual path.</summary>
    public const string VIRTUAL_HOST_PROPNAME = "virtualHost";

    /// <summary>Holds the default virtual path for the test.</summary>
    public const string VIRTUAL_HOST_DEFAULT = "";

    /// <summary>Holds the name of the property to get the broker access username from.</summary>
    public const string USERNAME_PROPNAME = "username";

    /// <summary>Holds the default broker log on username.</summary>
    public const string USERNAME_DEFAULT = "guest";

    /// <summary>Holds the name of the property to get the broker access password from.</summary>
    public const string PASSWORD_PROPNAME = "password";

    /// <summary>Holds the default broker log on password.</summary>
    public const string PASSWORD_DEFAULT = "guest";

    // ====================== Messaging Topology Properties ==========================

    /// <summary>Holds the name of the property to get the bind publisher producer flag from.</summary>
    public const string PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind";

    /// <summary>Holds the default value of the publisher producer flag.</summary>
    public const bool PUBLISHER_PRODUCER_BIND_DEFAULT = true;

    /// <summary>Holds the name of the property to get the bind publisher consumer flag from.</summary>
    public const string PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind";

    /// <summary>Holds the default value of the publisher consumer flag.</summary>
    public const bool PUBLISHER_CONSUMER_BIND_DEFAULT = false;

    /// <summary>Holds the name of the property to get the bind receivers producer flag from.</summary>
    public const string RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind";

    /// <summary>Holds the default value of the receivers producer flag.</summary>
    public const bool RECEIVER_PRODUCER_BIND_DEFAULT = false;

    /// <summary>Holds the name of the property to get the bind receivers consumer flag from.</summary>
    public const string RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind";

    /// <summary>Holds the default value of the receivers consumer flag.</summary>
    public const bool RECEIVER_CONSUMER_BIND_DEFAULT = true;

    /// <summary>Holds the name of the property to get the publishers consumer active flag from.</summary>
    public const string PUBLISHER_CONSUMER_ACTIVE_PROPNAME = "publisherConsumerActive";

    /// <summary>Holds the default value of the publishers consumer active flag.</summary>
    public const bool PUBLISHER_CONSUMER_ACTIVE_DEFAULT = true;

    /// <summary>Holds the name of the property to get the receivers consumer active flag from.</summary>
    public const string RECEIVER_CONSUMER_ACTIVE_PROPNAME = "receiverConsumerActive";

    /// <summary>Holds the default value of the receivers consumer active flag.</summary>
    public const bool RECEIVER_CONSUMER_ACTIVE_DEFAULT = true;

    /// <summary>Holds the name of the property to get the send destination name root from.</summary>
    public const string SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot";

    /// <summary>Holds the root of the name of the default destination to send to.</summary>
    public const string SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo";

    /// <summary>Holds the name of the property to get the receive destination name root from.</summary>
    public const string RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot";

    /// <summary>Holds the root of the name of the default destination to receive from.</summary>
    public const string RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom";

    /// <summary>Holds the name of the property to get the destination count from.</summary>
    public const string DESTINATION_COUNT_PROPNAME = "destinationCount";

    /// <summary>Defines the default number of destinations to ping.</summary>
    public const int DESTINATION_COUNT_DEFAULT = 1;

    /// <summary>Holds the name of the property to get the p2p or pub/sub messaging mode from.</summary>
    public const string PUBSUB_PROPNAME = "pubsub";

    /// <summary>Holds the pub/sub mode default, true means ping a topic, false means ping a queue.</summary>
    public const bool PUBSUB_DEFAULT = false;

    // ====================== JMS Options and Flags =================================

    /// <summary>Holds the name of the property to get the test delivery mode from.</summary>
    public const string PERSISTENT_MODE_PROPNAME = "persistent";

    /// <summary>Holds the message delivery mode to use for the test.</summary>
    public const bool PERSISTENT_MODE_DEFAULT = false;

    /// <summary>Holds the name of the property to get the publisher transactional mode from.</summary>
    public const string TRANSACTED_PUBLISHER_PROPNAME = "transactedPublisher";

    /// <summary>Holds the publisher transactional mode to use for the test.</summary>
    public const bool TRANSACTED_PUBLISHER_DEFAULT = false;

    /// <summary>Holds the name of the property to get the receiver transactional mode from.</summary>
    public const string TRANSACTED_RECEIVER_PROPNAME = "transactedReceiver";

    /// <summary>Holds the receiver transactional mode to use for the test.</summary>
    public const bool TRANSACTED_RECEIVER_DEFAULT = false;

    /// <summary>Holds the name of the property to set the no local flag from.</summary>
    public const string NO_LOCAL_PROPNAME = "noLocal";

    /// <summary>Defines the default value of the no local flag to use when consuming messages.</summary>
    public const bool NO_LOCAL_DEFAULT = false;

    /// <summary>Holds the name of the property to get the message acknowledgement mode from.</summary>
    public const string ACK_MODE_PROPNAME = "ackMode";

    /// <summary>Defines the default message acknowledgement mode.</summary>
    // Not const: Session.AUTO_ACKNOWLEDGE is declared outside this file and may not be a compile-time constant.
    public static readonly int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;

    /// <summary>Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging.</summary>
    public const string DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";

    /// <summary>Defines the default value of the durable subscriptions flag.</summary>
    public const bool DURABLE_SUBSCRIPTION_DEFAULT = false;

    // ====================== Qpid/AMQP Options and Flags ================================

    /// <summary>Holds the name of the property to set the exclusive flag from.</summary>
    public const string EXCLUSIVE_PROPNAME = "exclusive";

    /// <summary>Defines the default value of the exclusive flag to use when consuming messages.</summary>
    public const bool EXCLUSIVE_DEFAULT = false;

    /// <summary>Holds the name of the property to set the immediate flag from.</summary>
    public const string IMMEDIATE_PROPNAME = "immediate";

    /// <summary>Defines the default value of the immediate flag to use when sending messages.</summary>
    public const bool IMMEDIATE_DEFAULT = false;

    /// <summary>Holds the name of the property to set the mandatory flag from.</summary>
    public const string MANDATORY_PROPNAME = "mandatory";

    /// <summary>Defines the default value of the mandatory flag to use when sending messages.</summary>
    public const bool MANDATORY_DEFAULT = false;

    /// <summary>Holds the name of the property to get the durable destinations flag from.</summary>
    public const string DURABLE_DESTS_PROPNAME = "durableDests";

    /// <summary>Default value for the durable destinations flag.</summary>
    public const bool DURABLE_DESTS_DEFAULT = false;

    /// <summary>Holds the name of the property to set the prefetch size from.</summary>
    public const string PREFETCH_PROPNAME = "prefetch";

    /// <summary>Defines the default prefetch size to use when consuming messages.</summary>
    public const int PREFETCH_DEFAULT = 100;

    // ====================== Common Test Parameters ================================

    /// <summary>Holds the name of the property to get the test message size from.</summary>
    public const string MESSAGE_SIZE_PROPNAME = "messageSize";

    /// <summary>Used to set up a default message size.</summary>
    // The misspelled name is part of the public interface and is kept for compatibility.
    public const int MESSAGE_SIZE_DEAFULT = 0;

    /// <summary>Holds the name of the property to get the message rate from.</summary>
    public const string RATE_PROPNAME = "rate";

    /// <summary>Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction.</summary>
    public const int RATE_DEFAULT = 0;

    /// <summary>Holds the name of the property to get the message selector from.</summary>
    public const string SELECTOR_PROPNAME = "selector";

    /// <summary>Holds the default message selector.</summary>
    public const string SELECTOR_DEFAULT = "";

    /// <summary>Holds the name of the property to get the waiting timeout for response messages.</summary>
    public const string TIMEOUT_PROPNAME = "timeout";

    /// <summary>Default time to wait before assuming that a ping has timed out.</summary>
    public const long TIMEOUT_DEFAULT = 30000;

    /// <summary>Holds the name of the property to get the commit batch size from.</summary>
    public const string TX_BATCH_SIZE_PROPNAME = "commitBatchSize";

    /// <summary>Defines the default number of pings to send in each transaction when running transactionally.</summary>
    public const int TX_BATCH_SIZE_DEFAULT = 1;

    /// <summary>Holds the name of the property to set the maximum amount of pending message data for a producer to hold.</summary>
    public const string MAX_PENDING_PROPNAME = "maxPending";

    /// <summary>Defines the default maximum quantity of pending message data to allow producers to hold.</summary>
    public const int MAX_PENDING_DEFAULT = 0;

    /// <summary>Holds the name of the property to get the publisher rollback flag from.</summary>
    public const string ROLLBACK_PUBLISHER_PROPNAME = "rollbackPublisher";

    /// <summary>Holds the default publisher roll back setting.</summary>
    public const bool ROLLBACK_PUBLISHER_DEFAULT = false;

    /// <summary>Holds the name of the property to get the receiver rollback flag from.</summary>
    public const string ROLLBACK_RECEIVER_PROPNAME = "rollbackReceiver";

    /// <summary>Holds the default receiver roll back setting.</summary>
    public const bool ROLLBACK_RECEIVER_DEFAULT = false;

    // ====================== Options that control the behaviour of the test framework. =========================

    /// <summary>Holds the name of the property to get the behavioural mode of not applicable assertions.</summary>
    public const string NOT_APPLICABLE_ASSERTION_PROPNAME = "notApplicableAssertion";

    /// <summary>Holds the default behavioral mode of not applicable assertions, which is logging them as a warning.</summary>
    public const string NOT_APPLICABLE_ASSERTION_DEFAULT = "warn";

    /// <summary>Holds the name of the property to get the verbose mode property from.</summary>
    public const string VERBOSE_PROPNAME = "verbose";

    /// <summary>Holds the default verbose mode.</summary>
    public const bool VERBOSE_DEFAULT = false;

    /// <summary>Holds the default configuration properties.</summary>
    public static ParsedProperties defaults = new ParsedProperties();

    /// <summary>
    /// Populates <see cref="defaults"/> with the default value of every property, leaving any value that is
    /// already set untouched.
    /// </summary>
    static MessagingTestConfigProperties()
    {
        defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT);
        defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
        defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
        defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
        defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);

        defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT);
        defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT);
        defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT);
        defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT);
        defaults.setPropertyIfNull(PUBLISHER_CONSUMER_ACTIVE_PROPNAME, PUBLISHER_CONSUMER_ACTIVE_DEFAULT);
        defaults.setPropertyIfNull(RECEIVER_CONSUMER_ACTIVE_PROPNAME, RECEIVER_CONSUMER_ACTIVE_DEFAULT);
        defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT);
        defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT);
        defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
        defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);

        defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
        defaults.setPropertyIfNull(TRANSACTED_PUBLISHER_PROPNAME, TRANSACTED_PUBLISHER_DEFAULT);
        defaults.setPropertyIfNull(TRANSACTED_RECEIVER_PROPNAME, TRANSACTED_RECEIVER_DEFAULT);
        defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
        defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
        defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);

        defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT);
        defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT);
        defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT);
        defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
        defaults.setPropertyIfNull(PREFETCH_PROPNAME, PREFETCH_DEFAULT);

        defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
        defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
        defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
        defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
        defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
        defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
        defaults.setPropertyIfNull(ROLLBACK_PUBLISHER_PROPNAME, ROLLBACK_PUBLISHER_DEFAULT);
        defaults.setPropertyIfNull(ROLLBACK_RECEIVER_PROPNAME, ROLLBACK_RECEIVER_DEFAULT);

        defaults.setPropertyIfNull(NOT_APPLICABLE_ASSERTION_PROPNAME, NOT_APPLICABLE_ASSERTION_DEFAULT);
        defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
    }

    /// <summary>Creates a test configuration based on the defaults.</summary>
    public MessagingTestConfigProperties() : base(defaults)
    {
    }

    /// <summary>
    /// Creates a test configuration based on the supplied properties.
    /// </summary>
    /// <param name="properties">The test configuration.</param>
    public MessagingTestConfigProperties(Properties properties) : base(properties)
    {
    }

    /// <summary>The size of test messages to send.</summary>
    /// <returns>The size of test messages to send.</returns>
    public int getMessageSize()
    {
        return getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
    }

    /// <summary>Flag to indicate that the publishing producer should be set up to publish to a destination.</summary>
    /// <returns>Flag to indicate that the publishing producer should be set up to publish to a destination.</returns>
    public bool getPublisherProducerBind()
    {
        return getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
    }

    /// <summary>Flag to indicate that the publishing consumer should be set up to receive from a destination.</summary>
    /// <returns>Flag to indicate that the publishing consumer should be set up to receive from a destination.</returns>
    public bool getPublisherConsumerBind()
    {
        return getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
    }

    /// <summary>Flag to indicate that the receiving producer should be set up to publish to a destination.</summary>
    /// <returns>Flag to indicate that the receiving producer should be set up to publish to a destination.</returns>
    public bool getReceiverProducerBind()
    {
        return getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
    }

    /// <summary>Flag to indicate that the receiving consumer should be set up to receive from a destination.</summary>
    /// <returns>Flag to indicate that the receiving consumer should be set up to receive from a destination.</returns>
    public bool getReceiverConsumerBind()
    {
        return getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
    }

    /// <summary>Flag to indicate that the publishing consumer should be created and actively listening.</summary>
    /// <returns>Flag to indicate that the publishing consumer should be created.</returns>
    public bool getPublisherConsumerActive()
    {
        return getPropertyAsBoolean(PUBLISHER_CONSUMER_ACTIVE_PROPNAME);
    }

    /// <summary>Flag to indicate that the receiving consumers should be created and actively listening.</summary>
    /// <returns>Flag to indicate that the receiving consumers should be created and actively listening.</returns>
    public bool getReceiverConsumerActive()
    {
        return getPropertyAsBoolean(RECEIVER_CONSUMER_ACTIVE_PROPNAME);
    }

    /// <summary>A root to create all test destination names from.</summary>
    /// <returns>A root to create all test destination names from.</returns>
    public string getSendDestinationNameRoot()
    {
        return getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
    }

    /// <summary>A root to create all receiving destination names from.</summary>
    /// <returns>A root to create all receiving destination names from.</returns>
    public string getReceiveDestinationNameRoot()
    {
        return getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
    }

    /// <summary>Flag to indicate that persistent messages should be used.</summary>
    /// <returns>Flag to indicate that persistent messages should be used.</returns>
    public bool getPersistentMode()
    {
        return getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
    }

    /// <summary>Flag to indicate that transactional messages should be sent by the publisher.</summary>
    /// <returns>Flag to indicate that transactional messages should be sent by the publisher.</returns>
    public bool getPublisherTransacted()
    {
        return getPropertyAsBoolean(TRANSACTED_PUBLISHER_PROPNAME);
    }

    /// <summary>Flag to indicate that transactional receives should be used by the receiver.</summary>
    /// <returns>Flag to indicate that transactional receives should be used by the receiver.</returns>
    public bool getReceiverTransacted()
    {
        // Reads the receiver's own property; previously this returned the publisher's flag.
        return getPropertyAsBoolean(TRANSACTED_RECEIVER_PROPNAME);
    }

    /// <summary>The name of the virtual host to run all tests over.</summary>
    /// <returns>The name of the virtual host to run all tests over.</returns>
    public string getVirtualHost()
    {
        return getProperty(VIRTUAL_HOST_PROPNAME);
    }

    /// <summary>Limiting rate for each sender in messages per second, or zero for unlimited.</summary>
    /// <returns>The raw property value of the limiting rate, as a string.</returns>
    public string getRate()
    {
        return getProperty(RATE_PROPNAME);
    }

    /// <summary>Flag to indicate that test messages should be received publish/subscribe style by all receivers.</summary>
    /// <returns>Flag to indicate that test messages should be received publish/subscribe style by all receivers.</returns>
    public bool getPubsub()
    {
        return getPropertyAsBoolean(PUBSUB_PROPNAME);
    }

    /// <summary>The username credentials to run tests with.</summary>
    /// <returns>The username credentials to run tests with.</returns>
    public string getUsername()
    {
        return getProperty(USERNAME_PROPNAME);
    }

    /// <summary>The password credentials to run tests with.</summary>
    /// <returns>The password credentials to run tests with.</returns>
    public string getPassword()
    {
        return getProperty(PASSWORD_PROPNAME);
    }

    /// <summary>The timeout duration to fail tests on, should they receive no messages within it.</summary>
    /// <returns>The timeout duration to fail tests on, should they receive no messages within it.</returns>
    public long getTimeout()
    {
        return getPropertyAsLong(TIMEOUT_PROPNAME);
    }

    /// <summary>The number of messages to batch into each transaction in transactional tests.</summary>
    /// <returns>The number of messages to batch into each transaction in transactional tests.</returns>
    public int getTxBatchSize()
    {
        return getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
    }

    /// <summary>Flag to indicate that tests should use durable destinations.</summary>
    /// <returns>Flag to indicate that tests should use durable destinations.</returns>
    public bool getDurableDests()
    {
        return getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
    }

    /// <summary>The ack mode for message receivers to use.</summary>
    /// <returns>The ack mode for message receivers to use.</returns>
    public int getAckMode()
    {
        return getPropertyAsInteger(ACK_MODE_PROPNAME);
    }

    /// <summary>Flag to indicate that tests should use durable subscriptions.</summary>
    /// <returns>Flag to indicate that tests should use durable subscriptions.</returns>
    public bool getDurableSubscription()
    {
        return getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
    }

    /// <summary>The maximum amount of in-flight data, in bytes, that tests should send at any time.</summary>
    /// <returns>The maximum amount of in-flight data, in bytes, that tests should send at any time.</returns>
    public int getMaxPending()
    {
        return getPropertyAsInteger(MAX_PENDING_PROPNAME);
    }

    /// <summary>The size of the prefetch queue to use.</summary>
    /// <returns>The size of the prefetch queue to use.</returns>
    public int getPrefetch()
    {
        return getPropertyAsInteger(PREFETCH_PROPNAME);
    }

    /// <summary>Flag to indicate that subscriptions should be no-local.</summary>
    /// <returns>Flag to indicate that subscriptions should be no-local.</returns>
    public bool getNoLocal()
    {
        return getPropertyAsBoolean(NO_LOCAL_PROPNAME);
    }

    /// <summary>Flag to indicate that subscriptions should be exclusive.</summary>
    /// <returns>Flag to indicate that subscriptions should be exclusive.</returns>
    public bool getExclusive()
    {
        return getPropertyAsBoolean(EXCLUSIVE_PROPNAME);
    }

    /// <summary>Flag to indicate that messages must be delivered immediately.</summary>
    /// <returns>Flag to indicate that messages must be delivered immediately.</returns>
    public bool getImmediate()
    {
        return getPropertyAsBoolean(IMMEDIATE_PROPNAME);
    }

    /// <summary>Flag to indicate that messages must be routable.</summary>
    /// <returns>Flag to indicate that messages must be routable.</returns>
    public bool getMandatory()
    {
        return getPropertyAsBoolean(MANDATORY_PROPNAME);
    }

    /// <summary>Gets the value of a flag to indicate that the publisher should rollback all messages sent.</summary>
    /// <returns>A flag to indicate that the publisher should rollback all messages sent.</returns>
    public bool getRollbackPublisher()
    {
        return getPropertyAsBoolean(ROLLBACK_PUBLISHER_PROPNAME);
    }

    /// <summary>
    /// Gets the value of a flag to indicate that the receiver should rollback all messages received, then receive them
    /// again.
    /// </summary>
    /// <returns>A flag to indicate that the receiver should rollback all messages received.</returns>
    public bool getRollbackReceiver()
    {
        return getPropertyAsBoolean(ROLLBACK_RECEIVER_PROPNAME);
    }

    /// <summary>Gets the behavioural mode of not applicable assertions. Should be one of 'quiet', 'warn' or 'fail'.</summary>
    /// <returns>The behavioural mode of not applicable assertions.</returns>
    public string getNotApplicableAssertionMode()
    {
        return getProperty(NOT_APPLICABLE_ASSERTION_PROPNAME);
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
* */ using log4net; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; namespace Apache.Qpid.Integration.Tests.framework { /// /// NotApplicableAssertion is a messaging assertion that can be used when an assertion requested by a test-case is not /// applicable to the testing scenario. For example an assertion may relate to AMQP functionality, but a test case may be /// being run over a non-AMQP JMS implementation, in which case the request to create the assertion may return this /// instead of the proper assertion. The test framework is configurable to quietly drop these assertions, log them /// as warnings to the console, or raise them as test failures. /// ///

///
CRC Card
Responsibilities Collaborations ///
Quietly pass. ///
Log a warning. ///
Raise a test failure. ///
///

public class NotApplicableAssertion : Assertion
{
    /// <summary>Used for logging to the console.</summary>
    private static ILog console = LogManager.GetLogger("CONSOLE." + typeof(NotApplicableAssertion).FullName);

    /// <summary>The possible behavioural modes of this assertion.</summary>
    private enum Mode
    {
        /// <summary>Quietly ignore the assertion by passing.</summary>
        Quiet,

        /// <summary>Ignore the assertion by passing but log a warning about it.</summary>
        Warn,

        /// <summary>Fail the assertion.</summary>
        Fail
    }

    /// <summary>The behavioural mode of the assertion.</summary>
    private Mode mode;

    /// <summary>
    /// Creates an assertion that is driven by the value of the 'notApplicableAssertion' property of the test
    /// configuration. Its value should match one of 'quiet', 'warn' or 'fail' and if it does not it is automatically
    /// read as 'fail'.
    /// </summary>
    /// <param name="testProperties">The test configuration properties.</param>
    public NotApplicableAssertion(ParsedProperties testProperties)
    {
        // Cast the test properties into a typed interface for convenience.
        MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProperties);

        string modeName = props.getNotApplicableAssertionMode();

        // Calling Equals on the literal keeps the comparison safe when modeName is null.
        if ("quiet".Equals(modeName))
        {
            mode = Mode.Quiet;
        }
        else if ("warn".Equals(modeName))
        {
            mode = Mode.Warn;
        }
        else
        {
            // Anything unrecognised, including a missing value, is treated as 'fail'.
            mode = Mode.Fail;
        }
    }

    /// <summary>
    /// Applies the assertion.
    /// </summary>
    /// <returns>true if the assertion passes, false if it fails.</returns>
    public bool apply()
    {
        switch (mode)
        {
            case Mode.Quiet:
                return true;

            case Mode.Warn:
                console.Warn("Warning: Not applicable assertion being ignored.");

                return true;

            case Mode.Fail:
            default:
                return false;
        }
    }
}
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; namespace Apache.Qpid.Integration.Tests.framework { /// /// A Publisher represents the status of the publishing side of a test circuit. Its main purpose is to provide assertions /// that can be applied to test the behaviour of the publishers. /// ///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities
/// <tr><td> Provide assertion that the publishers received no exceptions.
/// </table>

///
/// There are mixtures of AMQP and JMS assertions in this interface. Either keep them here, but quietly (or with a
/// warning or error) drop them from test cases where they are not relevant, or push them down into sub-classes.
/// I am tempted to go with the dropping/warning/error approach, that would imply that it makes sense to pull
/// the assertions back from AMQPPublisher to here.
public interface Publisher
{
    // Assertions that are meaningful to AMQP and to JMS.

    /// <summary>
    /// Provides an assertion that the publisher encountered no exceptions.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the publisher encountered no exceptions.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps);

    // Assertions that are meaningful only to AMQP.

    /// <summary>
    /// Provides an assertion that the AMQP channel was forcibly closed by an error condition.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the AMQP channel was forcibly closed by an error condition.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps);

    // Assertions that are meaningful only to Java/JMS.

    /// <summary>
    /// Provides an assertion that the publisher got a given exception during the test.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    /// <returns>An assertion that the publisher got a given exception during the test.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass);
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; namespace Apache.Qpid.Integration.Tests.framework { /// /// A Receiver is a that represents the status of the receiving side of a test circuit. Its main /// purpose is to provide assertions that can be applied to check the behaviour of the receivers. /// ///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities
/// <tr><td> Provide assertion that the receivers received no exceptions.
/// <tr><td> Provide assertion that the receivers received all test messages sent to it.
/// </table>

///
/// There are mixtures of AMQP and JMS assertions in this interface. Either keep them here, but quietly (or with a
/// warning or error) drop them from test cases where they are not relevant, or push them down into sub-classes.
/// I am tempted to go with the dropping/warning/error approach.
public interface Receiver
{
    // Assertions that are meaningful to AMQP and to JMS.

    /// <summary>
    /// Provides an assertion that the receivers encountered no exceptions.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the receivers encountered no exceptions.</returns>
    public Assertion noExceptionsAssertion(ParsedProperties testProps);

    /// <summary>
    /// Provides an assertion that the receivers got all messages that were sent to it.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the receivers got all messages that were sent to it.</returns>
    public Assertion allMessagesReceivedAssertion(ParsedProperties testProps);

    /// <summary>
    /// Provides an assertion that the receivers got none of the messages that were sent to it.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the receivers got none of the messages that were sent to it.</returns>
    public Assertion noMessagesReceivedAssertion(ParsedProperties testProps);

    // Assertions that are meaningful only to AMQP.

    /// <summary>
    /// Provides an assertion that the AMQP channel was forcibly closed by an error condition.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <returns>An assertion that the AMQP channel was forcibly closed by an error condition.</returns>
    public Assertion channelClosedAssertion(ParsedProperties testProps);

    // Assertions that are meaningful only to Java/JMS.

    /// <summary>
    /// Provides an assertion that the receiver got a given exception during the test.
    /// </summary>
    /// <param name="testProps">The test configuration properties.</param>
    /// <param name="exceptionClass">The exception class to check for.</param>
    /// <returns>An assertion that the receiver got a given exception during the test.</returns>
    public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass);
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */
using log4net;
using Apache.Qpid.Integration.Tests.framework.Circuit;
using Apache.Qpid.Integration.Tests.framework.TestClientDetails;
using org.apache.qpid.util.ConversationFactory;
using System.Collections.Generic.LinkedList;
using System.Collections.Generic.IList;
using java.util.Properties;
namespace Apache.Qpid.Integration.Tests.framework.sequencers
{
///
/// BaseCircuitFactory provides some functionality common to all s, such as the details of
/// all s that make up the end-points of
/// the circuits that the factory creates, and an active that can be used to generate
/// control conversations with those circuit end-points.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Hold the details of the sending and receiving end-points to create circuits from.
/// <tr><td> Provide a conversation factory to create control conversations with the end-points.
/// </table>

public abstract class BaseCircuitFactory : CircuitFactory
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(BaseCircuitFactory));

    /// <summary>Holds the contact details for the sending test client.</summary>
    protected TestClientDetails sender;

    /// <summary>Holds the contact details for the receiving test clients.</summary>
    // NOTE(review): the element type arguments look to have been lost in translation, and the concrete list
    // type should be confirmed against the project's collection conventions. Left as declared.
    protected IList receivers = new LinkedList();

    /// <summary>Holds the conversation factory over which to coordinate the test.</summary>
    protected ConversationFactory conversationFactory;

    /// <summary>
    /// Creates a test circuit for the test, configured by the test parameters specified. This base
    /// implementation always throws.
    /// </summary>
    /// <param name="testProperties">The test parameters.</param>
    /// <returns>A test circuit.</returns>
    public Circuit createCircuit(Properties testProperties)
    {
        throw new RuntimeException("Not implemented.");
    }

    /// <summary>
    /// Sets the sender test client to coordinate the test with.
    /// </summary>
    /// <param name="sender">The contact details of the sending client in the test.</param>
    public void setSender(TestClientDetails sender)
    {
        log.Debug("public void setSender(TestClientDetails sender = " + sender + "): called");

        this.sender = sender;
    }

    /// <summary>
    /// Adds a receiving test client to coordinate the test with. Each call appends to the list of receivers.
    /// </summary>
    /// <param name="receiver">The contact details of the receiving client in the test.</param>
    public void setReceiver(TestClientDetails receiver)
    {
        log.Debug("public void setReceiver(TestClientDetails receivers = " + receiver + "): called");

        this.receivers.Add(receiver);
    }

    /// <summary>
    /// Supplies the sending test client.
    /// </summary>
    /// <returns>The sending test client.</returns>
    public TestClientDetails getSender()
    {
        return sender;
    }

    /// <summary>
    /// Supplies the receiving test clients.
    /// </summary>
    /// <returns>The receiving test clients.</returns>
    public IList getReceivers()
    {
        return receivers;
    }

    /// <summary>
    /// Accepts the conversation factory over which to hold the test coordinating conversation.
    /// </summary>
    /// <param name="conversationFactory">The conversation factory to coordinate the test over.</param>
    public void setConversationFactory(ConversationFactory conversationFactory)
    {
        this.conversationFactory = conversationFactory;
    }

    /// <summary>
    /// Provides the conversation factory for providing the distributed test sequencing conversations over the test
    /// connection.
    /// </summary>
    /// <returns>The conversation factory to create test sequencing conversations with.</returns>
    public ConversationFactory getConversationFactory()
    {
        return conversationFactory;
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */
using Apache.Qpid.Integration.Tests.framework.Assertion;
using Apache.Qpid.Integration.Tests.framework.Circuit;
using Apache.Qpid.Integration.Tests.framework.TestClientDetails;
using org.apache.qpid.util.ConversationFactory;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using javax.jms.JMSException;
using javax.jms.Message;
using System.Collections.Generic.IList;
using System.Collections.Generic.IDictionary;
using java.util.Properties;
namespace Apache.Qpid.Integration.Tests.framework.sequencers
{
///
/// A CircuitFactory is responsibile for creating test circuits appropriate to the context that a test case is
/// running in, and providing an implementation of a standard test procedure over a test circuit.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities
/// <tr><td> Provide a standard test procedure over a test circuit.
/// <tr><td> Construct test circuits appropriate to a tests context.
/// </table>

public interface CircuitFactory
{
    /// <summary>
    /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles,
    /// beginning the test, gathering the test reports from the participants, and checking for assertion failures
    /// against the test reports.
    /// </summary>
    /// <param name="testCircuit">The test circuit.</param>
    /// <param name="assertions">The list of assertions to apply to the test circuit.</param>
    /// <param name="testProperties">The test case definition.</param>
    /// <remarks>Deprecated: use test circuits and Circuit.test instead.</remarks>
    public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties);

    /// <summary>
    /// Creates a test circuit for the test, configured by the test parameters specified.
    /// </summary>
    /// <param name="testProperties">The test parameters.</param>
    /// <returns>A test circuit.</returns>
    public Circuit createCircuit(ParsedProperties testProperties);

    /// <summary>
    /// Sets the sender test client to coordinate the test with.
    /// </summary>
    /// <param name="sender">The contact details of the sending client in the test.</param>
    public void setSender(TestClientDetails sender);

    /// <summary>
    /// Sets the receiving test client to coordinate the test with.
    /// </summary>
    /// <param name="receiver">The contact details of the receiving client in the test.</param>
    public void setReceiver(TestClientDetails receiver);

    /// <summary>
    /// Supplies the sending test client.
    /// </summary>
    /// <returns>The sending test client.</returns>
    public TestClientDetails getSender();

    /// <summary>
    /// Supplies the receiving test clients.
    /// </summary>
    /// <returns>The receiving test clients.</returns>
    public IList getReceivers();

    /// <summary>
    /// Accepts the conversation factory over which to hold the test coordinating conversation.
    /// </summary>
    /// <param name="conversationFactory">The conversation factory to coordinate the test over.</param>
    public void setConversationFactory(ConversationFactory conversationFactory);
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using log4net; using Apache.Qpid.Integration.Tests.framework.Assertion; using Apache.Qpid.Integration.Tests.framework.Circuit; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.TestUtils; using Apache.Qpid.Integration.Tests.framework.distributedcircuit.DistributedCircuitImpl; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using javax.jms.Destination; using javax.jms.JMSException; using javax.jms.Message; using javax.jms.Session; using System.Collections.Generic.LinkedList; using System.Collections.Generic.IList; using java.util.Properties; namespace Apache.Qpid.Integration.Tests.framework.sequencers { /// /// FanOutCircuitFactory is a circuit factory that creates distributed test circuits. Given a set of participating /// test client nodes, it assigns one node to the SENDER role and the remainder to the RECEIVER role. /// ///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Create distributed circuits from one to many test nodes, for fanout style testing.
/// </table>

///
/// Adapt this to be an n*m topology circuit factory. Need to add circuit topology definitions to the test
/// parameters. Place n senders onto the available test clients, and m receivers. Where n or m is larger than
/// the available nodes, start stacking multiple test clients on each node. There will also be an option that
/// indicates whether nodes can play both roles, and how many nodes out of all available may be assigned to
/// each role.
///
/// The createCircuit methods on this and InteropCircuitFactory are going to be identical. This is because the
/// partitioning into senders and receivers is already done by the test decorators. Either eliminate these factories
/// as unnecessary, or move the partitioning functionality into the factories, in which case the test decorators
/// can probably be merged or eliminated. There is confusion over the placement of responsibilities between the
/// factories and the test decorators... although the test decorators may well do more than just circuit creation
/// in the future. For example, there may have to be a special decorator for test repetition that does one circuit
/// creation, but then runs many tests over it, in which case the handling of responsibilities becomes clearer.
//
// NOTE(review): the JMS-style calls below (createTopic, setStringProperty, send, receive, ...) and the untyped
// IList/LinkedList declarations are kept exactly as written because the types they belong to are declared
// outside this file; confirm their member names against the actual messaging API in use.
public class FanOutCircuitFactory : BaseCircuitFactory
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(FanOutCircuitFactory));

    /// <summary>
    /// Creates a test circuit for the test, configured by the test parameters specified. The single sender and all
    /// registered receivers are handed to the distributed circuit implementation.
    /// </summary>
    /// <param name="testProperties">The test parameters.</param>
    /// <returns>A test circuit.</returns>
    public Circuit createCircuit(ParsedProperties testProperties)
    {
        log.Debug("public Circuit createCircuit(ParsedProperties testProperties): called");

        IList senders = new LinkedList();
        senders.Add(getSender());

        IList receivers = getReceivers();
        ConversationFactory conversationFactory = getConversationFactory();

        return DistributedCircuitImpl.createCircuit(testProperties, senders, receivers, conversationFactory);
    }

    /// <summary>
    /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles,
    /// beginning the test, gathering the test reports from the participants, and checking for assertion failures
    /// against the test reports.
    /// </summary>
    /// <param name="testCircuit">The test circuit.</param>
    /// <param name="assertions">The list of assertions to apply to the test circuit.</param>
    /// <param name="testProperties">The test case definition.</param>
    /// <remarks>Deprecated: scheduled for removal once existing tests converted over to use test circuits.</remarks>
    public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties)
    {
        log.Debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");

        TestClientDetails sender = getSender();
        IList receivers = getReceivers();
        ConversationFactory conversationFactory = getConversationFactory();

        try
        {
            // Create a conversation on the sender clients private control route.
            Session session = conversationFactory.getSession();
            Destination senderControlTopic = session.createTopic(sender.privateControlKey);
            ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();

            // Assign the sender role to the sending test client.
            Message assignSender = conversationFactory.getSession().createMessage();
            TestUtils.setPropertiesOnMessage(assignSender, testProperties);
            assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
            assignSender.setStringProperty("ROLE", "SENDER");
            assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");

            senderConversation.send(senderControlTopic, assignSender);

            // Wait for the sender to confirm its role.
            senderConversation.receive();

            // Assign the receivers roles.
            foreach (TestClientDetails receiver in receivers)
            {
                assignReceiverRole(receiver, testProperties, true);
            }

            // Start the test on the sender.
            Message start = session.createMessage();
            start.setStringProperty("CONTROL_TYPE", "START");

            senderConversation.send(senderControlTopic, start);

            // Wait for the test sender to return its report.
            Message senderReport = senderConversation.receive();
            TestUtils.pause(500);

            // Ask the receivers for their reports.
            Message statusRequest = session.createMessage();
            statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");

            // Gather the reports from all of the receiving clients.

            // Return all of the test reports, the senders report first.
            // return new Message[] { senderReport };
        }
        catch (JMSException e)
        {
            // Keep the original failure attached as the cause so it is not lost to whoever handles this.
            throw new RuntimeException("Unhandled JMSException.", e);
        }
    }

    /// <summary>
    /// Assigns the receivers role to the specified test client that is to act as a receivers during the test. This
    /// method does not always wait for the receiving clients to confirm their role assignments. This is because this
    /// method may be called from an 'onMessage' method, when a client is joining the test at a later point in time,
    /// and it is not possible to do a synchronous receive during an 'onMessage' method. There is a flag to indicate
    /// whether or not to wait for role confirmations.
    /// </summary>
    /// <param name="receiver">The test client to assign the receivers role to.</param>
    /// <param name="testProperties">The test parameters.</param>
    /// <param name="confirm">Indicates whether role confirmation should be waited for.</param>
    /// <remarks>
    /// Any JMSExceptions occurring during the conversation are allowed to fall through.
    /// Deprecated: scheduled for removal once existing tests converted over to use test circuits.
    /// </remarks>
    protected void assignReceiverRole(TestClientDetails receiver, Properties testProperties, bool confirm)
    {
        log.Info("assignReceiverRole(TestClientDetails receivers = " + receiver + ", Map testProperties = "
            + testProperties + "): called");

        ConversationFactory conversationFactory = getConversationFactory();

        // Create a conversation with the receiving test client.
        Session session = conversationFactory.getSession();
        Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
        ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();

        // Assign the receivers role to the receiving client.
        Message assignReceiver = session.createMessage();
        TestUtils.setPropertiesOnMessage(assignReceiver, testProperties);
        assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
        assignReceiver.setStringProperty("ROLE", "RECEIVER");
        assignReceiver.setStringProperty("CLIENT_NAME", receiver.clientName);

        receiverConversation.send(receiverControlTopic, assignReceiver);

        // Wait for the role confirmation to come back.
        if (confirm)
        {
            receiverConversation.receive();
        }
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using log4net; using Apache.Qpid.Integration.Tests.framework.Assertion; using Apache.Qpid.Integration.Tests.framework.Circuit; using Apache.Qpid.Integration.Tests.framework.TestClientDetails; using Apache.Qpid.Integration.Tests.framework.TestUtils; using Apache.Qpid.Integration.Tests.framework.distributedcircuit.DistributedCircuitImpl; using org.apache.qpid.util.ConversationFactory; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using javax.jms.Destination; using javax.jms.JMSException; using javax.jms.Message; using javax.jms.Session; using System.Collections.Generic.LinkedList; using System.Collections.Generic.IList; using java.util.Properties; namespace Apache.Qpid.Integration.Tests.framework.sequencers { /// /// InteropCircuitFactory is a circuit factory that creates distributed test circuits. Given a set of participating /// test client nodes, it assigns one node to the SENDER role and one the RECEIVER role. /// ///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Create distributed circuits from pairs of test nodes, for interop style testing.
/// </table>

///
/// The partitioning of a set of nodes into sender and receiver roles is actually done by the interop test
/// decorator. See the todo comment in FanOutCircuitFactory about merging the factories with the decorators, or
/// more carefully dividing up responsibilities between them.
///
/// The sequenceTest code is deprecated, but currently still used by the interop tests. It will be removed once it
/// has been fully replaced by the default test procedure.
//
// NOTE(review): the JMS-style calls below (createTopic, setStringProperty, send, receive, ...) and the untyped
// IList/LinkedList declarations are kept exactly as written because the types they belong to are declared
// outside this file; confirm their member names against the actual messaging API in use.
public class InteropCircuitFactory : BaseCircuitFactory
{
    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(InteropCircuitFactory));

    /// <summary>
    /// Creates a test circuit for the test, configured by the test parameters specified.
    /// </summary>
    /// <param name="testProperties">The test parameters.</param>
    /// <returns>A test circuit.</returns>
    public Circuit createCircuit(ParsedProperties testProperties)
    {
        log.Debug("public Circuit createCircuit(ParsedProperties testProperties): called");

        IList senders = new LinkedList();
        senders.Add(getSender());

        IList receivers = getReceivers();
        ConversationFactory conversationFactory = getConversationFactory();

        return DistributedCircuitImpl.createCircuit(testProperties, senders, receivers, conversationFactory);
    }

    /// <summary>
    /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles,
    /// beginning the test, gathering the test reports from the participants, and checking for assertion failures
    /// against the test reports. Only the first registered receiver takes part.
    /// </summary>
    /// <param name="testCircuit">The test circuit.</param>
    /// <param name="assertions">The list of assertions to apply to the test circuit.</param>
    /// <param name="testProperties">The test case definition.</param>
    public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties)
    {
        log.Debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");

        TestClientDetails sender = getSender();
        IList receivers = getReceivers();
        ConversationFactory conversationFactory = getConversationFactory();

        try
        {
            Session session = conversationFactory.getSession();
            Destination senderControlTopic = session.createTopic(sender.privateControlKey);

            // The explicit cast keeps this valid whether the list is typed or untyped.
            TestClientDetails firstReceiver = (TestClientDetails) receivers[0];
            Destination receiverControlTopic = session.createTopic(firstReceiver.privateControlKey);

            ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
            ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();

            // Assign the sender role to the sending client.
            Message assignSender = conversationFactory.getSession().createMessage();
            TestUtils.setPropertiesOnMessage(assignSender, testProperties);
            assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
            assignSender.setStringProperty("ROLE", "SENDER");

            senderConversation.send(senderControlTopic, assignSender);

            // Assign the receivers role the receiving client.
            Message assignReceiver = session.createMessage();
            TestUtils.setPropertiesOnMessage(assignReceiver, testProperties);
            assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
            assignReceiver.setStringProperty("ROLE", "RECEIVER");

            receiverConversation.send(receiverControlTopic, assignReceiver);

            // Wait for the senders and receivers to confirm their roles.
            senderConversation.receive();
            receiverConversation.receive();

            // Start the test.
            Message start = session.createMessage();
            start.setStringProperty("CONTROL_TYPE", "START");

            senderConversation.send(senderControlTopic, start);

            // Wait for the test sender to return its report.
            Message senderReport = senderConversation.receive();
            TestUtils.pause(500);

            // Ask the receivers for its report.
            Message statusRequest = session.createMessage();
            statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");

            receiverConversation.send(receiverControlTopic, statusRequest);

            // Wait for the receivers to send its report.
            Message receiverReport = receiverConversation.receive();

            // return new Message[] { senderReport, receiverReport };

            // Apply assertions.
        }
        catch (JMSException e)
        {
            // Keep the original failure attached as the cause so it is not lost to whoever handles this.
            throw new RuntimeException("JMSException not handled.", e);
        }
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */
namespace Apache.Qpid.Integration.Tests.framework
{
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td>
/// </table>

/// <summary>
/// Pairs a test case name with a cycle number within that test case. Two vectors are equal when both the name
/// and the cycle number match, so instances can be used as keys in hash-based collections.
/// </summary>
public class TestCaseVector
{
    /// <summary>The test case name.</summary>
    private readonly string testCase;

    /// <summary>The test cycle number within the test case.</summary>
    private readonly int testCycleNumber;

    /// <summary>
    /// Creates a vector for the given test case and cycle.
    /// </summary>
    /// <param name="testCase">The test case name; may be null.</param>
    /// <param name="testCycleNumber">The test cycle number within the test case.</param>
    public TestCaseVector(string testCase, int testCycleNumber)
    {
        this.testCase = testCase;
        this.testCycleNumber = testCycleNumber;
    }

    /// <summary>Supplies the test case name.</summary>
    /// <returns>The test case name.</returns>
    public string getTestCase()
    {
        return testCase;
    }

    /// <summary>Supplies the test cycle number.</summary>
    /// <returns>The test cycle number within the test case.</returns>
    public int getTestCycleNumber()
    {
        return testCycleNumber;
    }

    /// <summary>
    /// Compares this vector with another object for value equality.
    /// </summary>
    /// <param name="o">The object to compare to.</param>
    /// <returns>
    /// <c>true</c> if <paramref name="o"/> is of exactly the same type and has the same test case name and
    /// cycle number, <c>false</c> otherwise.
    /// </returns>
    public override bool Equals(object o)
    {
        if (ReferenceEquals(this, o))
        {
            return true;
        }

        // Exact type match, so that a subclass instance never compares equal to a base instance.
        if ((o == null) || (GetType() != o.GetType()))
        {
            return false;
        }

        TestCaseVector that = (TestCaseVector) o;

        // string.Equals is null-safe and ordinal.
        return (testCycleNumber == that.testCycleNumber) && string.Equals(testCase, that.testCase);
    }

    /// <summary>
    /// Computes a hash code compatible with <see cref="Equals(object)"/>.
    /// </summary>
    /// <returns>A hash code built from the test case name and cycle number.</returns>
    public override int GetHashCode()
    {
        // Wrap-around on overflow is intended, whatever the project's checked-arithmetic setting.
        unchecked
        {
            int result = (testCase != null) ? testCase.GetHashCode() : 0;
            return (31 * result) + testCycleNumber;
        }
    }

    /// <summary>
    /// Java-style alias retained for existing callers; forwards to <see cref="Equals(object)"/>.
    /// </summary>
    /// <param name="o">The object to compare to.</param>
    /// <returns><c>true</c> if the objects are equal, <c>false</c> otherwise.</returns>
    public bool equals(object o)
    {
        return Equals(o);
    }

    /// <summary>
    /// Java-style alias retained for existing callers; forwards to <see cref="GetHashCode"/>.
    /// </summary>
    /// <returns>A hash code for this vector.</returns>
    public int hashCode()
    {
        return GetHashCode();
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */
namespace Apache.Qpid.Integration.Tests.framework
{
///
/// TestClientDetails is used to encapsulate information about an interop test client. It pairs together the unique
/// name of the client, and the route on which it listens to its control messages.
///
///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Record test clients control addresses together with their names.
/// </table>

public class TestClientDetails
{
    /// <summary>The test clients name.</summary>
    public string clientName;

    /// <summary>The routing key of the test clients control topic.</summary>
    public string privateControlKey;

    /// <summary>
    /// Two TestClientDetails are considered to be equal, iff they have the same client name.
    /// </summary>
    /// <param name="o">The object to compare to.</param>
    /// <returns>
    /// <c>true</c> if the object to compare to is a TestClientDetails with the same client name as this one,
    /// <c>false</c> otherwise.
    /// </returns>
    public override bool Equals(object o)
    {
        if (ReferenceEquals(this, o))
        {
            return true;
        }

        TestClientDetails other = o as TestClientDetails;

        if (other == null)
        {
            return false;
        }

        // string.Equals is null-safe and ordinal; the control key deliberately plays no part.
        return string.Equals(clientName, other.clientName);
    }

    /// <summary>
    /// Computes a hash code compatible with the equals method; based on the client name alone.
    /// </summary>
    /// <returns>A hash code for this.</returns>
    public override int GetHashCode()
    {
        return (clientName != null) ? clientName.GetHashCode() : 0;
    }

    /// <summary>
    /// Java-style alias retained for existing callers; forwards to <see cref="Equals(object)"/>.
    /// </summary>
    /// <param name="o">The object to compare to.</param>
    /// <returns><c>true</c> if the objects are equal, <c>false</c> otherwise.</returns>
    public bool equals(object o)
    {
        return Equals(o);
    }

    /// <summary>
    /// Java-style alias retained for existing callers; forwards to <see cref="GetHashCode"/>.
    /// </summary>
    /// <returns>A hash code for this.</returns>
    public int hashCode()
    {
        return GetHashCode();
    }

    /// <summary>
    /// Outputs the client name and address details. Mostly used for debugging purposes. Declared as an override
    /// so that string concatenation and logging pick it up.
    /// </summary>
    /// <returns>The client name and address.</returns>
    public override string ToString()
    {
        return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]";
    }
}
}
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
* */ using log4net; using static Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties.*; using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; using javax.jms.*; using javax.naming.Context; using javax.naming.InitialContext; using javax.naming.NamingException; using System.Collections.Generic.IDictionary; namespace Apache.Qpid.Integration.Tests.framework { /// /// TestUtils provides static helper methods that are usefull for writing tests against QPid. /// ///

/// <p/><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Create connections from test properties.
/// <tr><td> Create test messages.
/// <tr><td> Inject a short pause in a test.
/// <tr><td> Serialize properties into a message.
/// </table>

public class TestUtils
{
    // NOTE(review): this class is a partial port of a Java/JMS original. Constructs with an unambiguous .NET
    // equivalent have been ported. The Java-side API types and their members (Connection, Session, Message,
    // BytesMessage, ConnectionFactory, Context, InitialContext, ParsedProperties, Map, NamingException,
    // JMSException, RuntimeException) are left exactly as they were, because their .NET counterparts are not
    // visible from this file -- TODO confirm the intended mapping before relying on this class.

    /// <summary>Used for debugging.</summary>
    private static ILog log = LogManager.GetLogger(typeof(TestUtils));

    /// <summary>Some dummy data to stuff all test messages with.</summary>
    private static readonly byte[] MESSAGE_DATA_BYTES =
        System.Text.Encoding.UTF8.GetBytes(
            "Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- ");

    /// <summary>
    /// Establishes a connection using a set of properties and a connection factory looked up from an initial
    /// context built from those same properties. This is a simple convenience method for code that does not
    /// anticipate handling connection failures: a NamingException or JMSException raised while connecting is
    /// wrapped in a RuntimeException, presumably to be handled by a top level failure handler.
    /// </summary>
    ///
    /// <remarks>
    /// The following keys are read from <paramref name="messagingProps"/> to build the connection URL:
    /// <list type="bullet">
    /// <item><description>USERNAME_PROPNAME - the username.</description></item>
    /// <item><description>PASSWORD_PROPNAME - the password.</description></item>
    /// <item><description>VIRTUAL_HOST_PROPNAME - the virtual host name; an empty string is used when it is not
    ///     set.</description></item>
    /// <item><description>BROKER_PROPNAME - the broker URL.</description></item>
    /// </list>
    /// CONNECTION_NAME is the name under which the connection factory is registered and looked up. Note that
    /// this method modifies <paramref name="messagingProps"/>: it stores the generated connection URL under the
    /// key <c>"connectionfactory." + CONNECTION_NAME</c>.
    /// </remarks>
    ///
    /// <param name="messagingProps">Connection properties, keyed by the property names listed above.</param>
    ///
    /// <returns>The connection created by the looked-up connection factory.</returns>
    public static Connection createConnection(ParsedProperties messagingProps)
    {
        log.Debug("public static Connection createConnection(ParsedProperties messagingProps = " + messagingProps
            + "): called");

        try
        {
            // Extract the configured connection properties from the test configuration.
            string conUsername = messagingProps.getProperty(USERNAME_PROPNAME);
            string conPassword = messagingProps.getProperty(PASSWORD_PROPNAME);
            string virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME);
            string brokerUrl = messagingProps.getProperty(BROKER_PROPNAME);

            // Create the broker connection url. The local must be spelled 'connectionString', matching its use
            // below; it was previously declared as 'connectionstring', leaving the use site undefined.
            string connectionString =
                "amqp://" + conUsername + ":" + conPassword + "@clientid/" + ((virtualHost != null) ? virtualHost : "")
                + "?brokerlist='" + brokerUrl + "'";

            // Create properties to create the initial context from, and inject the connection factory
            // configuration for the defined connection name into it.
            messagingProps.setProperty("connectionfactory." + CONNECTION_NAME, connectionString);

            Context ctx = new InitialContext(messagingProps);
            ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);

            return cf.createConnection();
        }
        catch (NamingException e)
        {
            throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e);
        }
        catch (JMSException e)
        {
            throw new RuntimeException("Could not establish connection due to JMSException.", e);
        }
    }

    /// <summary>
    /// Creates a test message of the specified size, on the given session. The body is built by repeating the
    /// dummy data pattern as many whole times as fit into <paramref name="size"/>, followed by the leading part
    /// of the pattern needed to make up the remainder.
    /// </summary>
    ///
    /// <param name="session">The session to create the message on.</param>
    /// <param name="size">The size of the message in bytes. Nothing is written when this is zero or
    ///     negative.</param>
    ///
    /// <returns>A bytes message, of the specified size, filled with dummy data.</returns>
    ///
    /// <remarks>Any underlying JMSExceptions are allowed to fall through.</remarks>
    public static Message createTestMessageOfSize(Session session, int size)
    {
        BytesMessage message = session.createBytesMessage();

        if (size > 0)
        {
            // Number of whole copies of the pattern, and the left-over byte count, needed to reach 'size'.
            // The requested size is the dividend; dividing the pattern length by the size (as was done
            // before) writes the wrong number of bytes.
            int div = size / MESSAGE_DATA_BYTES.Length;
            int mod = size % MESSAGE_DATA_BYTES.Length;

            for (int i = 0; i < div; i++)
            {
                message.writeBytes(MESSAGE_DATA_BYTES);
            }

            if (mod != 0)
            {
                message.writeBytes(MESSAGE_DATA_BYTES, 0, mod);
            }
        }

        return message;
    }

    /// <summary>
    /// Pauses for the specified length of time. In the event of failing to pause for at least that length of
    /// time due to interruption of the thread, a RuntimeException is raised to indicate the failure, and the
    /// interrupt is re-requested on the current thread so that it is not lost. This method should only be used
    /// when it is expected that the pause will be successful, for example in test code that relies on injecting
    /// a pause.
    /// </summary>
    ///
    /// <param name="t">The minimum time to pause for in milliseconds. Must be within the range accepted by
    ///     <see cref="System.Threading.Thread.Sleep(System.TimeSpan)"/>.</param>
    public static void pause(long t)
    {
        try
        {
            // The TimeSpan overload is used because the pause length is a long.
            System.Threading.Thread.Sleep(System.TimeSpan.FromMilliseconds(t));
        }
        catch (System.Threading.ThreadInterruptedException e)
        {
            // Restore the interrupted status: the interrupt is delivered when this thread next blocks.
            System.Threading.Thread.CurrentThread.Interrupt();

            throw new RuntimeException("Failed to generate the requested pause length.", e);
        }
    }

    /// <summary>
    /// Sets properties of different types on a message. Each entry's key is converted to a string and used as
    /// the property name; the entry's value is set as an object property under that name.
    /// </summary>
    ///
    /// <param name="message">The message to set properties on.</param>
    /// <param name="properties">The property name/value pairs to set.</param>
    ///
    /// <remarks>All underlying JMSExceptions are allowed to fall through.</remarks>
    public static void setPropertiesOnMessage(Message message, Map properties)
    {
        foreach (Map.Entry entry in properties.entrySet())
        {
            string name = entry.getKey().ToString();
            object value = entry.getValue();

            message.setObjectProperty(name, value);
        }
    }
}
}