// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

using System;
using System.Diagnostics;

namespace Org.Apache.Etch.Bindings.Csharp.Util
{
    /// <summary>
    /// Packetizes a stream data source. Reads a packet header,
    /// a 32-bit flag and a 32-bit length, little-endian, verifies
    /// the flag, and then, using the length from the header,
    /// reads the packet data and passes it to the packet handler.
    /// As a packet source, accepts a packet and prepends a packet
    /// header to it before delivering it to a data source.
    /// </summary>
    public class Packetizer : SessionData, TransportPacket
    {
        /// <summary>
        /// URI term to specify max packet size
        /// </summary>
        public const String MAX_PKT_SIZE_TERM = "Packetizer.maxPktSize";

        /// <summary>
        /// Signature value written at the start of every packet header and
        /// verified on every header read.
        /// </summary>
        private const int SIG = unchecked( (int) 0xdeadbeef );

        /// <summary>
        /// Size of the packet header in bytes: two 32-bit ints (signature
        /// and body length).
        /// </summary>
        private const int HEADER_SIZE = 8;

        /// <summary>
        /// The default maximum packet size that will be accepted, 16376 bytes.
        /// </summary>
        public const int DEFAULT_MAX_PKT_SIZE = 16384 - HEADER_SIZE;

        /// <summary>
        /// Constructs the Packetizer with the specified transport
        /// and the packet size, and registers this packetizer as the
        /// session of the transport.
        /// </summary>
        /// <param name="transport">Transport to send data</param>
        /// <param name="maxPktSize">the maximum packet size that will be
        /// accepted. Must not be negative. A value of 0 is stored as-is and
        /// disables the size limit, because the size checks in this class are
        /// only applied when the limit is greater than 0.</param>
        /// <exception cref="ArgumentOutOfRangeException">if maxPktSize is
        /// negative.</exception>
        private Packetizer( TransportData transport, int maxPktSize )
        {
            if ( maxPktSize < 0 )
            {
                // Use the (paramName, actualValue, message) overload: the
                // single-string overload treats its argument as the parameter
                // name, not as the message.
                throw new ArgumentOutOfRangeException(
                    "maxPktSize", maxPktSize, "maxPktSize < 0" );
            }

            this.transport = transport;
            this.maxPktSize = maxPktSize;

            transport.SetSession( this );
        }

        /// <summary>
        /// Constructs the Packetizer with the specified transport, taking the
        /// maximum packet size from the <see cref="MAX_PKT_SIZE_TERM"/> term of
        /// the uri, or <see cref="DEFAULT_MAX_PKT_SIZE"/> when the lookup
        /// falls back to the supplied default.
        /// </summary>
        /// <param name="transport">Transport to send data</param>
        /// <param name="uri">uri whose terms configure this packetizer</param>
        /// <param name="resources">not used by this constructor</param>
        public Packetizer( TransportData transport, URL uri, Resources resources )
            : this( transport, (int) uri.GetIntegerTerm( MAX_PKT_SIZE_TERM, DEFAULT_MAX_PKT_SIZE ) )
        {
            // nothing to do.
        }

        /// <summary>
        /// Constructs the Packetizer with the specified transport, building a
        /// URL from the given uri string and delegating to the URL-based
        /// constructor.
        /// </summary>
        /// <param name="transport">Transport to send data</param>
        /// <param name="uri">uri string whose terms configure this packetizer</param>
        /// <param name="resources">passed through to the URL-based constructor</param>
        public Packetizer( TransportData transport, String uri, Resources resources )
            : this( transport, new URL( uri ), resources )
        {
            // nothing to do.
        }

        private readonly TransportData transport;

        private SessionPacket session;

        private readonly int maxPktSize;

        /// <summary>
        /// Returns a description of this packetizer which includes its transport.
        /// </summary>
        public override string ToString()
        {
            return String.Format( "Packetizer / {0}", transport );
        }

