// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using System;
using Org.Apache.Etch.Bindings.Csharp.Msg;
using Org.Apache.Etch.Bindings.Csharp.Support;
using Org.Apache.Etch.Bindings.Csharp.Util;
using NUnit.Framework;
namespace Org.Apache.Etch.Bindings.Csharp.Transport
{
[TestFixture]
public class TestMessagizer
{
/// <summary>First byte of every packet built by the tests (expected to mirror BinaryTaggedData.VERSION).</summary>
private const sbyte VERSION = 3;

/// <summary>Last byte of every packet built by the tests (expected to mirror TypeCode.NONE).</summary>
private const sbyte NONE = -127;

/// <summary>Second byte of every packet built by the tests (expected to mirror TypeCode.BYTE).</summary>
private const sbyte BYTE = -124;

/// <summary>Sender identity handed to the messagizer and compared against what the handler recorded.</summary>
private readonly Who who = new MyWho();

/// <summary>Value factory registered in the resources passed to the messagizer.</summary>
public DefaultValueFactory vf;

/// <summary>Recording session installed on the messagizer via SetSession.</summary>
public MyMessageHandler mh;

/// <summary>Recording transport passed to the messagizer constructor.</summary>
public MyPacketSource ps;

/// <summary>The messagizer under test.</summary>
private Messagizer m;
/// <summary>
/// Creates the fixture. No state is initialised here; the collaborators
/// are built in <see cref="SetUpBeforeClass"/>.
/// </summary>
public TestMessagizer()
{
    // Intentionally empty.
}
/// <summary>
/// One-time fixture setup. Builds the recording transport and session,
/// registers the value factory under TransportConsts.VALUE_FACTORY, and
/// wires a binary-format Messagizer between them. The same instances are
/// then shared by every test in this fixture.
/// </summary>
[TestFixtureSetUp]
public void SetUpBeforeClass()
{
    // Test doubles on either side of the messagizer, plus the value factory.
    mh = new MyMessageHandler();
    ps = new MyPacketSource();
    vf = new MyValueFactory();

    Resources resources = new Resources();
    resources.Add( TransportConsts.VALUE_FACTORY, vf );

    m = new Messagizer( ps, "foo:?Messagizer.format=binary", resources );
    m.SetSession( mh );

    // Announce the fixture on the console.
    Console.WriteLine();
    Console.Write( "TestMessagizer" );
}
/* public void SetUpBeforeEveryTest()
{
ps = new MyPacketSource();
mh = new MyMessageHandler();
m = new Messagizer( mh, vf );
m.SetSource( ps );
} */
#region Test PacketHandler methods
/// <summary>
/// Minimal <c>Who</c> implementation; the tests only use an instance as an
/// identity token to compare the sender passed in with the sender recorded.
/// </summary>
public class MyWho : Who
{
    /// <summary>Creates the token; there is no state to initialise.</summary>
    public MyWho()
    {
        // Intentionally empty.
    }
}
//[Test]
//public void Up()
//{
// SetUpBeforeEveryTest();
// PacketHandler ph = m;
// Assert.IsNull(mh.what);
// Assert.IsNull( mh.src );
// ph.Up( ps );
// Assert.AreEqual( mh.src, m );
// Assert.AreEqual(mh.what, What.TESTMESSAGEHANDLERUP);
// Assert.AreEqual( ps, m.GetPacketSource() );
//}
/// <summary>
/// Feeds the messagizer a packet that is expected to decode to an empty
/// message of type <c>MyValueFactory.mt_add</c> while the handler accepts
/// it (<c>handled = true</c>), and checks that the handler's message
/// callback was the last thing recorded, with the original sender.
/// </summary>
[Test]
public void Packet1()
{
    // The fixture (and therefore mh) is created once and shared by every
    // test, so clear whatever a previous test recorded. Otherwise the
    // assertions below could be satisfied by stale values even if the
    // handler is never invoked.
    mh.what = null;
    mh.xsender = null;
    mh.xmsg = null;
    mh.handled = true;

    SessionPacket ph = m;
    FlexBuffer buf = new FlexBuffer( new byte[] { (byte)VERSION,
        unchecked((byte)BYTE), 1, 0, unchecked((byte)NONE) } );

    ph.SessionPacket( who, buf );

    Assert.AreEqual( What.TESTMESSAGEHANDLERMESSAGE, mh.what );
    Assert.AreEqual( ps, m.GetTransport() );
    Assert.AreEqual( who, mh.xsender );
    Assert.IsNotNull( mh.xmsg );
    Assert.AreEqual( MyValueFactory.mt_add, mh.xmsg.GetXType );
    Assert.AreEqual( 0, mh.xmsg.Count );
}
/// <summary>
/// Feeds the messagizer a packet that is expected to decode to an empty
/// message of type <c>MyValueFactory.mt_add</c> while the handler declines
/// it (<c>handled = false</c>). The test expects the last recorded
/// callback to be the session notification (OOB_NOTIFY_HANDLER), with the
/// sender and message captured by the message callback still in place.
/// </summary>
[Test]
public void Packet2()
{
    // The fixture (and therefore mh) is created once and shared by every
    // test, so clear whatever a previous test recorded. Otherwise the
    // assertions below could be satisfied by stale values even if the
    // handler is never invoked.
    mh.what = null;
    mh.xsender = null;
    mh.xmsg = null;
    mh.handled = false;

    SessionPacket ph = m;
    FlexBuffer buf = new FlexBuffer( new byte[] { (byte)VERSION,
        unchecked((byte)BYTE), 1, 0, unchecked((byte)NONE) } );

    ph.SessionPacket( who, buf );

    Assert.AreEqual( What.OOB_NOTIFY_HANDLER, mh.what );
    Assert.AreEqual( ps, m.GetTransport() );
    Assert.AreEqual( who, mh.xsender );
    Assert.IsNotNull( mh.xmsg );
    Assert.AreEqual( MyValueFactory.mt_add, mh.xmsg.GetXType );
    Assert.AreEqual( 0, mh.xmsg.Count );
}
/// <summary>
/// Feeds the messagizer a packet that is expected to decode to an empty
/// message of type <c>MyValueFactory.mt_add_result</c> while the handler
/// accepts it (<c>handled = true</c>), and checks that the handler's
/// message callback was the last thing recorded, with the original sender.
/// </summary>
[Test]
public void Packet3()
{
    // The fixture (and therefore mh) is created once and shared by every
    // test, so clear whatever a previous test recorded. Otherwise the
    // assertions below could be satisfied by stale values even if the
    // handler is never invoked.
    mh.what = null;
    mh.xsender = null;
    mh.xmsg = null;
    mh.handled = true;

    SessionPacket ph = m;
    FlexBuffer buf = new FlexBuffer( new byte[] { (byte)VERSION,
        unchecked((byte)BYTE), 2, 0, unchecked((byte)NONE) } );

    ph.SessionPacket( who, buf );

    Assert.AreEqual( What.TESTMESSAGEHANDLERMESSAGE, mh.what );
    Assert.AreEqual( ps, m.GetTransport() );
    Assert.AreEqual( who, mh.xsender );
    Assert.IsNotNull( mh.xmsg );
    Assert.AreEqual( MyValueFactory.mt_add_result, mh.xmsg.GetXType );
    Assert.AreEqual( 0, mh.xmsg.Count );
}
/// <summary>
/// Feeds the messagizer a packet that is expected to decode to an empty
/// message of type <c>MyValueFactory.mt_add_result</c> while the handler
/// declines it (<c>handled = false</c>). The test expects the last
/// recorded callback to be the session notification (OOB_NOTIFY_HANDLER),
/// with the sender and message captured by the message callback still in
/// place.
/// </summary>
[Test]
public void Packet4()
{
    // The fixture (and therefore mh) is created once and shared by every
    // test, so clear whatever a previous test recorded. Otherwise the
    // assertions below could be satisfied by stale values even if the
    // handler is never invoked.
    mh.what = null;
    mh.xsender = null;
    mh.xmsg = null;
    mh.handled = false;

    SessionPacket ph = m;
    FlexBuffer buf = new FlexBuffer( new byte[] { (byte)VERSION,
        unchecked((byte)BYTE), 2, 0, unchecked((byte)NONE) } );

    ph.SessionPacket( who, buf );

    Assert.AreEqual( What.OOB_NOTIFY_HANDLER, mh.what );
    Assert.AreEqual( ps, m.GetTransport() );
    Assert.AreEqual( who, mh.xsender );
    Assert.IsNotNull( mh.xmsg );
    Assert.AreEqual( MyValueFactory.mt_add_result, mh.xmsg.GetXType );
    Assert.AreEqual( 0, mh.xmsg.Count );
}
//[Test]
//public void Down()
//{
// SetUpBeforeEveryTest();
// PacketHandler ph = m;
// Assert.IsNull( mh.what );
// Assert.IsNull( mh.src );
// ph.Down(ps);
// Assert.AreEqual( mh.src, m );
// Assert.AreEqual( mh.what, What.TESTMESSAGEHANDLERDOWN );
// Assert.AreEqual( ps, m.GetPacketSource() );
//}
#endregion Test PacketHandler methods
#region MessagizerMessageSource methods
//[Test]
//public void Close1()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// Assert.IsNull( ps.what );
// mms.Close( true );
// Assert.AreEqual(ps.what, What.PACKETSOURCECLOSE);
// Assert.IsTrue( ps.reset );
//}
//[Test]
//public void Close2()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// Assert.IsNull( ps.what );
// mms.Close( false );
// Assert.AreEqual( ps.what, What.PACKETSOURCECLOSE );
// Assert.IsFalse( ps.reset );
//}
//[Test]
//public void LocalAddress()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// EndPoint isa = new IPEndPoint( IPAddress.Parse( "127.0.0.1" ), 23 );
// ps.localAddress = isa;
// Assert.AreEqual( isa, mms.LocalAddress() );
//}
//[Test]
//public void RemoteAddress()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// EndPoint isa = new IPEndPoint( IPAddress.Parse( "127.0.0.1" ), 23 );
// ps.remoteAddress = isa;
// Assert.AreEqual( isa, mms.RemoteAddress() );
//}
//[Test]
//public void ShutdownInput()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// Assert.IsNull( ps.what );
// m.ShutdownInput();
// Assert.AreEqual( ps.what, What.PACKETSOURCESHUTDOWNINPUT );
//}
//[Test]
//public void ShutdownOutput()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// Assert.IsNull( ps.what );
// m.ShutdownOutput();
// Assert.AreEqual( ps.what, What.PACKETSOURCESHUTDOWNOUTPUT );
//}
//[Test]
//public void Stop()
//{
// SetUpBeforeEveryTest();
// MessagizerMessageSource mms = m;
// Assert.IsNull( ps.what );
// mms.Stop();
// Assert.AreEqual(ps.what, What.PACKETSOURCESTOP);
//}
/* public void Message1()
{
MessagizerMessageSource mms = m;
Message msg = new Message( MyValueFactory.mt_add, vf );
ps.headerSize = 0;
Assert.IsNull( ps.what );
mms.Messagex( who, msg );
Assert.AreEqual( What.PACKETSOURCEPACKET, ps.what );
Assert.AreEqual( who, ps.xrecipient );
Assert.IsNotNull( ps.xbuf );
Assert.AreEqual( 4, ps.xbuf.Length );
Assert.AreEqual( VERSION, ps.xbuf[ 0 ] );
Assert.AreEqual((byte)1, ps.xbuf[1]);
Assert.AreEqual((byte)0, ps.xbuf[2]);
Assert.AreEqual( NONE, ps.xbuf[ 3 ] );
}
public void Message2()
{
MessagizerMessageSource mms = m;
Message msg = new Message( MyValueFactory.mt_add, vf );
ps.headerSize = 8;
Assert.IsNull( ps.what );
mms.Messagex( who, msg );
Assert.AreEqual( What.PACKETSOURCEPACKET, ps.what );
Assert.AreEqual( who, ps.xrecipient );
Assert.IsNotNull( ps.xbuf );
Assert.AreEqual( 12, ps.xbuf.Length );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 0 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 1 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 2 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 3 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 4 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 5 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 6 ] );
Assert.AreEqual( ( byte ) 0, ps.xbuf[ 7 ] );
Assert.AreEqual( VERSION, ps.xbuf[ 8 ] );
Assert.AreEqual( ( byte ) 1, ps.xbuf[ 9 ] );
Assert.AreEqual((byte)0, ps.xbuf[10]);
Assert.AreEqual( NONE, ps.xbuf[ 11 ] );
} */
#endregion MessagizerMessageSource methods
// Run the following test as part of stress testing.
/*
[Test]
public void SendreceiveMessageIterative()
{
SetUpBeforeEveryTest();
report = false;
int n = 100000;
//long t0 = Timer.Init();
//Timer t0 = new Timer().Init();
for (int i = 0; i < n; i++) {
MyMessageHandler.Send( m, ps );
Assert.IsTrue(ps.checkFlag);
}
//long t1 = t0.ElapsedNanos();
//double t = (t1-t0) / 1000000000.0;
//Console.WriteLine( "took " + t + " to send " + n + " ( " + n/t + " + per sec)\n" );
}
*/
/// <summary>
/// Identifies which callback a test double recorded most recently.
/// </summary>
public enum What
{
    /// <summary>
    /// Recorded by MyPacketSource.Packet and MyPacketSource.TransportPacket.
    /// As the first member it is also the default value of a <c>What</c> field.
    /// </summary>
    PACKETSOURCEPACKET,

