// $Id$
//
// Copyright 2007-2008 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

using System;
using Etch.Msg;
using Etch.Util;

namespace Etch.Transport
{
    /// <summary>
    /// Sits between a packet transport and a message session.
    /// Incoming packets (see <see cref="SessionPacket(Who, FlexBuffer)"/>) are read
    /// as messages through a tagged data input and handed to the session.
    /// Outgoing messages (see <see cref="TransportMessage(Who, Message)"/>) are
    /// written into a buffer through a tagged data output and handed to the
    /// transport as a packet.
    /// </summary>
    public class Messagizer : TransportMessage, SessionPacket
    {
        /// <summary>
        /// Term name used to look up the format in the uri.
        /// The value is "Messagizer.format".
        /// </summary>
        public const string FORMAT = "Messagizer.format";

        // Scratch buffer for outgoing messages; also the lock object that
        // serializes calls to TransportMessage.
        private readonly FlexBuffer msgBuf = new FlexBuffer();

        private readonly TaggedDataInput tdi;

        private readonly TaggedDataOutput tdo;

        private TransportPacket transport;

        private SessionMessage session;

        /// <summary>
        /// Constructs the Messagizer as a session of transport, wrapping the
        /// uri string in a <c>URL</c> and delegating to the other constructor.
        /// </summary>
        /// <param name="transport">used to deliver packets to the peer.</param>
        /// <param name="uri">the uri being used to configure the transport stack.</param>
        /// <param name="resources">the associated set of resources for this service.</param>
        public Messagizer(TransportPacket transport, string uri, Resources resources)
            : this(transport, new URL(uri), resources)
        {
        }

        /// <summary>
        /// Constructs the Messagizer as a session of transport using uri and resources.
        /// </summary>
        /// <param name="transport">used to deliver packets to the peer.</param>
        /// <param name="uri">the uri being used to configure the transport stack.</param>
        /// <param name="resources">the associated set of resources for this service.</param>
        /// <exception cref="ArgumentException">
        /// no format factory is found for the format named by the uri, or
        /// resources holds no value factory under <c>TransportConsts.VALUE_FACTORY</c>.
        /// </exception>
        public Messagizer(TransportPacket transport, URL uri, Resources resources)
        {
            this.transport = transport;

            // Look up the format name in the uri; FormatFactory.BINARY is
            // passed as the second argument (presumably the fallback value).
            string formatName = uri.GetTerm(FORMAT, FormatFactory.BINARY);

            // Resolve the factory registered for that format name.
            FormatFactory formatFactory = FormatFactory.Get(formatName);
            if (formatFactory == null)
            {
                throw new ArgumentException(String.Format(
                    "format factory is not defined as '{0}' in format factories",
                    formatName));
            }

            // The value factory must have been placed in resources by the caller.
            ValueFactory valueFactory =
                (ValueFactory) resources.Get(TransportConsts.VALUE_FACTORY);
            if (valueFactory == null)
            {
                throw new ArgumentException(String.Format(
                    "value factory is not defined as '{0}' in resources",
                    TransportConsts.VALUE_FACTORY));
            }

            tdi = formatFactory.NewTaggedDataInput(valueFactory, uri.ToString());
            tdo = formatFactory.NewTaggedDataOutput(valueFactory, uri.ToString());

            // Register with the transport only once fully constructed.
            transport.SetSession(this);
        }

        /// <summary>
        /// Describes this Messagizer together with its transport.
        /// </summary>
        /// <returns>"Messagizer : " followed by the formatted transport.</returns>
        public override string ToString()
        {
            return String.Format("Messagizer : {0}", transport);
        }

        /// <summary>
        /// Gets the transport given to the constructor.
        /// </summary>
        /// <returns>the transport used to deliver packets.</returns>
        public TransportPacket GetTransport()
        {
            return transport;
        }

        /// <summary>
        /// Forwards a query to the session.
        /// </summary>
        /// <param name="query">the query to forward.</param>
        /// <returns>whatever the session returns for the query.</returns>
        public object SessionQuery(object query)
        {
            return session.SessionQuery(query);
        }

        /// <summary>
        /// Forwards a control request to the session.
        /// </summary>
        /// <param name="control">the control to forward.</param>
        /// <param name="value">the value accompanying the control.</param>
        public void SessionControl(object control, object value)
        {
            session.SessionControl(control, value);
        }

        /// <summary>
        /// Forwards an event notification to the session.
        /// </summary>
        /// <param name="eventObj">the event to forward.</param>
        public void SessionNotify(object eventObj)
        {
            session.SessionNotify(eventObj);
        }

        /// <summary>
        /// Forwards a query to the transport.
        /// </summary>
        /// <param name="query">the query to forward.</param>
        /// <returns>whatever the transport returns for the query.</returns>
        public object TransportQuery(object query)
        {
            return transport.TransportQuery(query);
        }

        /// <summary>
        /// Forwards a control request to the transport.
        /// </summary>
        /// <param name="control">the control to forward.</param>
        /// <param name="value">the value accompanying the control.</param>
        public void TransportControl(object control, object value)
        {
            transport.TransportControl(control, value);
        }

        /// <summary>
        /// Forwards an event notification to the transport.
        /// </summary>
        /// <param name="eventObj">the event to forward.</param>
        public void TransportNotify(object eventObj)
        {
            transport.TransportNotify(eventObj);
        }

        /// <summary>
        /// Sets the session that receives messages and notifications.
        /// </summary>
        /// <param name="session">the session to deliver to.</param>
        public void SetSession(SessionMessage session)
        {
            this.session = session;
        }

        /// <summary>
        /// Gets the session most recently set with <see cref="SetSession"/>.
        /// </summary>
        /// <returns>the current session.</returns>
        public SessionMessage GetSession()
        {
            return session;
        }

        /// <summary>
        /// Writes the message into the shared buffer and hands the buffer to
        /// the transport as a packet. Calls are serialized on the buffer, and
        /// the buffer is reset afterwards whether or not delivery succeeded.
        /// </summary>
        /// <param name="recipient">passed through to the transport.</param>
        /// <param name="msg">the message to write and send.</param>
        public void TransportMessage(Who recipient, Message msg)
        {
            lock (msgBuf)
            {
                try
                {
                    // msgBuf is expected to be reset on entry (the finally
                    // block below resets it after every use).

                    // Leave space for the packet header.
                    msgBuf.Skip(transport.HeaderSize(), true);

                    tdo.WriteMessage(msg, msgBuf);

                    // Move the index back to 0 before handing the buffer over.
                    msgBuf.SetIndex(0);
                    transport.TransportPacket(recipient, msgBuf);
                }
                finally
                {
                    msgBuf.Reset();
                }
            }
        }

        /// <summary>
        /// Reads a message from the packet buffer and offers it to the session.
        /// If the session reports the message as not handled, the session is
        /// notified with an <c>UnwantedMessage</c> carrying the sender and message.
        /// </summary>
        /// <param name="sender">passed through to the session.</param>
        /// <param name="buf">the buffer holding the packet to read.</param>
        public void SessionPacket(Who sender, FlexBuffer buf)
        {
            Message received = tdi.ReadMessage(buf);

            if (!session.SessionMessage(sender, received))
            {
                session.SessionNotify(new UnwantedMessage(sender, received));
            }
        }
    }
}