/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Collections;
using System.Text;
using log4net;
using Qpid.Framing;
using Qpid.Messaging;
using Qpid.Buffer;

namespace Qpid.Client.Message
{
    /// <summary>
    /// Base class for client messages. Exposes the values held in the message's
    /// <see cref="BasicContentHeaderProperties"/> through the <see cref="IMessage"/> interface
    /// and holds the buffer containing the message body.
    /// </summary>
    public abstract class AbstractQmsMessage : AMQMessage, IMessage
    {
        private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));

        protected bool _redelivered;

        protected ByteBuffer _data;

        #region new_java_ctrs

        /// <summary>
        /// Creates a message with fresh content header properties and the given body buffer.
        /// </summary>
        /// <param name="data">Body buffer; may be null. A non-null buffer is acquired.</param>
        protected AbstractQmsMessage(ByteBuffer data)
            : base(new BasicContentHeaderProperties())
        {
            _data = data;
            if (_data != null)
            {
                _data.Acquire();
            }
        }

        /// <summary>
        /// Creates a message from an existing content header, delivery tag and body buffer.
        /// </summary>
        /// <param name="deliveryTag">Delivery tag passed on to the base class.</param>
        /// <param name="contentHeader">Content header properties passed on to the base class.</param>
        /// <param name="data">Body buffer; may be null. A non-null buffer is acquired.</param>
        protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
            : this(contentHeader, deliveryTag)
        {
            _data = data;
            if (_data != null)
            {
                _data.Acquire();
            }
        }

        /// <summary>
        /// Creates a message without a body buffer.
        /// </summary>
        /// <param name="contentHeader">Content header properties passed on to the base class.</param>
        /// <param name="deliveryTag">Delivery tag passed on to the base class.</param>
        protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
            : base(contentHeader, deliveryTag)
        {
        }

        #endregion

        /// <summary>
        /// Message id from the content header. When none has been set, the getter stores
        /// and returns "ID:" followed by the delivery tag.
        /// </summary>
        public string MessageId
        {
            get
            {
                if (ContentHeaderProperties.MessageId == null)
                {
                    ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
                }
                return ContentHeaderProperties.MessageId;
            }
            set
            {
                ContentHeaderProperties.MessageId = value;
            }
        }

        /// <summary>
        /// Timestamp from the content header, converted between the header's unsigned
        /// representation and <c>long</c>.
        /// </summary>
        public long Timestamp
        {
            get
            {
                // TODO: look at ulong/long choice
                return (long) ContentHeaderProperties.Timestamp;
            }
            set
            {
                ContentHeaderProperties.Timestamp = (ulong) value;
            }
        }

        /// <summary>
        /// Correlation id as bytes, converted with <see cref="Encoding.Default"/>.
        /// A null correlation id is read and written as null.
        /// </summary>
        public byte[] CorrelationIdAsBytes
        {
            get
            {
                string correlationId = ContentHeaderProperties.CorrelationId;
                // Encoding.GetBytes throws on a null string, so pass null through.
                if (correlationId == null)
                {
                    return null;
                }
                return Encoding.Default.GetBytes(correlationId);
            }
            set
            {
                // Encoding.GetString throws on a null array, so pass null through.
                if (value == null)
                {
                    ContentHeaderProperties.CorrelationId = null;
                }
                else
                {
                    ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
                }
            }
        }

        /// <summary>
        /// Correlation id stored in the content header.
        /// </summary>
        public string CorrelationId
        {
            get
            {
                return ContentHeaderProperties.CorrelationId;
            }
            set
            {
                // Previously this wrote to ContentType, which lost the correlation id
                // and overwrote the content type.
                ContentHeaderProperties.CorrelationId = value;
            }
        }

        /// <summary>
        /// Decoded form of the ReplyTo header, which is stored as "exchangeName:routingKey".
        /// </summary>
        private struct Dest
        {
            public string ExchangeName;
            public string RoutingKey;

            public Dest(string exchangeName, string routingKey)
            {
                ExchangeName = exchangeName;
                RoutingKey = routingKey;
            }
        }

        /// <summary>
        /// Exchange name part of the ReplyTo header. Setting it rewrites the header,
        /// keeping the current routing key.
        /// </summary>
        public string ReplyToExchangeName
        {
            get
            {
                Dest dest = ReadReplyToHeader();
                return dest.ExchangeName;
            }
            set
            {
                Dest dest = ReadReplyToHeader();
                dest.ExchangeName = value;
                WriteReplyToHeader(dest);
            }
        }

        /// <summary>
        /// Routing key part of the ReplyTo header. Setting it rewrites the header,
        /// keeping the current exchange name.
        /// </summary>
        public string ReplyToRoutingKey
        {
            get
            {
                Dest dest = ReadReplyToHeader();
                return dest.RoutingKey;
            }
            set
            {
                Dest dest = ReadReplyToHeader();
                dest.RoutingKey = value;
                WriteReplyToHeader(dest);
            }
        }

        /// <summary>
        /// Decodes the ReplyTo header. Returns a <see cref="Dest"/> with both fields null
        /// when the header is not set.
        /// </summary>
        private Dest ReadReplyToHeader()
        {
            string replyToEncoding = ContentHeaderProperties.ReplyTo;
            if (replyToEncoding == null)
            {
                return new Dest();
            }

            string routingKey;
            string exchangeName = GetExchangeName(replyToEncoding, out routingKey);
            return new Dest(exchangeName, routingKey);
        }

        /// <summary>
        /// Encodes the destination as "exchangeName:routingKey" and stores it in the ReplyTo header.
        /// Null parts are formatted as empty strings.
        /// </summary>
        private void WriteReplyToHeader(Dest dest)
        {
            string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
            ContentHeaderProperties.ReplyTo = encodedDestination;
        }

        /// <summary>
        /// Splits an encoded ReplyTo value at its first ':' into exchange name and routing key.
        /// </summary>
        /// <param name="replyToEncoding">Encoded ReplyTo value; must not be null.</param>
        /// <param name="routingKey">Receives everything after the first ':'.</param>
        /// <returns>
        /// The text before the first ':', or null when the value starts with ':'
        /// (an empty exchange name, i.e. the default exchange).
        /// </returns>
        /// <exception cref="QpidException">The value contains no ':' separator.</exception>
        private static string GetExchangeName(string replyToEncoding, out string routingKey)
        {
            if (_log.IsDebugEnabled)
            {
                _log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding));
            }

            // The previous code called Split(new char[':']). That expression allocates an
            // array of 58 '\0' characters rather than an array holding ':', so the value was
            // never split on the colon and every "exchange:key" value was rejected.
            // Splitting at the first ':' keeps the one case that used to work (a leading
            // colon yields a null exchange name) and makes the general case work too.
            int separatorIndex = replyToEncoding.IndexOf(':');
            if (separatorIndex < 0)
            {
                throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
            }

            routingKey = replyToEncoding.Substring(separatorIndex + 1);
            if (separatorIndex == 0)
            {
                return null;
            }
            return replyToEncoding.Substring(0, separatorIndex);
        }

        /// <summary>
        /// Delivery mode from the content header: header value 1 is non-persistent, 2 is persistent.
        /// </summary>
        /// <exception cref="QpidException">The getter found any other value in the header.</exception>
        public DeliveryMode DeliveryMode
        {
            get
            {
                byte b = ContentHeaderProperties.DeliveryMode;
                switch (b)
                {
                    case 1:
                        return DeliveryMode.NonPersistent;
                    case 2:
                        return DeliveryMode.Persistent;
                    default:
                        throw new QpidException("Illegal value for delivery mode in content header properties");
                }
            }
            set
            {
                ContentHeaderProperties.DeliveryMode = (byte) (value == DeliveryMode.NonPersistent ? 1 : 2);
            }
        }

        /// <summary>
        /// Redelivered flag held on this message object.
        /// </summary>
        public bool Redelivered
        {
            get
            {
                return _redelivered;
            }
            set
            {
                _redelivered = value;
            }
        }

        /// <summary>
        /// Returns <see cref="MimeType"/>. The setter ignores the assigned value because
        /// <see cref="MimeType"/> is read-only.
        /// </summary>
        public string Type
        {
            get
            {
                return MimeType;
            }
            set
            {
                // Intentionally a no-op: MimeType has no setter.
            }
        }

        /// <summary>
        /// Expiration from the content header. The setter narrows the value to <c>uint</c>.
        /// </summary>
        public long Expiration
        {
            get
            {
                return ContentHeaderProperties.Expiration;
            }
            set
            {
                ContentHeaderProperties.Expiration = (uint) value;
            }
        }

        /// <summary>
        /// Priority from the content header. The setter narrows the value to <c>byte</c>.
        /// </summary>
        public int Priority
        {
            get
            {
                return ContentHeaderProperties.Priority;
            }
            set
            {
                ContentHeaderProperties.Priority = (byte) value;
            }
        }

        // FIXME: implement
        /// <summary>
        /// Not implemented; both accessors throw <see cref="NotImplementedException"/>.
        /// </summary>
        public string ContentType
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        // FIXME: implement
        /// <summary>
        /// Not implemented; both accessors throw <see cref="NotImplementedException"/>.
        /// </summary>
        public string ContentEncoding
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        /// <summary>
        /// Acknowledges this message on its channel, if it has one.
        /// </summary>
        public void Acknowledge()
        {
            // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
            // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
            if (_channel != null)
            {
                // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
                // received on the session
                _channel.AcknowledgeMessage((ulong) DeliveryTag, true);
            }
        }

        /// <summary>
        /// Returns a new typed view over this message's header table on every call.
        /// </summary>
        public IHeaders Headers
        {
            get { return new QpidHeaders(this); }
        }

        public abstract void ClearBody();

        /// <summary>
        /// Get a String representation of the body of the message. Used in the
        /// ToString() method which outputs this before message properties.
        /// </summary>
        public abstract string ToBodyString();

        /// <summary>
        /// Return the raw byte array that is used to populate the frame when sending
        /// the message. The getter rewinds a non-null buffer before returning it.
        /// </summary>
        /// <value>a byte array of message data</value>
        public ByteBuffer Data
        {
            get
            {
                // make sure we rewind the data just in case any method has moved the
                // position beyond the start
                if (_data != null)
                {
                    _data.Rewind();
                }
                return _data;
            }
            set
            {
                _data = value;
            }
        }

        public abstract string MimeType { get; }

        /// <summary>
        /// Builds a multi-line description of the body, the standard properties and the headers.
        /// If building it throws, the exception's own string form is returned instead.
        /// </summary>
        public override string ToString()
        {
            try
            {
                StringBuilder buf = new StringBuilder("Body:\n");
                buf.Append(ToBodyString());
                buf.Append("\nQmsTimestamp: ").Append(Timestamp);
                buf.Append("\nQmsExpiration: ").Append(Expiration);
                buf.Append("\nQmsPriority: ").Append(Priority);
                buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
                buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
                buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
                buf.Append("\nAMQ message number: ").Append(DeliveryTag);
                buf.Append("\nProperties:");
                if (ContentHeaderProperties.Headers == null)
                {
                    buf.Append("");
                }
                else
                {
                    buf.Append(Headers.ToString());
                }
                return buf.ToString();
            }
            catch (Exception e)
            {
                // Best effort: describing a message must not throw.
                return e.ToString();
            }
        }

        /// <summary>
        /// Direct access to the header table held in the content header properties.
        /// The setter casts the supplied table to <see cref="FieldTable"/>.
        /// </summary>
        public IFieldTable UnderlyingMessagePropertiesMap
        {
            get
            {
                return ContentHeaderProperties.Headers;
            }
            set
            {
                ContentHeaderProperties.Headers = (FieldTable) value;
            }
        }

        /// <summary>
        /// Copies the header table into a new <see cref="FieldTable"/> in which every value
        /// has been converted with <c>ToString()</c>.
        /// </summary>
        /// <returns>The converted table, or null when the message has no header table.</returns>
        public FieldTable PopulateHeadersFromMessageProperties()
        {
            if (ContentHeaderProperties.Headers == null)
            {
                return null;
            }

            //
            // We need to convert every property into a String representation
            // Note that type information is preserved in the property name
            //
            FieldTable table = new FieldTable();
            foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
            {
                string propertyName = (string) entry.Key;
                if (propertyName == null)
                {
                    continue;
                }
                table[propertyName] = entry.Value.ToString();
            }
            return table;
        }

        /// <summary>
        /// The base class's content header properties, cast to <see cref="BasicContentHeaderProperties"/>.
        /// </summary>
        public BasicContentHeaderProperties ContentHeaderProperties
        {
            get
            {
                return (BasicContentHeaderProperties) _contentHeaderProperties;
            }
        }
    }

    /// <summary>
    /// Typed view over a message's header table. Each property is stored under a key made of
    /// a one-character type prefix followed by the property name.
    /// </summary>
    internal class QpidHeaders : IHeaders
    {
        public const char BOOLEAN_PROPERTY_PREFIX = 'B';
        public const char BYTE_PROPERTY_PREFIX = 'b';
        public const char SHORT_PROPERTY_PREFIX = 's';
        public const char INT_PROPERTY_PREFIX = 'i';
        public const char LONG_PROPERTY_PREFIX = 'l';
        public const char FLOAT_PROPERTY_PREFIX = 'f';
        public const char DOUBLE_PROPERTY_PREFIX = 'd';
        public const char STRING_PROPERTY_PREFIX = 'S';

        // Every prefix a property may be stored under; used by Contains.
        private static readonly char[] TypePrefixes = new char[]
        {
            BOOLEAN_PROPERTY_PREFIX,
            BYTE_PROPERTY_PREFIX,
            SHORT_PROPERTY_PREFIX,
            INT_PROPERTY_PREFIX,
            LONG_PROPERTY_PREFIX,
            FLOAT_PROPERTY_PREFIX,
            DOUBLE_PROPERTY_PREFIX,
            STRING_PROPERTY_PREFIX
        };

        AbstractQmsMessage _message;

        public QpidHeaders(AbstractQmsMessage message)
        {
            _message = message;
        }

        /// <summary>
        /// Reports whether a property with the given name is stored under any type prefix.
        /// </summary>
        /// <exception cref="ArgumentException">The name is null or empty.</exception>
        public bool Contains(string name)
        {
            CheckPropertyName(name);
            if (_message.ContentHeaderProperties.Headers == null)
            {
                return false;
            }

            // Previously only the string prefix was probed, so properties written with
            // SetInt, SetBoolean, etc. were reported as absent.
            foreach (char prefix in TypePrefixes)
            {
                if (_message.ContentHeaderProperties.Headers.Contains(prefix + name))
                {
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Removes every entry from the header table, if the message has one.
        /// </summary>
        public void Clear()
        {
            if (_message.ContentHeaderProperties.Headers != null)
            {
                _message.ContentHeaderProperties.Headers.Clear();
            }
        }

        /// <summary>
        /// Reads or writes a string property; equivalent to GetString / SetString.
        /// </summary>
        public string this[string name]
        {
            get
            {
                return GetString(name);
            }
            set
            {
                SetString(name, value);
            }
        }

        /// <summary>Returns the boolean property, or false when it is not set.</summary>
        public bool GetBoolean(string name)
        {
            object b = GetValue(BOOLEAN_PROPERTY_PREFIX, name);
            if (b == null)
            {
                return false;
            }
            return (bool) b;
        }

        public void SetBoolean(string name, bool b)
        {
            SetValue(BOOLEAN_PROPERTY_PREFIX, name, b);
        }

        /// <summary>Returns the byte property, or 0 when it is not set.</summary>
        public byte GetByte(string propertyName)
        {
            object b = GetValue(BYTE_PROPERTY_PREFIX, propertyName);
            if (b == null)
            {
                return 0;
            }
            return (byte) b;
        }

        public void SetByte(string propertyName, byte b)
        {
            SetValue(BYTE_PROPERTY_PREFIX, propertyName, b);
        }

        /// <summary>Returns the short property, or 0 when it is not set.</summary>
        public short GetShort(string propertyName)
        {
            object s = GetValue(SHORT_PROPERTY_PREFIX, propertyName);
            if (s == null)
            {
                return 0;
            }
            return (short) s;
        }

        public void SetShort(string propertyName, short i)
        {
            SetValue(SHORT_PROPERTY_PREFIX, propertyName, i);
        }

        /// <summary>Returns the int property, or 0 when it is not set.</summary>
        public int GetInt(string propertyName)
        {
            object i = GetValue(INT_PROPERTY_PREFIX, propertyName);
            if (i == null)
            {
                return 0;
            }
            return (int) i;
        }

        public void SetInt(string propertyName, int i)
        {
            SetValue(INT_PROPERTY_PREFIX, propertyName, i);
        }

        /// <summary>Returns the long property, or 0 when it is not set.</summary>
        public long GetLong(string propertyName)
        {
            object l = GetValue(LONG_PROPERTY_PREFIX, propertyName);
            if (l == null)
            {
                // temp - the spec says do this but this throws a NumberFormatException
                //return Long.valueOf(null).longValue();
                return 0;
            }
            return (long) l;
        }

        public void SetLong(string propertyName, long l)
        {
            SetValue(LONG_PROPERTY_PREFIX, propertyName, l);
        }

        /// <summary>Returns the float property, or 0 when it is not set.</summary>
        public float GetFloat(String propertyName)
        {
            object f = GetValue(FLOAT_PROPERTY_PREFIX, propertyName);
            if (f == null)
            {
                return 0;
            }
            return (float) f;
        }

        public void SetFloat(string propertyName, float f)
        {
            SetValue(FLOAT_PROPERTY_PREFIX, propertyName, f);
        }

        /// <summary>Returns the double property, or 0 when it is not set.</summary>
        public double GetDouble(string propertyName)
        {
            object d = GetValue(DOUBLE_PROPERTY_PREFIX, propertyName);
            if (d == null)
            {
                return 0;
            }
            return (double) d;
        }

        public void SetDouble(string propertyName, double v)
        {
            SetValue(DOUBLE_PROPERTY_PREFIX, propertyName, v);
        }

        /// <summary>Returns the string property, or null when it is not set.</summary>
        public string GetString(string propertyName)
        {
            return (string) GetValue(STRING_PROPERTY_PREFIX, propertyName);
        }

        public void SetString(string propertyName, string value)
        {
            SetValue(STRING_PROPERTY_PREFIX, propertyName, value);
        }

        /// <summary>
        /// Validates the name and returns the raw value stored under prefix + name,
        /// or null when the message has no header table.
        /// </summary>
        private object GetValue(char typePrefix, string propertyName)
        {
            CheckPropertyName(propertyName);
            if (_message.ContentHeaderProperties.Headers == null)
            {
                return null;
            }
            return _message.ContentHeaderProperties.Headers[typePrefix + propertyName];
        }

        /// <summary>
        /// Validates the name, makes sure the header table exists and stores the value
        /// under prefix + name.
        /// </summary>
        private void SetValue(char typePrefix, string propertyName, object value)
        {
            CheckPropertyName(propertyName);
            CreatePropertyMapIfRequired();
            _message.ContentHeaderProperties.Headers[typePrefix + propertyName] = value;
        }

        /// <summary>
        /// Rejects null and empty property names. It also creates the header table when the
        /// message has none, so even the read accessors leave an empty table on the message.
        /// </summary>
        /// <exception cref="ArgumentException">The name is null or empty.</exception>
        private void CheckPropertyName(string propertyName)
        {
            if (propertyName == null)
            {
                throw new ArgumentException("Property name must not be null");
            }
            else if ("".Equals(propertyName))
            {
                throw new ArgumentException("Property name must not be the empty string");
            }

            CreatePropertyMapIfRequired();
        }

        /// <summary>
        /// Gives the message an empty <see cref="FieldTable"/> when it has no header table.
        /// </summary>
        private void CreatePropertyMapIfRequired()
        {
            if (_message.ContentHeaderProperties.Headers == null)
            {
                _message.ContentHeaderProperties.Headers = new FieldTable();
            }
        }

        /// <summary>
        /// Formats the headers as "{name : type = value, ...}". Returns "{}" when the message
        /// has no header table.
        /// </summary>
        public override string ToString()
        {
            StringBuilder buf = new StringBuilder("{");
            // Guard against a missing table; enumerating null would throw.
            if (_message.ContentHeaderProperties.Headers != null)
            {
                int i = 0;
                foreach (DictionaryEntry entry in _message.ContentHeaderProperties.Headers)
                {
                    ++i;
                    if (i > 1)
                    {
                        buf.Append(", ");
                    }
                    string propertyName = (string) entry.Key;
                    if (propertyName == null)
                    {
                        buf.Append("\nInternal error: Property with NULL key defined");
                    }
                    else
                    {
                        // The first character of the key is the type prefix; the rest is the name.
                        buf.Append(propertyName.Substring(1));
                        buf.Append(" : ");
                        char typeIdentifier = propertyName[0];
                        buf.Append(typeIdentifierToName(typeIdentifier));
                        buf.Append(" = ").Append(entry.Value);
                    }
                }
            }
            buf.Append("}");
            return buf.ToString();
        }

        /// <summary>
        /// Maps a type prefix character to a readable type name for <see cref="ToString"/>.
        /// </summary>
        private static string typeIdentifierToName(char typeIdentifier)
        {
            switch (typeIdentifier)
            {
                case BOOLEAN_PROPERTY_PREFIX:
                    return "boolean";
                case BYTE_PROPERTY_PREFIX:
                    return "byte";
                case SHORT_PROPERTY_PREFIX:
                    return "short";
                case INT_PROPERTY_PREFIX:
                    return "int";
                case LONG_PROPERTY_PREFIX:
                    return "long";
                case FLOAT_PROPERTY_PREFIX:
                    return "float";
                case DOUBLE_PROPERTY_PREFIX:
                    return "double";
                case STRING_PROPERTY_PREFIX:
                    return "string";
                default:
                    return "unknown ( '" + typeIdentifier + "')";
            }
        }
    }
}