// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using System;
using System.Diagnostics;
namespace Org.Apache.Etch.Bindings.Csharp.Util
{
///
/// Packetizes a stream data source. Reads a packet header,
/// a 32-bit flag and a 32-bit length, little-endian, verifies
/// the flag, and then, using the length from the header,
/// reads the packet data and passes it to the packet handler.
/// As a packet source, accepts a packet and prepends a packet
/// header to it before delivering it to a data source.
///
public class Packetizer : SessionData, TransportPacket
{
///
/// Name of the URI term that specifies the max packet size. It is
/// looked up by the URL-based constructor.
///
public const String MAX_PKT_SIZE_TERM = "Packetizer.maxPktSize";
///
/// Signature word that TransportPacket writes as the first int of
/// every packet header and that ProcessHeader checks on receipt.
///
private const int SIG = unchecked( (int) 0xdeadbeef );
///
/// Size of the packet header in bytes: two 32-bit ints, the
/// signature followed by the packet body length.
///
private const int HEADER_SIZE = 8;
///
/// The default maximum packet size that will be accepted, 16376 bytes.
///
public const int DEFAULT_MAX_PKT_SIZE = 16384 - HEADER_SIZE;
///
/// Constructs the Packetizer with the specified transport
/// and maximum packet size, and registers this Packetizer
/// as the session of that transport.
///
/// transport: the transport to send data to.
/// maxPktSize: the maximum packet size that will be accepted.
/// Must be >= 0. A value of 0 turns the size check off, since
/// the limit is only enforced when maxPktSize is greater than 0.
/// Throws ArgumentOutOfRangeException if maxPktSize is negative.
private Packetizer( TransportData transport, int maxPktSize )
{
    if ( maxPktSize < 0 )
    {
        // The single-string ArgumentOutOfRangeException constructor
        // interprets its argument as the parameter name, not as the
        // message, so supply both explicitly.
        throw new ArgumentOutOfRangeException( "maxPktSize", "maxPktSize < 0" );
    }

    this.transport = transport;
    this.maxPktSize = maxPktSize;

    // Hook this Packetizer in as the receiver of the transport's data.
    transport.SetSession( this );
}
///
/// Constructs the Packetizer with the specified transport, taking
/// the maximum packet size from the MAX_PKT_SIZE_TERM term of the
/// uri. DEFAULT_MAX_PKT_SIZE is passed to the lookup as the default
/// value.
///
/// transport: the transport to send data to.
/// uri: URL queried for the MAX_PKT_SIZE_TERM term.
/// resources: not used by this constructor.
public Packetizer( TransportData transport, URL uri, Resources resources )
: this( transport, (int)uri.GetIntegerTerm( MAX_PKT_SIZE_TERM, DEFAULT_MAX_PKT_SIZE ) )
{
// nothing to do.
}
///
/// Convenience constructor that wraps the uri string in a URL and
/// delegates to the URL-based constructor.
///
/// transport: the transport to send data to.
/// uri: string form of the URL to construct.
/// resources: passed through to the URL-based constructor.
public Packetizer(TransportData transport, String uri, Resources resources)
: this(transport, new URL(uri), resources)
{
// nothing to do.
}
// Transport that framed packets are handed to; set once in the constructor.
private readonly TransportData transport;
// Session that receives reassembled packets; assigned by SetSession.
private SessionPacket session;
// Largest packet body size accepted; 0 means the size check is skipped.
private readonly int maxPktSize;
///
/// Describes this Packetizer as "Packetizer / " followed by the
/// formatted transport it sits on top of.
///
public override string ToString()
{
    const string pattern = "Packetizer / {0}";
    string description = String.Format( pattern, transport );
    return description;
}
// True while SessionData is waiting for a packet header, false while
// it is waiting for the body announced by the last header.
private bool wantHeader = true;
// Body length in bytes taken from the most recently accepted header.
private int bodyLen;
// Holds header or body bytes that arrived split across SessionData calls.
private readonly FlexBuffer savedBuf = new FlexBuffer();
///
/// Reads a packet header (signature int, then length int) from buf
/// at its current index and validates it.
///
/// buf: buffer positioned at the start of a header.
/// reset: if true, buf is reset once the header has been consumed,
/// whether the header is accepted or rejected.
/// Returns the packet body size announced by the header.
/// Throws Exception if the signature does not match, or if the size
/// is negative or larger than a non-zero maxPktSize.
private int ProcessHeader( FlexBuffer buf, bool reset )
{
    int sig = buf.GetInt();
    if ( sig != SIG )
    {
        // Clear the buffer on this failure path as well, exactly as the
        // size check below does. Otherwise a rejected header held in
        // savedBuf would be re-parsed (and rejected again) on every
        // subsequent SessionData call.
        if ( reset )
            buf.Reset();
        throw new Exception( "bad SIG" );
    }

    int pktSize = buf.GetInt();

    // The header is fully consumed; release the buffer before validating
    // so it is clean even when the size is rejected.
    if ( reset )
        buf.Reset();

    if ( pktSize < 0 || (maxPktSize > 0 && pktSize > maxPktSize) )
        throw new Exception( "pktSize < 0 || (maxPktSize > 0 && pktSize > maxPktSize)" );

    return pktSize;
}
///
/// Returns the size in bytes of the packet header (HEADER_SIZE) that
/// TransportPacket expects at the front of each outgoing buffer.
///
public int HeaderSize()
{
return HEADER_SIZE;
}
///
/// Forwards the query to the session assigned by SetSession and
/// returns its result unchanged.
///
public Object SessionQuery( Object query )
{
return session.SessionQuery( query );
}
///
/// Forwards the control and its value to the session assigned by
/// SetSession.
///
public void SessionControl( Object control, Object value )
{
session.SessionControl( control, value );
}
///
/// Forwards the event to the session assigned by SetSession.
///
public void SessionNotify( Object eventObj )
{
session.SessionNotify( eventObj );
}
///
/// Forwards the query to the underlying transport and returns its
/// result unchanged.
///
public Object TransportQuery( Object query )
{
return transport.TransportQuery( query );
}
///
/// Forwards the control and its value to the underlying transport.
///
public void TransportControl( Object control, Object value )
{
transport.TransportControl( control, value );
}
///
/// Forwards the event to the underlying transport.
///
public void TransportNotify( Object eventObj )
{
transport.TransportNotify( eventObj );
}
#region TransportPacket Members
///
/// Sets the session that receives reassembled packets and the
/// forwarded session queries, controls and events.
///
public void SetSession(SessionPacket session)
{
this.session = session;
}
///
/// Returns the session most recently assigned by SetSession, or null
/// if none has been assigned.
///
public SessionPacket GetSession()
{
return this.session;
}
///
/// Writes the packet header (signature and body length) at the
/// current index of buf and passes the buffer on to the transport.
///
/// recipient: passed through to the transport unchanged.
/// buf: HEADER_SIZE bytes of header space followed by the packet
/// body, with the index at the start of the header space.
/// Throws ArgumentException if buf has fewer than HEADER_SIZE bytes
/// available, or if the body exceeds a non-zero maxPktSize.
public void TransportPacket(Who recipient, FlexBuffer buf)
{
    // Everything from the index onward is header space plus body.
    int available = buf.Avail();
    if (available < HEADER_SIZE)
    {
        throw new ArgumentException("dataSize < HEADER_SIZE");
    }

    int bodySize = available - HEADER_SIZE;
    if (maxPktSize > 0)
    {
        // A limit is configured; refuse bodies that exceed it.
        if (bodySize > maxPktSize)
        {
            throw new ArgumentException( "maxPktSize > 0 && pktSize > maxPktSize" );
        }
    }

    // Fill in the header in place, then rewind so the transport sees
    // the header followed by the body.
    int headerStart = buf.Index();
    buf.PutInt(SIG);
    buf.PutInt(bodySize);
    buf.SetIndex(headerStart);

    transport.TransportData(recipient, buf);
}
#endregion
#region SessionData Members
///
/// Consumes stream data from buf and delivers each complete packet
/// body to the session. Alternates between waiting for a header and
/// waiting for the body that header announced. Bytes of a header or
/// body that are split across calls are accumulated in savedBuf.
/// Headers announcing a zero-length body are skipped without
/// notifying the session.
///
/// sender: passed through to the session with each packet.
/// buf: the received data; it is fully consumed on normal return.
/// Exceptions from ProcessHeader or from the session propagate to
/// the caller. If the session throws, the packet being delivered is
/// still treated as consumed, so the next data is parsed as a header.
public void SessionData(Who sender, FlexBuffer buf)
{
    while (buf.Avail() > 0)
    {
        if (wantHeader)
        {
            // do we have enough to make a header
            if ((savedBuf.Length() + buf.Avail()) >= HEADER_SIZE)
            {
                int pktSize;
                if (savedBuf.Length() == 0)
                {
                    // savedBuf is empty, entire header in buf.
                    pktSize = ProcessHeader(buf, false);
                }
                else // header split across savedBuf and buf
                {
                    // move just enough data from buf to savedBuf to have a header.
                    int needFromBuf = HEADER_SIZE - savedBuf.Length();
                    savedBuf.Put(buf, needFromBuf);
                    savedBuf.SetIndex(0);
                    pktSize = ProcessHeader(savedBuf, true);
                }

                // an empty packet carries nothing to deliver; look for
                // the next header.
                if (pktSize == 0)
                    continue;

                bodyLen = pktSize;
                wantHeader = false;
            }
            else // want header but not enough space to make it
            {
                // save buf in savedBuf, appending after what is already there.
                savedBuf.SetIndex(savedBuf.Length());
                savedBuf.Put(buf);
            }
        }
        else if ((savedBuf.Length() + buf.Avail()) >= bodyLen)
        {
            // want body, and there's enough to make it.
            // three possible cases: the body is entirely in savedBuf,
            // the body is split, or the body is entirely in buf. assert
            // that the body cannot entirely be in savedBuf, or else
            // we'd have processed it last time.
            Debug.Assert(savedBuf.Length() < bodyLen);

            if (savedBuf.Length() == 0)
            {
                // savedBuf is empty, entire body in buf. Temporarily
                // shorten buf so the session sees exactly one body.
                int length = buf.Length();
                int index = buf.Index();
                buf.SetLength(index + bodyLen);
                try
                {
                    session.SessionPacket(sender, buf);
                }
                finally
                {
                    // Restore buf and step past the body even if the
                    // session threw, so buf is not left truncated and
                    // the framing state is not stuck waiting for a body.
                    buf.SetLength(length);
                    buf.SetIndex(index + bodyLen);
                    wantHeader = true;
                }
            }
            else // body split across savedBuf and buf
            {
                // move just enough data from buf to savedBuf to have a body.
                int needFromBuf = bodyLen - savedBuf.Length();
                savedBuf.Put(buf, needFromBuf);
                savedBuf.SetIndex(0);
                try
                {
                    session.SessionPacket(sender, savedBuf);
                }
                finally
                {
                    // Drop the delivered body even if the session threw;
                    // otherwise the same body would still be in savedBuf
                    // and be delivered again on the next call.
                    savedBuf.Reset();
                    wantHeader = true;
                }
            }
        }
        else // want body, but there's not enough to make it.
        {
            // save buf in savedBuf.
            savedBuf.Put(buf);
        }
    }

    // buf is now empty, and there's nothing else to do.
    Debug.Assert(buf.Avail() == 0);
}
#endregion
}
}