    /// <summary>Recorded by MyMessageHandler.Message and MyMessageHandler.SessionMessage.</summary>
    TESTMESSAGEHANDLERMESSAGE,

    /// <summary>Recorded by MyPacketSource.TransportQuery.</summary>
    OOB_QUERY_SOURCE,

    /// <summary>Recorded by MyPacketSource.TransportControl.</summary>
    OOB_CONTROL_SOURCE,

    /// <summary>Recorded by MyPacketSource.TransportNotify.</summary>
    OOB_NOTIFY_SOURCE,

    /// <summary>Recorded by MyMessageHandler.SessionQuery.</summary>
    OOB_QUERY_HANDLER,

    /// <summary>Recorded by MyMessageHandler.SessionControl.</summary>
    OOB_CONTROL_HANDLER,

    /// <summary>Recorded by MyMessageHandler.SessionNotify.</summary>
    OOB_NOTIFY_HANDLER
}
#region MyPacketSource
/// <summary>
/// Recording <c>TransportPacket</c> test double. Each callback stores its
/// arguments in public fields and tags <see cref="what"/> with the callback
/// that ran, so a test can inspect what reached the transport.
/// </summary>
public class MyPacketSource : TransportPacket
{
    #region Recorded state

    /// <summary>Tag of the callback that ran most recently.</summary>
    public What what;

