/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using Apache.NMS.Stomp.Commands; using Apache.NMS.Stomp.Transport; using Apache.NMS.Util; using System; using System.IO; using System.Text; namespace Apache.NMS.Stomp.Protocol { /// /// Implements the STOMP protocol. /// public class StompWireFormat : IWireFormat { private Encoding encoder = new UTF8Encoding(); private IPrimitiveMapMarshaler mapMarshaler = new XmlPrimitiveMapMarshaler(); private ITransport transport; public StompWireFormat() { } public ITransport Transport { get { return transport; } set { transport = value; } } public int Version { get { return 1; } } public Encoding Encoder { get { return this.encoder; } set { this.encoder = value; } } public IPrimitiveMapMarshaler MapMarshaler { get { return this.mapMarshaler; } set { this.mapMarshaler = value; } } public void Marshal(Object o, BinaryWriter dataOut) { Tracer.Debug("StompWireFormat - Marshaling: " + o); if(o is ConnectionInfo) { WriteConnectionInfo((ConnectionInfo) o, dataOut); } else if(o is Message) { WriteMessage((Message) o, dataOut); } else if(o is ConsumerInfo) { WriteConsumerInfo((ConsumerInfo) o, dataOut); } else if(o is MessageAck) { WriteMessageAck((MessageAck) o, dataOut); } else if(o is TransactionInfo) { WriteTransactionInfo((TransactionInfo) o, dataOut); } else if(o is ShutdownInfo) { WriteShutdownInfo((ShutdownInfo) o, dataOut); } else if(o is RemoveInfo) { WriteRemoveInfo((RemoveInfo) o, dataOut); } else if(o is KeepAliveInfo) { WriteKeepAliveInfo((KeepAliveInfo) o, dataOut); } else if(o is Command) { Command command = o as Command; if(command.ResponseRequired) { Response response = new Response(); response.CorrelationId = command.CommandId; SendCommand(response); Tracer.Debug("StompWireFormat - Autorespond to command: " + o.GetType()); } } else { Tracer.Debug("StompWireFormat - Ignored command: " + o.GetType()); } } public Object Unmarshal(BinaryReader dataIn) { StompFrame frame = new StompFrame(); frame.FromStream(dataIn); Object answer = CreateCommand(frame); return answer; } protected virtual Object CreateCommand(StompFrame frame) { string command = frame.Command; if(command == "RECEIPT" || command == "CONNECTED") { string text = frame.RemoveProperty("receipt-id"); if(text != null) { Response answer = new Response(); if(text.StartsWith("ignore:")) { text = text.Substring("ignore:".Length); } Tracer.Debug("StompWireFormat - Received RESPONSE command: CorrelationId = " + text); answer.CorrelationId = Int32.Parse(text); return answer; } else if(command == "CONNECTED") { text = frame.RemoveProperty("response-id"); Tracer.Debug("StompWireFormat - Received CONNECTED command: ResponseId = " + text); if(text != null) { Response answer = new Response(); answer.CorrelationId = Int32.Parse(text); return answer; } } } else if(command == "ERROR") { string text = frame.RemoveProperty("receipt-id"); if(text != null && text.StartsWith("ignore:")) { Tracer.Debug("StompWireFormat - Received ERROR Response command: correlationId = " + text); Response answer = new Response(); answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length)); return answer; } else { ExceptionResponse answer = new ExceptionResponse(); if(text != null) { answer.CorrelationId = Int32.Parse(text); } BrokerError error = new BrokerError(); error.Message = frame.RemoveProperty("message"); answer.Exception = error; Tracer.Debug("StompWireFormat - Received ERROR command: " + error.Message); return answer; } } else if(command == "MESSAGE") { Tracer.Debug("StompWireFormat - Received MESSAGE command"); return ReadMessage(frame); } Tracer.Error("Unknown command: " + frame.Command + " headers: " + frame.Properties); return null; } protected virtual Command ReadMessage(StompFrame frame) { Message message = null; string transformation = frame.RemoveProperty("transformation"); if(frame.HasProperty("content-length")) { message = new BytesMessage(); message.Content = frame.Content; } else if(transformation == "jms-map-xml") { message = new MapMessage(this.mapMarshaler.Unmarshal(frame.Content) as PrimitiveMap); } else { message = new TextMessage(encoder.GetString(frame.Content, 0, frame.Content.Length)); } // Remove any receipt header we might have attached if the outbound command was // sent with response required set to true frame.RemoveProperty("receipt"); // Clear any attached content length headers as they aren't needed anymore and can // clutter the Message Properties. frame.RemoveProperty("content-length"); message.Type = frame.RemoveProperty("type"); message.Destination = Destination.ConvertToDestination(frame.RemoveProperty("destination")); message.ReplyTo = Destination.ConvertToDestination(frame.RemoveProperty("reply-to")); message.TargetConsumerId = new ConsumerId(frame.RemoveProperty("subscription")); message.CorrelationId = frame.RemoveProperty("correlation-id"); Tracer.Debug("RECV - Inbound MessageId = " + frame.GetProperty("message-id")); message.MessageId = new MessageId(frame.RemoveProperty("message-id")); message.Persistent = StompHelper.ToBool(frame.RemoveProperty("persistent"), false); // If it came from NMS.Stomp we added this header to ensure its reported on the // receiver side. if(frame.HasProperty("NMSXDeliveryMode")) { message.Persistent = StompHelper.ToBool(frame.RemoveProperty("NMSXDeliveryMode"), false); } if(frame.HasProperty("priority")) { message.Priority = Byte.Parse(frame.RemoveProperty("priority")); } if(frame.HasProperty("timestamp")) { message.Timestamp = Int64.Parse(frame.RemoveProperty("timestamp")); } if(frame.HasProperty("expires")) { message.Expiration = Int64.Parse(frame.RemoveProperty("expires")); } if(frame.RemoveProperty("redelivered") != null) { // We aren't told how many times that the message was redelivered so if it // is tagged as redelivered we always set the counter to one. message.RedeliveryCounter = 1; } // now lets add the generic headers foreach(string key in frame.Properties.Keys) { Object value = frame.Properties[key]; if(value != null) { // lets coerce some standard header extensions if(key == "JMSXGroupSeq" || key == "NMSXGroupSeq") { value = Int32.Parse(value.ToString()); message.Properties["NMSXGroupSeq"] = value; continue; } else if(key == "JMSXGroupID" || key == "NMSXGroupID") { message.Properties["NMSXGroupID"] = value; continue; } } message.Properties[key] = value; } MessageDispatch dispatch = new MessageDispatch(); dispatch.Message = message; dispatch.ConsumerId = message.TargetConsumerId; dispatch.Destination = message.Destination; dispatch.RedeliveryCounter = message.RedeliveryCounter; return dispatch; } protected virtual void WriteMessage(Message command, BinaryWriter dataOut) { StompFrame frame = new StompFrame("SEND"); if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); } frame.SetProperty("destination", Destination.ConvertToStompString(command.Destination)); if(command.ReplyTo != null) { frame.SetProperty("reply-to", Destination.ConvertToStompString(command.ReplyTo)); } if(command.CorrelationId != null ) { frame.SetProperty("correlation-id", command.CorrelationId); } if(command.Expiration != 0) { frame.SetProperty("expires", command.Expiration); } if(command.Timestamp != 0) { frame.SetProperty("timestamp", command.Timestamp); } if(command.Priority != 4) { frame.SetProperty("priority", command.Priority); } if(command.Type != null) { frame.SetProperty("type", command.Type); } if(command.TransactionId!=null) { frame.SetProperty("transaction", command.TransactionId.ToString()); } frame.SetProperty("persistent", command.Persistent.ToString().ToLower()); frame.SetProperty("NMSXDeliveryMode", command.Persistent.ToString().ToLower()); if(command.NMSXGroupID != null) { frame.SetProperty("JMSXGroupID", command.NMSXGroupID); frame.SetProperty("NMSXGroupID", command.NMSXGroupID); frame.SetProperty("JMSXGroupSeq", command.NMSXGroupSeq); frame.SetProperty("NMSXGroupSeq", command.NMSXGroupSeq); } // Perform any Content Marshaling. command.BeforeMarshall(this); // Store the Marshaled Content. frame.Content = command.Content; if(command is BytesMessage) { if(command.Content != null && command.Content.Length > 0) { frame.SetProperty("content-length", command.Content.Length); } frame.SetProperty("transformation", "jms-byte"); } else if(command is MapMessage) { frame.SetProperty("transformation", this.mapMarshaler.Name); } // Marshal all properties to the Frame. IPrimitiveMap map = command.Properties; foreach(string key in map.Keys) { frame.SetProperty(key, map[key]); } frame.ToStream(dataOut); } protected virtual void WriteMessageAck(MessageAck command, BinaryWriter dataOut) { StompFrame frame = new StompFrame("ACK"); if(command.ResponseRequired) { frame.SetProperty("receipt", "ignore:" + command.CommandId); } frame.SetProperty("message-id", command.LastMessageId.ToString()); Tracer.Debug("ACK - Outbound MessageId = " + frame.GetProperty("message-id")); if(command.TransactionId != null) { frame.SetProperty("transaction", command.TransactionId.ToString()); } frame.ToStream(dataOut); } protected virtual void WriteConnectionInfo(ConnectionInfo command, BinaryWriter dataOut) { // lets force a receipt for the Connect Frame. StompFrame frame = new StompFrame("CONNECT"); frame.SetProperty("client-id", command.ClientId); frame.SetProperty("login", command.UserName); frame.SetProperty("passcode", command.Password); frame.SetProperty("request-id", command.CommandId); frame.ToStream(dataOut); } protected virtual void WriteShutdownInfo(ShutdownInfo command, BinaryWriter dataOut) { System.Diagnostics.Debug.Assert(!command.ResponseRequired); new StompFrame("DISCONNECT").ToStream(dataOut); } protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut) { StompFrame frame = new StompFrame("SUBSCRIBE"); if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); } frame.SetProperty("destination", Destination.ConvertToStompString(command.Destination)); frame.SetProperty("id", command.ConsumerId.ToString()); frame.SetProperty("durable-subscriber-name", command.SubscriptionName); frame.SetProperty("selector", command.Selector); frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode)); Tracer.Debug("SUBSCRIBE : Outbound AckMode = " + frame.GetProperty("ack")); if(command.NoLocal) { frame.SetProperty("no-local", command.NoLocal.ToString()); } // ActiveMQ extensions to STOMP if(command.Transformation != null) { frame.SetProperty("transformation", command.Transformation); } else { frame.SetProperty("transformation", "jms-xml"); } frame.SetProperty("activemq.dispatchAsync", command.DispatchAsync); if(command.Exclusive) { frame.SetProperty("activemq.exclusive", command.Exclusive); } if(command.SubscriptionName != null) { frame.SetProperty("activemq.subscriptionName", command.SubscriptionName); // For an older 4.0 broker we need to set this header so they get the // subscription as well.. frame.SetProperty("activemq.subcriptionName", command.SubscriptionName); } frame.SetProperty("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit); frame.SetProperty("activemq.prefetchSize", command.PrefetchSize); frame.SetProperty("activemq.priority", command.Priority); if(command.Retroactive) { frame.SetProperty("activemq.retroactive", command.Retroactive); } frame.ToStream(dataOut); } protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut) { dataOut.Write((byte) '\n' ); } protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut) { StompFrame frame = new StompFrame("UNSUBSCRIBE"); object id = command.ObjectId; if(id is ConsumerId) { ConsumerId consumerId = id as ConsumerId; if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); } frame.SetProperty("id", consumerId.ToString() ); frame.ToStream(dataOut); } } protected virtual void WriteTransactionInfo(TransactionInfo command, BinaryWriter dataOut) { string type = "BEGIN"; TransactionType transactionType = (TransactionType) command.Type; switch(transactionType) { case TransactionType.Commit: command.ResponseRequired = true; type = "COMMIT"; break; case TransactionType.Rollback: command.ResponseRequired = true; type = "ABORT"; break; } Tracer.Debug("StompWireFormat - For transaction type: " + transactionType + " we are using command type: " + type); StompFrame frame = new StompFrame(type); if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); } frame.SetProperty("transaction", command.TransactionId.ToString()); frame.ToStream(dataOut); } protected virtual void SendCommand(Command command) { if(transport == null) { Tracer.Fatal("No transport configured so cannot return command: " + command); } else { transport.Command(transport, command); } } protected virtual string ToString(object value) { if(value != null) { return value.ToString(); } else { return null; } } } }