        /// <summary>
        /// True while the next bytes expected from the stream are a packet
        /// header; false while a packet body of bodyLen bytes is expected.
        /// </summary>
        private bool wantHeader = true;

        /// <summary>
        /// Length of the packet body currently being collected.
        /// </summary>
        private int bodyLen;

        /// <summary>
        /// Holds a partial header or partial body between calls to SessionData.
        /// </summary>
        private readonly FlexBuffer savedBuf = new FlexBuffer();

        /// <summary>
        /// Reads a packet header (signature, then packet size) from buf and
        /// validates it.
        /// </summary>
        /// <param name="buf">buffer positioned at the start of a header</param>
        /// <param name="reset">true to reset buf once the two header ints
        /// have been read</param>
        /// <returns>the packet (body) size taken from the header</returns>
        /// <exception cref="Exception">if the signature is wrong, or the
        /// packet size is negative or exceeds a non-zero maxPktSize.</exception>
        private int ProcessHeader( FlexBuffer buf, bool reset )
        {
            int sig = buf.GetInt();
            if ( sig != SIG )
            {
                throw new Exception( "bad SIG" );
            }

            int pktSize = buf.GetInt();

            if ( reset )
            {
                buf.Reset();
            }

            // maxPktSize == 0 means no upper limit is enforced.
            if ( pktSize < 0 || (maxPktSize > 0 && pktSize > maxPktSize) )
            {
                throw new Exception( "pktSize < 0 || (maxPktSize > 0 && pktSize > maxPktSize)" );
            }

            return pktSize;
        }

        /// <summary>
        /// Returns the size in bytes of the packet header.
        /// </summary>
        public int HeaderSize()
        {
            return HEADER_SIZE;
        }

        /// <summary>
        /// Forwards the query to the session and returns its result.
        /// </summary>
        public Object SessionQuery( Object query )
        {
            return session.SessionQuery( query );
        }

        /// <summary>
        /// Forwards the control and its value to the session.
        /// </summary>
        public void SessionControl( Object control, Object value )
        {
            session.SessionControl( control, value );
        }

        /// <summary>
        /// Forwards the event to the session.
        /// </summary>
        public void SessionNotify( Object eventObj )
        {
            session.SessionNotify( eventObj );
        }

        /// <summary>
        /// Forwards the query to the transport and returns its result.
        /// </summary>
        public Object TransportQuery( Object query )
        {
            return transport.TransportQuery( query );
        }

        /// <summary>
        /// Forwards the control and its value to the transport.
        /// </summary>
        public void TransportControl( Object control, Object value )
        {
            transport.TransportControl( control, value );
        }

        /// <summary>
        /// Forwards the event to the transport.
        /// </summary>
        public void TransportNotify( Object eventObj )
        {
            transport.TransportNotify( eventObj );
        }

        #region TransportPacket Members

        /// <summary>
        /// Sets the session which receives the packets extracted from the stream.
        /// </summary>
        public void SetSession( SessionPacket session )
        {
            this.session = session;
        }

        /// <summary>
        /// Returns the session previously set with SetSession.
        /// </summary>
        public SessionPacket GetSession()
        {
            return this.session;
        }

        /// <summary>
        /// Writes the packet header (signature and body size) over the first
        /// HEADER_SIZE bytes available in buf and hands the buffer to the
        /// transport. The buffer index is restored to the start of the
        /// header before delivery.
        /// </summary>
        /// <param name="recipient">passed through to the transport</param>
        /// <param name="buf">buffer whose available data starts with
        /// HEADER_SIZE bytes reserved for the header, followed by the
        /// packet body</param>
        /// <exception cref="ArgumentException">if fewer than HEADER_SIZE bytes
        /// are available, or the body exceeds a non-zero maxPktSize.</exception>
        public void TransportPacket( Who recipient, FlexBuffer buf )
        {
            // Data-ize the packet

            // assert index is at the start of the header.
            int dataSize = buf.Avail();
            if ( dataSize < HEADER_SIZE )
            {
                throw new ArgumentException( "dataSize < HEADER_SIZE" );
            }

            int pktSize = dataSize - HEADER_SIZE;
            if ( maxPktSize > 0 && pktSize > maxPktSize )
            {
                throw new ArgumentException( "maxPktSize > 0 && pktSize > maxPktSize" );
            }

            // Fill in the header in place, then rewind to where it starts.
            int index = buf.Index();
            buf.PutInt( SIG );
            buf.PutInt( pktSize );
            buf.SetIndex( index );

            transport.TransportData( recipient, buf );
        }