    /// <summary>Recipient passed to the last Packet / TransportPacket call.</summary>
    public Who xrecipient;

    /// <summary>Bytes returned by GetAvailBytes() on the buffer of the last Packet / TransportPacket call.</summary>
    public byte[] xbuf;

    /// <summary>Argument of the last TransportQuery call.</summary>
    public Object xquery;

    /// <summary>Value handed back by TransportQuery; a test may preset it.</summary>
    public Object xquery_result;

    /// <summary>Control argument of the last TransportControl call.</summary>
    public Object xcontrol;

    /// <summary>Value argument of the last TransportControl call.</summary>
    public Object xvalue;

    /// <summary>Argument of the last TransportNotify call.</summary>
    public Object xevent;

    #endregion

    #region Configuration and miscellaneous fields

    /// <summary>Value reported by <see cref="HeaderSize"/>.</summary>
    public int headerSize = 0;

    /// <summary>Session installed through SetSession and returned by GetSession.</summary>
    public SessionPacket handler;

    // The three fields below are not referenced by the active code in this
    // file; they are kept because they are public.
    public bool checkFlag = true;
    public Messagizer m;
    public bool reset;

    // Stored by the one-argument constructor; never read.
    private TestMessagizer _tm;

    #endregion

    /// <summary>Creates a packet source that is not tied to a fixture.</summary>
    public MyPacketSource()
    {
        // Nothing to initialise.
    }

