/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using log4net;
using uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
using java.io.IOException;
using java.net.*;
using java.nio.ByteBuffer;
using java.util.Arrays;
namespace Apache.Qpid.Integration.Tests.framework.clocksynch
{
///
/// UDPClockSynchronizer is a ClockSynchronizer that sends pings as UDP datagrams, and uses the following simple
/// algorithm to perform clock synchronization:
///
/// 1. Slave initiates synchronization with a Reference clock.
/// 2. Slave stamps current local time on a "time request" message and sends to the Reference.
/// 3. Upon receipt by Reference, Reference stamps Reference-time and returns.
/// 4. Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It
///    subtracts current time from Reference time to determine Slave-Reference time delta and adds in the
///    half-latency to get the correct clock delta.
/// 5. The first result is immediately used to update the clock since it will get the local clock into at least
///    the right ballpark.
/// 6. The Slave repeats steps 2 through 4, 15 more times.
/// 7. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The
///    median latency is determined by picking the mid-point sample from this ordered list.
/// 8. All samples outside 1 standard-deviation from the median are discarded and the remaining samples
///    are averaged using an arithmetic mean.
///
/// The use of UDP datagrams, instead of TCP based communication, eliminates the hidden delays that TCP can introduce,
/// as it can transparently re-order or re-send packets, or introduce delays as packets are held back by Nagle's
/// algorithm.
///
/// CRC Card
///
/// Responsibilities                                     | Collaborations
/// -----------------------------------------------------+---------------
/// Trigger a clock synchronization.                     |
/// Compute a clock delta to apply to the local clock.   |
/// Estimate the error in the synchronization.           |
///
public class UDPClockSynchronizer : ClockSynchronizer
{
/// Used for debugging.
// private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer));
/// Defines the timeout to use when waiting for responses to time requests. It is passed to the socket's
/// receive timeout, which is expressed in milliseconds.
private static final int TIMEOUT = 50;
/// The clock delta, in nanoseconds, that is added to the local clock to approximate the reference clock.
private long delta = 0L;
/// Holds an estimate of the clock error relative to the reference clock, in nanoseconds.
private long epsilon = 0L;
/// Holds the address of the reference clock.
private InetAddress referenceAddress;
/// Holds the socket to communicate with the reference service over.
private DatagramSocket socket;
/// Used to control the shutdown in the main test loop. It is cleared from the shutdown hook thread while the
/// main thread polls it, so it is volatile to guarantee that the write becomes visible to the polling loop.
private static volatile bool doSynch = true;
///
/// Builds a clock synchronizer that will synchronize against the reference service at the given address.
///
/// address: The host name or textual IP address of the reference service.
///
/// Throws a RuntimeException, wrapping the UnknownHostException, if the address cannot be resolved.
public UDPClockSynchronizer(string address)
{
    InetAddress resolved;

    try
    {
        resolved = InetAddress.getByName(address);
    }
    catch (UnknownHostException unknownHost)
    {
        // Resolution failures are re-thrown unchecked, so that callers need not declare them.
        throw new RuntimeException(unknownHost);
    }

    referenceAddress = resolved;
}
///
/// The slave side should call this to compute a clock delta with the reference. A fresh datagram socket is
/// opened for the duration of the call, and three rounds of synchronization are run over it: a single ping,
/// then 15 pings, then 31 pings, each round refining the delta computed by the previous one.
///
/// Throws ClockSynchFailureException if synchronization cannot be achieved, due to unavailability of the
/// reference time service. Throws a RuntimeException, wrapping the SocketException, if the socket cannot be
/// opened or configured.
public void synch() throws ClockSynchFailureException
{
    try
    {
        socket = new DatagramSocket();

        try
        {
            socket.setSoTimeout(TIMEOUT);

            // Synchronize on a single ping, to get the clock into the right ball-park.
            synch(1);

            // Synchronize on 15 pings.
            synch(15);

            // And again, for greater accuracy, on 31.
            synch(31);
        }
        finally
        {
            // Always release the socket, including when the reference stops responding part way through;
            // otherwise every failed synchronization would leak an open socket.
            socket.close();
        }
    }
    catch (SocketException e)
    {
        throw new RuntimeException(e);
    }
}
///
/// Updates the synchronization delta by performing the specified number of reference clock requests.
/// Samples further than one standard deviation from the median, in either direction, are discarded. The mean of
/// the retained samples is added to the clock delta, and their standard deviation becomes the error estimate.
///
/// n: The number of reference clock request cycles to perform.
///
/// Throws ClockSynchFailureException if synchronization cannot be achieved, due to unavailability of the
/// reference time service.
protected void synch(int n) throws ClockSynchFailureException
{
    // log.debug("protected void synch(int n = " + n + "): called");

    // Create an array of deltas by performing n reference pings.
    long[] samples = new long[n];

    for (int i = 0; i < n; i++)
    {
        samples[i] = ping();
    }

    // Reject any deltas that are more than 1 s.d. away from the median, above or below.
    long medianDelta = median(samples);
    long sd = standardDeviation(samples);

    // log.debug("median = " + medianDelta);
    // log.debug("sd = " + sd);

    long[] accepted = new long[n];
    int count = 0;

    for (int i = 0; i < n; i++)
    {
        if ((samples[i] <= (medianDelta + sd)) && (samples[i] >= (medianDelta - sd)))
        {
            accepted[count] = samples[i];
            count++;
        }
        else
        {
            // log.debug("Rejected: " + samples[i]);
        }
    }

    // Trim to exactly the accepted samples. Computing the statistics over the full length-n array would let
    // the rejected outliers, left over in the tail of the array, contaminate the mean and the error estimate.
    long[] retained = samples;

    if (count > 0)
    {
        retained = new long[count];
        System.arraycopy(accepted, 0, retained, 0, count);
    }
    // Defensive: should integer rounding ever reject every sample, fall back to using all of them rather than
    // dividing by zero in the mean.

    // Estimate the delta as the mean of the remaining deltas. Pings are measured against the already adjusted
    // clock, so the result is a correction to accumulate rather than a replacement value.
    this.delta += mean(retained);

    // Estimate the error as the standard deviation of the remaining deltas.
    this.epsilon = standardDeviation(retained);

    // log.debug("this.delta = " + this.delta);
    // log.debug("this.epsilon = " + this.epsilon);
}
///
/// Runs one request/response cycle against the reference clock and returns the estimated delta relative to the
/// local clock. The estimate is the half-latency of the request cycle, plus the reference clock reading, minus
/// the local clock reading taken when the response arrived.
///
/// A request that times out is retried; after 10 timed out requests the ping is abandoned.
///
/// Returns: The estimated clock delta.
///
/// Throws ClockSynchFailureException if the reference service is not responding. Throws a RuntimeException,
/// wrapping the IOException, on any other I/O failure.
protected long ping() throws ClockSynchFailureException
{
    // log.debug("protected long ping(): called");

    try
    {
        byte[] buf = new byte[256];

        for (int timeouts = 0; timeouts < 10; timeouts++)
        {
            // The request latency is timed from just before the request is sent.
            long sentAt = nanoTime();

            socket.send(new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT));

            DatagramPacket reply = new DatagramPacket(buf, buf.length);

            try
            {
                socket.receive(reply);
            }
            catch (SocketTimeoutException timeout)
            {
                // No answer within the socket timeout; count it and try again.
                continue;
            }

            // The reference time is read from the leading 8 bytes of the reply.
            long refTime = ByteBuffer.wrap(reply.getData()).getLong();

            // The latency timing stops once the reference time has been extracted.
            long receivedAt = nanoTime();
            long latency = receivedAt - sentAt;

            // Estimate delta as (ref clock + half-latency) - local clock.
            return (latency / 2) + (refTime - receivedAt);
        }

        // Every attempt timed out, so fail completely.
        throw new ClockSynchFailureException("Clock reference not responding.", null);
    }
    catch (IOException ioFailure)
    {
        throw new RuntimeException(ioFailure);
    }
}
///
/// Reports the clock delta accumulated by the synchronization rounds so far.
///
/// Returns: The clock delta in nanoseconds.
public long getDelta()
{
    return this.delta;
}
///
/// Reports the error estimate produced by the most recent synchronization round.
///
/// Returns: An estimate of the clock error in nanoseconds.
public long getEpsilon()
{
    return this.epsilon;
}
///
/// Reads the local clock and corrects it by whatever delta has been computed so far.
///
/// Returns: The local clock time, in nanoseconds, with any computed delta added in.
public long nanoTime()
{
    long localClock = System.nanoTime();

    return localClock + this.delta;
}
///
/// Computes the median of a series of values. The input array is left untouched; a sorted copy is examined.
/// For an even number of values the median is the integer average of the two middle values.
///
/// values: The values.
///
/// Returns: The median.
public static long median(long[] values)
{
    // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called");

    // Sort a private copy, so that the caller's ordering is preserved.
    long[] sorted = values.clone();
    Arrays.sort(sorted);

    int mid = sorted.length / 2;

    // An even count has no single middle element, so the two straddling the centre are averaged.
    return ((sorted.length % 2) == 0)
        ? ((sorted[mid] + sorted[mid - 1]) / 2)
        : sorted[mid];
}
///
/// Computes the arithmetic mean of a series of values, using integer division of the sum by the count.
///
/// values: The values.
///
/// Returns: The mean.
public static long mean(long[] values)
{
    // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called");

    long sum = 0L;

    for (int i = 0; i < values.length; i++)
    {
        sum += values[i];
    }

    // log.debug("mean = " + (sum / values.length));
    return sum / values.length;
}
///
/// Computes the variance of series of values.
///
/// The values.
///
/// The variance of the values.
public static long variance(long[] values)
{
// log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called");
long mean = mean(values);
long totalVariance = 0;
for (long value : values)
{
long diff = (value - mean);
totalVariance += diff/// diff;
}
long variance = totalVariance / values.length;
// log.debug("variance = " + variance);
return variance;
}
///
/// Computes the standard deviation of a series of values, as the square root of their variance truncated to a
/// whole number.
///
/// values: The values.
///
/// Returns: The standard deviation.
public static long standardDeviation(long[] values)
{
    // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called");

    double root = Math.sqrt(variance(values));

    // The narrowing cast drops the fractional part, exactly as Double.longValue() does.
    return (long) root;
}
///
/// For testing purposes. Supply address of reference clock as arg 1. Synchronizes repeatedly against the
/// reference, printing the clock delta and the error estimate after each synchronization, until the program is
/// shut down, the thread is interrupted, or the reference service stops responding.
///
/// args: Address of reference clock as arg 1.
public static void main(String[] args)
{
    ParsedProperties options =
        new ParsedProperties(CommandLineParser.processCommandLine(args,
            new CommandLineParser(
                new String[][]
                {
                    { "1", "Address of clock reference service.", "address", "true" }
                }), System.getProperties()));

    string address = options.getProperty("1");

    // Create a clock synchronizer.
    UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);

    // Set up a shutdown hook for it, which asks the loop below to stop.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
    {
        public void run()
        {
            doSynch = false;
        }
    }));

    // Repeat the clock synching until the user kills the program.
    while (doSynch)
    {
        try
        {
            // Perform a clock synch.
            clockSyncher.synch();

            // Print out the clock delta and estimate of the error.
            System.out.println("Delta = " + clockSyncher.getDelta());
            System.out.println("Epsilon = " + clockSyncher.getEpsilon());

            Thread.sleep(250);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupted status and terminate the loop.
            Thread.currentThread().interrupt();
            doSynch = false;
        }
        catch (ClockSynchFailureException e)
        {
            // Terminate if the reference time service is unavailable, but say so rather than exiting silently.
            System.err.println("Clock synchronization failed: " + e.getMessage());
            doSynch = false;
        }
    }
}
}
}