        #endregion

        #region SessionData Members

        /// <summary>
        /// Consumes stream data from buf, alternating between reading a
        /// packet header and reading a packet body of the length given by
        /// that header. Each complete body is delivered to the session;
        /// incomplete headers or bodies are kept in savedBuf until a later
        /// call supplies the rest. Headers announcing an empty body are
        /// skipped without notifying the session.
        /// </summary>
        /// <param name="sender">passed through to the session</param>
        /// <param name="buf">the received data; fully consumed on return</param>
        public void SessionData( Who sender, FlexBuffer buf )
        {
            while ( buf.Avail() > 0 )
            {
                if ( wantHeader )
                {
                    // do we have enough to make a header
                    if ( (savedBuf.Length() + buf.Avail()) >= HEADER_SIZE )
                    {
                        int pktSize;
                        if ( savedBuf.Length() == 0 )
                        {
                            // savedBuf is empty, entire header in buf.
                            pktSize = ProcessHeader( buf, false );
                        }
                        else // header split across savedBuf and buf
                        {
                            // move just enough data from buf to savedBuf to have a header.
                            int needFromBuf = HEADER_SIZE - savedBuf.Length();
                            savedBuf.Put( buf, needFromBuf );
                            savedBuf.SetIndex( 0 );

                            pktSize = ProcessHeader( savedBuf, true );
                        }

                        // an empty packet has no body to wait for; look for
                        // the next header.
                        if ( pktSize == 0 )
                        {
                            continue;
                        }

                        bodyLen = pktSize;
                        wantHeader = false;
                    }
                    else // want header but not enough data to make it
                    {
                        // save buf in savedBuf.
                        savedBuf.SetIndex( savedBuf.Length() );
                        savedBuf.Put( buf );
                    }
                }
                else if ( (savedBuf.Length() + buf.Avail()) >= bodyLen )
                {
                    // want body, and there's enough to make it.

                    // three possible cases: the body is entirely in savedBuf,
                    // the body is split, or the body is entirely in buf. assert
                    // that the body cannot entirely be in savedBuf, or else
                    // we'd have processed it last time.
                    Debug.Assert( savedBuf.Length() < bodyLen );

                    if ( savedBuf.Length() == 0 )
                    {
                        // savedBuf is empty, entire body in buf.
                        int length = buf.Length();
                        int index = buf.Index();

                        // temporarily truncate buf so the session sees
                        // exactly one packet body.
                        buf.SetLength( index + bodyLen );

                        session.SessionPacket( sender, buf );

                        // restore the real length and step past the body.
                        buf.SetLength( length );
                        buf.SetIndex( index + bodyLen );

                        wantHeader = true;
                    }
                    else // body split across savedBuf and buf
                    {
                        // move just enough data from buf to savedBuf to have a body.
                        int needFromBuf = bodyLen - savedBuf.Length();
                        savedBuf.Put( buf, needFromBuf );
                        savedBuf.SetIndex( 0 );

                        session.SessionPacket( sender, savedBuf );

                        savedBuf.Reset();
                        wantHeader = true;
                    }
                }
                else // want body, but there's not enough to make it.
                {
                    // save buf in savedBuf.
                    savedBuf.Put( buf );
                }
            }

            // buf is now empty, and there's nothing else to do.
            Debug.Assert( buf.Avail() == 0 );
        }

        #endregion
    }
}