    /// <summary>Creates a packet source that remembers the given fixture.</summary>
    /// <param name="tm">Fixture to remember.</param>
    public MyPacketSource( TestMessagizer tm )
    {
        _tm = tm;
    }

    /// <summary>Returns the configured <see cref="headerSize"/>.</summary>
    public int HeaderSize()
    {
        return headerSize;
    }

    /// <summary>
    /// Records a packet in exactly the same way as TransportPacket.
    /// </summary>
    /// <param name="recipient">Recipient to record.</param>
    /// <param name="buf">Buffer whose available bytes are recorded.</param>
    public void Packet( Who recipient, FlexBuffer buf )
    {
        this.TransportPacket( recipient, buf );
    }

    #region TransportPacket Members

    /// <summary>
    /// Tags the call as PACKETSOURCEPACKET and records the recipient and
    /// the buffer's available bytes.
    /// </summary>
    /// <param name="recipient">Recipient to record.</param>
    /// <param name="buf">Buffer whose available bytes are recorded.</param>
    public void TransportPacket( Who recipient, FlexBuffer buf )
    {
        what = What.PACKETSOURCEPACKET;
        xrecipient = recipient;
        xbuf = buf.GetAvailBytes();
    }

    #endregion

    #region Transport Members

    /// <summary>Records the query and answers with <see cref="xquery_result"/>.</summary>
    /// <param name="query">Query to record.</param>
    /// <returns>The current value of <see cref="xquery_result"/>.</returns>
    public object TransportQuery( object query )
    {
        what = What.OOB_QUERY_SOURCE;
        xquery = query;
        return xquery_result;
    }

