/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.IO;
using System.Messaging;
using System.Text;
using Apache.NMS.Util;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// Kind of NMS message carried by an MSMQ message. The converter below
    /// stores the numeric value in <c>Message.AppSpecific</c> when sending and
    /// reads it back when receiving, so the declaration order must stay stable.
    /// </summary>
    public enum NMSMessageType
    {
        BaseMessage,
        TextMessage,
        BytesMessage,
        ObjectMessage,
        MapMessage,
        StreamMessage
    }

    /// <summary>
    /// Default converter between NMS messages and MSMQ messages.
    /// </summary>
    public class DefaultMessageConverter : IMessageConverter
    {
        // Key under which the NMS correlation id is kept in the meta data map
        // that is marshalled into the MSMQ message's Extension bytes.
        private const string CorrelationIdKey = "NMSCorrelationID";

        /// <summary>
        /// Builds an MSMQ message from an NMS message: body, time to live,
        /// delivery mode, priority, reply-to queue, label and correlation id.
        /// </summary>
        /// <param name="message">The NMS message to convert.</param>
        /// <returns>A new MSMQ message.</returns>
        public virtual Message ToMsmqMessage(IMessage message)
        {
            Message msmqMessage = new Message();
            PrimitiveMap metaData = new PrimitiveMap();

            ConvertMessageBodyToMSMQ(message, msmqMessage);

            // A zero time to live leaves TimeToBeReceived at its default.
            if(message.NMSTimeToLive != TimeSpan.Zero)
            {
                msmqMessage.TimeToBeReceived = message.NMSTimeToLive;
            }

            if(message.NMSCorrelationID != null)
            {
                metaData.SetString(CorrelationIdKey, message.NMSCorrelationID);
            }

            msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
            msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
            msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);

            if(message.NMSType != null)
            {
                msmqMessage.Label = message.NMSType;
            }

            // Store the NMS meta data in the extension area
            msmqMessage.Extension = metaData.Marshal();
            return msmqMessage;
        }

        /// <summary>
        /// Builds an NMS message from a received MSMQ message.
        /// </summary>
        /// <param name="message">The MSMQ message to convert.</param>
        /// <returns>The NMS message, typed according to <c>AppSpecific</c>.</returns>
        public virtual IMessage ToNmsMessage(Message message)
        {
            BaseMessage answer = CreateNmsMessage(message);

            // Get the NMS meta data from the extension area
            PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);

            // The two groups below are deliberately best-effort: an
            // InvalidOperationException abandons the rest of that group only.
            // NOTE(review): presumably MSMQ raises it for properties that were
            // not retrieved with the message (read property filter) - confirm.
            try
            {
                answer.NMSMessageId = message.Id;
                answer.NMSCorrelationID = metaData.GetString(CorrelationIdKey);
                answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
                answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
            }
            catch(InvalidOperationException)
            {
            }

            try
            {
                answer.NMSType = message.Label;
                answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
                answer.NMSTimeToLive = message.TimeToBeReceived;
            }
            catch(InvalidOperationException)
            {
            }

            return answer;
        }

        /// <summary>
        /// Maps an NMS priority onto the MSMQ priority scale. AboveLow folds
        /// into Low, BelowNormal folds into Normal, and any unlisted value is
        /// treated as Normal.
        /// </summary>
        private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
        {
            switch(msgPriority)
            {
            case MsgPriority.Lowest:
                return MessagePriority.Lowest;

            case MsgPriority.VeryLow:
                return MessagePriority.VeryLow;

            case MsgPriority.Low:
            case MsgPriority.AboveLow:
                return MessagePriority.Low;

            case MsgPriority.AboveNormal:
                return MessagePriority.AboveNormal;

            case MsgPriority.High:
                return MessagePriority.High;

            case MsgPriority.VeryHigh:
                return MessagePriority.VeryHigh;

            case MsgPriority.Highest:
                return MessagePriority.Highest;

            case MsgPriority.BelowNormal:
            case MsgPriority.Normal:
            default:
                return MessagePriority.Normal;
            }
        }

        /// <summary>
        /// Writes the body of <paramref name="message"/> into
        /// <paramref name="answer"/> and tags <c>answer.AppSpecific</c> with the
        /// matching <see cref="NMSMessageType"/>.
        /// </summary>
        /// <param name="message">The NMS message supplying the body.</param>
        /// <param name="answer">The MSMQ message being populated.</param>
        /// <exception cref="NotSupportedException">
        /// The message is not one of the handled NMS message classes, or it is
        /// a MapMessage whose body is not a <c>PrimitiveMap</c>.
        /// </exception>
        protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
        {
            // The order of the checks mirrors the original if/else chain;
            // BaseMessage is tested last as the catch-all.
            TextMessage textMessage = message as TextMessage;
            if(null != textMessage)
            {
                // A null Text is sent as an empty body instead of failing in
                // Encoding.GetBytes; the receiving side turns an empty body
                // into String.Empty.
                string text = textMessage.Text;
                if(null != text)
                {
                    byte[] textBytes = Encoding.UTF32.GetBytes(text);
                    answer.BodyStream.Write(textBytes, 0, textBytes.Length);
                }

                answer.AppSpecific = (int) NMSMessageType.TextMessage;
                return;
            }

            BytesMessage bytesMessage = message as BytesMessage;
            if(null != bytesMessage)
            {
                // Null content is sent as an empty body.
                byte[] content = bytesMessage.Content;
                if(null != content)
                {
                    answer.BodyStream.Write(content, 0, content.Length);
                }

                answer.AppSpecific = (int) NMSMessageType.BytesMessage;
                return;
            }

            ObjectMessage objectMessage = message as ObjectMessage;
            if(null != objectMessage)
            {
                answer.Body = objectMessage.Body;
                answer.AppSpecific = (int) NMSMessageType.ObjectMessage;
                return;
            }

            MapMessage mapMessage = message as MapMessage;
            if(null != mapMessage)
            {
                PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
                if(null == mapBody)
                {
                    // Previously surfaced as a bare NullReferenceException.
                    throw new NotSupportedException("MapMessage body must be a non-null PrimitiveMap");
                }

                byte[] mapBytes = mapBody.Marshal();
                answer.BodyStream.Write(mapBytes, 0, mapBytes.Length);
                answer.AppSpecific = (int) NMSMessageType.MapMessage;
                return;
            }

            if(message is StreamMessage)
            {
                answer.AppSpecific = (int) NMSMessageType.StreamMessage;
                // TODO: Implement
                return;
            }

            if(message is BaseMessage)
            {
                answer.AppSpecific = (int) NMSMessageType.BaseMessage;
                return;
            }

            throw new NotSupportedException("unhandled message type");
        }

        /// <summary>
        /// Creates the NMS message object selected by <c>message.AppSpecific</c>
        /// and fills in its body. Unrecognised values yield a plain
        /// <c>BaseMessage</c>.
        /// </summary>
        /// <param name="message">The received MSMQ message.</param>
        /// <returns>A new NMS message with its body populated.</returns>
        protected virtual BaseMessage CreateNmsMessage(Message message)
        {
            int messageType = message.AppSpecific;

            if((int) NMSMessageType.TextMessage == messageType)
            {
                byte[] textBytes = ReadBodyBytes(message);
                TextMessage textMessage = new TextMessage();
                textMessage.Text = (null == textBytes) ? String.Empty : Encoding.UTF32.GetString(textBytes);
                return textMessage;
            }

            if((int) NMSMessageType.BytesMessage == messageType)
            {
                BytesMessage bytesMessage = new BytesMessage();
                // Stays null when the MSMQ message carries no body.
                bytesMessage.Content = ReadBodyBytes(message);
                return bytesMessage;
            }

            if((int) NMSMessageType.ObjectMessage == messageType)
            {
                ObjectMessage objectMessage = new ObjectMessage();
                objectMessage.Body = message.Body;
                return objectMessage;
            }

            if((int) NMSMessageType.MapMessage == messageType)
            {
                MapMessage mapMessage = new MapMessage();
                // As before, null is handed to Unmarshal when there is no body.
                mapMessage.Body = PrimitiveMap.Unmarshal(ReadBodyBytes(message));
                return mapMessage;
            }

            if((int) NMSMessageType.StreamMessage == messageType)
            {
                // TODO: Implement
                return new StreamMessage();
            }

            return new BaseMessage();
        }

        /// <summary>
        /// Reads the remaining content of the message's body stream.
        /// </summary>
        /// <param name="message">The MSMQ message whose body is read.</param>
        /// <returns>
        /// The bytes read, or null when the stream is null or has zero length.
        /// </returns>
        private static byte[] ReadBodyBytes(Message message)
        {
            Stream body = message.BodyStream;
            if(null == body || body.Length <= 0)
            {
                return null;
            }

            byte[] buffer = new byte[body.Length];
            int total = 0;

            // Stream.Read may return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream is exhausted.
            while(total < buffer.Length)
            {
                int read = body.Read(buffer, total, buffer.Length - total);
                if(read <= 0)
                {
                    break;
                }

                total += read;
            }

            // Drop the unread tail so zero padding is never decoded as content.
            if(total < buffer.Length)
            {
                Array.Resize(ref buffer, total);
            }

            return buffer;
        }

        /// <summary>
        /// Opens the MSMQ queue addressed by an NMS destination.
        /// </summary>
        /// <param name="destination">The NMS destination; may be null.</param>
        /// <returns>
        /// A <c>MessageQueue</c> for the destination's path, or null when
        /// <paramref name="destination"/> is null.
        /// </returns>
        /// <exception cref="ArgumentException">
        /// The destination is not an MSMQ <c>Destination</c>.
        /// </exception>
        public MessageQueue ToMsmqDestination(IDestination destination)
        {
            if(null == destination)
            {
                return null;
            }

            Destination msmqDestination = destination as Destination;
            if(null == msmqDestination)
            {
                // Previously surfaced as a bare NullReferenceException.
                throw new ArgumentException("Destination is not an MSMQ destination", "destination");
            }

            return new MessageQueue(msmqDestination.Path);
        }

        /// <summary>
        /// Wraps an MSMQ queue in an NMS <c>Queue</c> built from its path.
        /// </summary>
        /// <param name="destinationQueue">The MSMQ queue; may be null.</param>
        /// <returns>The NMS destination, or null when the queue is null.</returns>
        protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
        {
            if(null == destinationQueue)
            {
                return null;
            }

            return new Queue(destinationQueue.Path);
        }
    }
}