using System; using ZMQ; using System.IO; using Thrift.Transport; namespace ZmqClient { public class TZmqClient : TTransport { Socket _sock; String _endpoint; MemoryStream _wbuf = new MemoryStream (); MemoryStream _rbuf = new MemoryStream (); void debug (string msg) { //Uncomment to enable debug // Console.WriteLine (msg); } public TZmqClient (Context ctx, String endpoint, SocketType sockType) { _sock = ctx.Socket (sockType); _endpoint = endpoint; } public override void Open () { _sock.Connect (_endpoint); } public override void Close () { throw new NotImplementedException (); } public override bool IsOpen { get { throw new NotImplementedException (); } } public override int Read (byte[] buf, int off, int len) { debug ("Client_Read"); if (off != 0 || len != buf.Length) throw new NotImplementedException (); if (_rbuf.Length == 0) { //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift reponse debug ("Client_Read Filling buffer.."); byte[] tmpBuf = _sock.Recv (); debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length)); _rbuf.Write (tmpBuf, 0, tmpBuf.Length); _rbuf.Position = 0; //For reading } int ret = _rbuf.Read (buf, 0, len); if (_rbuf.Length == _rbuf.Position) //Finished reading _rbuf.SetLength (0); debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position)); return ret; } public override void Write (byte[] buf, int off, int len) { debug ("Client_Write"); _wbuf.Write (buf, off, len); } public override void Flush () { debug ("Client_Flush"); _sock.Send (_wbuf.GetBuffer ()); _wbuf = new MemoryStream (); } } }