    /// <summary>Records the control and its value.</summary>
    /// <param name="control">Control to record.</param>
    /// <param name="value">Value to record.</param>
    public void TransportControl( object control, object value )
    {
        what = What.OOB_CONTROL_SOURCE;
        xcontrol = control;
        xvalue = value;
    }

    /// <summary>Records the event.</summary>
    /// <param name="eventObj">Event to record.</param>
    public void TransportNotify( object eventObj )
    {
        what = What.OOB_NOTIFY_SOURCE;
        xevent = eventObj;
    }

    /// <summary>Stores the session in <see cref="handler"/>.</summary>
    /// <param name="session">Session to store.</param>
    public void SetSession( SessionPacket session )
    {
        handler = session;
    }

    /// <summary>Returns the session stored by SetSession.</summary>
    /// <returns>The current value of <see cref="handler"/>.</returns>
    public SessionPacket GetSession()
    {
        return handler;
    }

    #endregion
}
#endregion MyPacketSource
#region MyMessageHandler
/// <summary>
/// Recording <c>SessionMessage</c> test double. Each callback stores its
/// arguments in public fields and tags <see cref="what"/> with the callback
/// that ran; the message callbacks answer with the preset
/// <see cref="handled"/> flag.
/// </summary>
public class MyMessageHandler : SessionMessage
{
    /// <summary>Tag (a <c>What</c> value) of the callback that ran most recently.</summary>
    public Enum what;

    /// <summary>Sender passed to the last message callback.</summary>
    public Who xsender;

    /// <summary>Message passed to the last message callback.</summary>
    public Message xmsg;

    /// <summary>Result the message callbacks return; set by the test beforehand.</summary>
    public bool handled;

    /// <summary>Argument of the last SessionQuery call.</summary>
    public Object xquery;

    /// <summary>Value handed back by SessionQuery; a test may preset it.</summary>
    public Object xquery_result;

    /// <summary>Control argument of the last SessionControl call.</summary>
    public Object xcontrol;

    /// <summary>Value argument of the last SessionControl call.</summary>
    public Object xvalue;

    /// <summary>Argument of the last SessionNotify call.</summary>
    public Object xevent;

    /// <summary>
    /// Records a message in exactly the same way as SessionMessage.
    /// </summary>
    /// <param name="sender">Sender to record.</param>
    /// <param name="msg">Message to record.</param>
    /// <returns>The current value of <see cref="handled"/>.</returns>
    public bool Message( Who sender, Message msg )
    {
        return this.SessionMessage( sender, msg );
    }

    #region SessionMessage Members

    /// <summary>
    /// Tags the call as TESTMESSAGEHANDLERMESSAGE and records the sender
    /// and the message.
    /// </summary>
    /// <param name="sender">Sender to record.</param>
    /// <param name="msg">Message to record.</param>
    /// <returns>The current value of <see cref="handled"/>.</returns>
    public bool SessionMessage( Who sender, Message msg )
    {
        what = What.TESTMESSAGEHANDLERMESSAGE;
        xsender = sender;
        xmsg = msg;
        return handled;
    }

    #endregion

    #region Session Members

    /// <summary>Records the query and answers with <see cref="xquery_result"/>.</summary>
    /// <param name="query">Query to record.</param>
    /// <returns>The current value of <see cref="xquery_result"/>.</returns>
    public object SessionQuery( object query )
    {
        what = What.OOB_QUERY_HANDLER;
        xquery = query;
        return xquery_result;
    }

    /// <summary>Records the control and its value.</summary>
    /// <param name="control">Control to record.</param>
    /// <param name="value">Value to record.</param>
    public void SessionControl( object control, object value )
    {
        what = What.OOB_CONTROL_HANDLER;
        xcontrol = control;
        xvalue = value;
    }

    /// <summary>Records the event.</summary>
    /// <param name="eventObj">Event to record.</param>
    public void SessionNotify( object eventObj )
    {
        what = What.OOB_NOTIFY_HANDLER;
        xevent = eventObj;
    }

    #endregion
}
#endregion MyMessageHandler
/// <summary>
/// Value factory for the tests. It declares two message types, "add"
/// (id 1) and "add_result" (id 2), and the fields validated on them.
/// </summary>
public class MyValueFactory : DefaultValueFactory
{
    // These two maps must be declared before the XType fields below:
    // static initialisers run in textual order and mt_add / mt_add_result
    // register themselves in 'types'.
    private static readonly TypeMap types = new TypeMap();
    private static readonly Class2TypeMap class2type = new Class2TypeMap();

    /// <summary>Message type "add" (id 1).</summary>
    public static readonly XType mt_add = types.Add( new XType( 1, "add" ) );

    /// <summary>Message type "add_result" (id 2).</summary>
    public static readonly XType mt_add_result = types.Add( new XType( 2, "add_result" ) );

    /// <summary>Field "x" (id 3).</summary>
    public static readonly Field mf_x = new Field( 3, "x" );

    /// <summary>Field "y" (id 4).</summary>
    public static readonly Field mf_y = new Field( 4, "y" );

    /// <summary>Field "xresult" (id 5).</summary>
    public static readonly Field mf_result = new Field( 5, "xresult" );

    /// <summary>
    /// Initialises the shared maps through DefaultValueFactory.Init and then
    /// attaches validators to both message types.
    /// </summary>
    static MyValueFactory()
    {
        DefaultValueFactory.Init( types, class2type );

        // Validators for "add".
        mt_add.PutValidator( mf_x, Validator_int.Get( 0 ) );
        mt_add.PutValidator( mf_y, Validator_int.Get( 0 ) );
        mt_add.PutValidator( DefaultValueFactory._mf__messageId, Validator_long.Get( 0 ) );

        // Validators for "add_result".
        mt_add_result.PutValidator( mf_result, Validator_int.Get( 0 ) );
        mt_add_result.PutValidator( DefaultValueFactory._mf__messageId, Validator_long.Get( 0 ) );
        mt_add_result.PutValidator( DefaultValueFactory._mf__inReplyTo, Validator_long.Get( 0 ) );
    }

    /// <summary>
    /// Creates the factory with the URI "none:" and the shared type maps.
    /// </summary>
    public MyValueFactory()
        : base( "none:", types, class2type )
    {
        // Nothing further to initialise.
    }
}
}
}