/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Collections;
using System.Text;
using log4net;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Buffer;

namespace Apache.Qpid.Client.Message
{
    /// <summary>
    /// Base class for client-side messages. Exposes the fields of the underlying
    /// <c>BasicContentHeaderProperties</c> as typed properties and tracks whether the
    /// message body is currently in "readable" or "writable" mode.
    /// </summary>
    public abstract class AbstractQmsMessage : AMQMessage, IMessage
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));

        protected bool _redelivered;

        protected ByteBuffer _data;
        protected bool _readableMessage = false;
        private QpidHeaders _headers;

        /// <summary>
        /// Creates a message around the given body buffer with fresh content header properties.
        /// </summary>
        /// <param name="data">The message body; may be null.</param>
        protected AbstractQmsMessage(ByteBuffer data)
            : base(new BasicContentHeaderProperties())
        {
            Init(data);
        }

        /// <summary>
        /// Creates a message from a delivery tag, existing content header properties and a body buffer.
        /// </summary>
        protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
            : this(contentHeader, deliveryTag)
        {
            // The chained constructor has already run Init(null); this call re-initialises with the real body.
            Init(data);
        }

        /// <summary>
        /// Creates a body-less message from existing content header properties and a delivery tag.
        /// </summary>
        protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
            : base(contentHeader, deliveryTag)
        {
            Init(null);
        }

        /// <summary>
        /// Stores the body buffer (acquiring it when non-null), marks the message readable when a
        /// body was supplied, and makes sure a header table exists before wrapping it in
        /// <c>QpidHeaders</c>.
        /// </summary>
        private void Init(ByteBuffer data)
        {
            _data = data;
            if (_data != null)
            {
                _data.Acquire();
            }
            _readableMessage = (data != null);

            if (ContentHeaderProperties.Headers == null)
            {
                ContentHeaderProperties.Headers = new FieldTable();
            }
            _headers = new QpidHeaders(ContentHeaderProperties.Headers);
        }

        //
        // Properties
        //

        /// <summary>
        /// The application message identifier. When none has been set, reading this property
        /// assigns and returns "ID:" followed by the delivery tag.
        /// </summary>
        public string MessageId
        {
            get
            {
                if (ContentHeaderProperties.MessageId == null)
                {
                    ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
                }
                return ContentHeaderProperties.MessageId;
            }
            set { ContentHeaderProperties.MessageId = value; }
        }

        /// <summary>
        /// The message timestamp
        /// </summary>
        public long Timestamp
        {
            get
            {
                // TODO: look at ulong/long choice
                return (long) ContentHeaderProperties.Timestamp;
            }
            set
            {
                ContentHeaderProperties.Timestamp = (ulong) value;
            }
        }

        /// <summary>
        /// The correlation identifier as a byte array, converted with <c>Encoding.Default</c>.
        /// A null correlation identifier maps to a null array and vice versa, instead of the
        /// <c>ArgumentNullException</c> the encoder would otherwise throw.
        /// </summary>
        public byte[] CorrelationIdAsBytes
        {
            get
            {
                string correlationId = ContentHeaderProperties.CorrelationId;
                if (correlationId == null)
                {
                    return null;
                }
                return Encoding.Default.GetBytes(correlationId);
            }
            set
            {
                if (value == null)
                {
                    ContentHeaderProperties.CorrelationId = null;
                }
                else
                {
                    ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
                }
            }
        }

        /// <summary>
        /// The application correlation identifier
        /// </summary>
        public string CorrelationId
        {
            get { return ContentHeaderProperties.CorrelationId; }
            set { ContentHeaderProperties.CorrelationId = value; }
        }

        // Simple exchange name / routing key pair. Not referenced by any live code in this file
        // (the reply-to helpers below work on BindingURL instead).
        struct Dest
        {
            public string ExchangeName;
            public string RoutingKey;

            public Dest(string exchangeName, string routingKey)
            {
                ExchangeName = exchangeName;
                RoutingKey = routingKey;
            }
        }

        /// <summary>
        /// Exchange name of the reply-to address
        /// </summary>
        public string ReplyToExchangeName
        {
            get
            {
                return ReadReplyToHeader().ExchangeName;
            }
            set
            {
                BindingURL dest = ReadReplyToHeader();
                dest.ExchangeName = value;
                WriteReplyToHeader(dest);
            }
        }

        /// <summary>
        /// Routing key of the reply-to address
        /// </summary>
        public string ReplyToRoutingKey
        {
            get
            {
                return ReadReplyToHeader().RoutingKey;
            }
            set
            {
                BindingURL dest = ReadReplyToHeader();
                dest.RoutingKey = value;
                WriteReplyToHeader(dest);
            }
        }

        /// <summary>
        /// Non-persistent (1) or persistent (2)
        /// </summary>
        /// <exception cref="QpidException">The stored delivery mode byte is neither 1 nor 2.</exception>
        public DeliveryMode DeliveryMode
        {
            get
            {
                byte b = ContentHeaderProperties.DeliveryMode;
                switch (b)
                {
                    case 1:
                        return DeliveryMode.NonPersistent;
                    case 2:
                        return DeliveryMode.Persistent;
                    default:
                        throw new QpidException("Illegal value for delivery mode in content header properties");
                }
            }
            set
            {
                // Anything other than NonPersistent is written as persistent (2).
                ContentHeaderProperties.DeliveryMode = (byte)(value == DeliveryMode.NonPersistent ? 1 : 2);
            }
        }

        /// <summary>
        /// True, if this is a redelivered message
        /// </summary>
        public bool Redelivered
        {
            get { return _redelivered; }
            set { _redelivered = value; }
        }

        /// <summary>
        /// The message type name
        /// </summary>
        public string Type
        {
            get { return ContentHeaderProperties.Type; }
            set { ContentHeaderProperties.Type = value; }
        }

        /// <summary>
        /// Message expiration specification
        /// </summary>
        public long Expiration
        {
            get { return ContentHeaderProperties.Expiration; }
            set { ContentHeaderProperties.Expiration = value; }
        }

        /// <summary>
        /// The message priority, 0 to 9
        /// </summary>
        public byte Priority
        {
            get { return ContentHeaderProperties.Priority; }
            set { ContentHeaderProperties.Priority = (byte) value; }
        }

        /// <summary>
        /// The MIME Content Type
        /// </summary>
        public string ContentType
        {
            get { return ContentHeaderProperties.ContentType; }
            set { ContentHeaderProperties.ContentType = value; }
        }

        /// <summary>
        /// The MIME Content Encoding
        /// </summary>
        public string ContentEncoding
        {
            get { return ContentHeaderProperties.Encoding; }
            set { ContentHeaderProperties.Encoding = value; }
        }

        /// <summary>
        /// Headers of this message
        /// </summary>
        public IHeaders Headers
        {
            get { return _headers; }
        }

        /// <summary>
        /// The creating user id
        /// </summary>
        public string UserId
        {
            get { return ContentHeaderProperties.UserId; }
            set { ContentHeaderProperties.UserId = value; }
        }

        /// <summary>
        /// The creating application id
        /// </summary>
        public string AppId
        {
            get { return ContentHeaderProperties.AppId; }
            set { ContentHeaderProperties.AppId = value; }
        }

        /// <summary>
        /// Intra-cluster routing identifier
        /// </summary>
        public string ClusterId
        {
            get { return ContentHeaderProperties.ClusterId; }
            set { ContentHeaderProperties.ClusterId = value; }
        }

        /// <summary>
        /// Return the raw byte array that is used to populate the frame when sending
        /// the message.
        /// </summary>
        /// <value>a byte array of message data</value>
        /// <remarks>
        /// Reading this property repositions the buffer: it is flipped while the message is
        /// writable and rewound while it is readable.
        /// </remarks>
        public ByteBuffer Data
        {
            get
            {
                if (_data != null)
                {
                    if (!_readableMessage)
                    {
                        _data.Flip();
                    }
                    else
                    {
                        // Make sure we rewind the data just in case any method has moved the
                        // position beyond the start.
                        _data.Rewind();
                    }
                }
                return _data;
            }

            set
            {
                _data = value;
            }
        }

        /// <summary>
        /// Acknowledges this message and all previously received ones.
        /// </summary>
        public void Acknowledge()
        {
            // we set multiple to true here since acknowledgement implies acknowledge of all messages
            // received on the session. That's a bit JMSy though.
            Acknowledge(true);
        }

        /// <summary>
        /// Acknowledges this message through its channel; does nothing when no channel is set.
        /// </summary>
        /// <param name="ackprevious">Passed to the channel as the "multiple" flag.</param>
        public void Acknowledge(bool ackprevious)
        {
            // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
            // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
            if (_channel != null)
            {
                _channel.AcknowledgeMessage((ulong)DeliveryTag, ackprevious);
            }
        }

        /// <summary>
        /// Subclass hook that clears the message body.
        /// </summary>
        public abstract void ClearBodyImpl();

        /// <summary>
        /// Clears the body via <see cref="ClearBodyImpl"/> and switches the message to writable mode.
        /// </summary>
        public void ClearBody()
        {
            ClearBodyImpl();
            _readableMessage = false;
        }

        /// <summary>
        /// Get a String representation of the body of the message. Used in the
        /// ToString() method which outputs this before message properties.
        /// </summary>
        /// <returns>The body rendered as a string.</returns>
        public abstract string ToBodyString();

        /// <summary>
        /// Renders the body followed by the main header fields. Any exception raised while
        /// building the text is caught and its own string form is returned instead.
        /// </summary>
        public override string ToString()
        {
            try
            {
                StringBuilder buf = new StringBuilder("Body:\n");
                buf.Append(ToBodyString());
                buf.Append("\nQmsTimestamp: ").Append(Timestamp);
                buf.Append("\nQmsExpiration: ").Append(Expiration);
                buf.Append("\nQmsPriority: ").Append(Priority);
                buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
                buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
                buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
                buf.Append("\nAMQ message number: ").Append(DeliveryTag);
                buf.Append("\nProperties:");
                if (ContentHeaderProperties.Headers == null)
                {
                    buf.Append("");
                }
                else
                {
                    buf.Append(Headers.ToString());
                }
                return buf.ToString();
            }
            catch (Exception e)
            {
                return e.ToString();
            }
        }

        /// <summary>
        /// Builds a new field table holding the string form of every header value.
        /// </summary>
        /// <returns>The converted table, or null when the message has no header table.</returns>
        public FieldTable PopulateHeadersFromMessageProperties()
        {
            if (ContentHeaderProperties.Headers == null)
            {
                return null;
            }

            //
            // We need to convert every property into a String representation
            // Note that type information is preserved in the property name
            //
            FieldTable table = new FieldTable();
            foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
            {
                string propertyName = (string) entry.Key;
                if (propertyName == null)
                {
                    continue;
                }
                table[propertyName] = entry.Value.ToString();
            }
            return table;
        }

        /// <summary>
        /// The content header properties of this message, typed as <c>BasicContentHeaderProperties</c>.
        /// </summary>
        public BasicContentHeaderProperties ContentHeaderProperties
        {
            get
            {
                return (BasicContentHeaderProperties) _contentHeaderProperties;
            }
        }

        /// <summary>
        /// Switches the message to readable mode.
        /// </summary>
        protected virtual void Reset()
        {
            _readableMessage = true;
        }

        /// <summary>
        /// True while the message is in readable mode.
        /// </summary>
        public bool IsReadable
        {
            get { return _readableMessage; }
        }

        /// <summary>
        /// True while the message is in writable mode (the inverse of <see cref="IsReadable"/>).
        /// </summary>
        public bool isWritable
        {
            get { return !_readableMessage; }
        }

        /// <summary>
        /// Throws unless the message is in readable mode.
        /// </summary>
        /// <exception cref="MessageNotReadableException">The message is not readable.</exception>
        protected void CheckReadable()
        {
            if (!_readableMessage)
            {
                throw new MessageNotReadableException("You need to call reset() to make the message readable");
            }
        }

        /// <summary>
        /// Decodes the reply-to field by parsing it as a <see cref="BindingURL"/>.
        /// </summary>
        /// <returns>The binding URL built from the current reply-to value.</returns>
        /// <remarks>
        /// NOTE(review): a null reply-to value is handed straight to the BindingURL constructor,
        /// which passes it to <c>new Uri(...)</c>; that is expected to throw rather than yield an
        /// empty destination - confirm the intended behaviour for messages without a reply-to.
        /// </remarks>
        private BindingURL ReadReplyToHeader()
        {
            string replyToEncoding = ContentHeaderProperties.ReplyTo;
            return new BindingURL(replyToEncoding);
        }

        /// <summary>
        /// Encodes the destination into the reply-to field as "exchangeName:routingKey".
        /// </summary>
        /// <remarks>
        /// NOTE(review): this format does not look like the binding URL format that
        /// <see cref="ReadReplyToHeader"/> parses, so a value written here may not be readable
        /// again through the reply-to properties - verify against the wire format expected by peers.
        /// </remarks>
        private void WriteReplyToHeader(BindingURL dest)
        {
            string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
            ContentHeaderProperties.ReplyTo = encodedDestination;
        }
    }

    /// <summary>
    /// Parsed form of a binding URL: exchange class, exchange name, destination, queue name and
    /// a set of named options.
    /// </summary>
    public class BindingURL
    {
        public static readonly string OPTION_EXCLUSIVE = "exclusive";
        public static readonly string OPTION_AUTODELETE = "autodelete";
        public static readonly string OPTION_DURABLE = "durable";
        public static readonly string OPTION_CLIENTID = "clientid";
        public static readonly string OPTION_SUBSCRIPTION = "subscription";
        public static readonly string OPTION_ROUTING_KEY = "routingkey";

        /// <summary> Holds the undecoded URL </summary>
        private string url;

        /// <summary> Holds the decoded options. </summary>
        private IDictionary options = new Hashtable();

        /// <summary> Holds the decoded exchange class. </summary>
        private string exchangeClass;

        /// <summary> Holds the decoded exchange name. </summary>
        private string exchangeName;

        /// <summary> Holds the destination name. </summary>
        private string destination;

        /// <summary> Holds the decoded queue name. </summary>
        private string queueName;

        /// <summary>
        /// Parses a binding URL. As read by the parser, the URL has the format:
        /// exch_class://exch_name/[destination]/[queue]?option='value'[&amp;option='value']*
        /// </summary>
        /// <param name="url">The binding URL to parse.</param>
        /// <exception cref="UriFormatException">
        /// The URL is not a valid URI, lacks the destination/queue path, or has a malformed option.
        /// </exception>
        public BindingURL(string url)
        {
            this.url = url;
            Parse();
        }

        /// <summary> The URL this instance was parsed from. </summary>
        public string Url
        {
            get { return url; }
        }

        /// <summary> The exchange class (the URI scheme). </summary>
        public string ExchangeClass
        {
            get { return exchangeClass; }
            set { exchangeClass = value; }
        }

        /// <summary> The exchange name (the URI host). </summary>
        public string ExchangeName
        {
            get { return exchangeName; }
            set { exchangeName = value; }
        }

        /// <summary> The queue name (the path after the destination segment). </summary>
        public string QueueName
        {
            get { return queueName; }
            set { queueName = value; }
        }

        /// <summary> The destination name (the first path segment). </summary>
        public string DestinationName
        {
            get { return destination; }
            set { destination = value; }
        }

        /// <summary> The routing key, stored as the "routingkey" option. </summary>
        public string RoutingKey
        {
            get { return (string)options[OPTION_ROUTING_KEY]; }
            set { options[OPTION_ROUTING_KEY] = value; }
        }

        /// <summary> Tells whether the named option is present. </summary>
        public bool ContainsOption(string key)
        {
            return options.Contains(key);
        }

        /// <summary>
        /// Renders the decoded parts of the URL. Declared as an override so that it is also used
        /// when the instance is formatted through an <c>object</c> reference.
        /// </summary>
        public override string ToString()
        {
            return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName
                + ", QueueName = " + QueueName + ", DestinationName = " + DestinationName
                + ", RoutingKey = " + RoutingKey + " ] ";
        }

        /// <summary>
        /// Splits <c>url</c> into exchange class, exchange name, destination, queue name and options.
        /// </summary>
        private void Parse()
        {
            Uri binding = new Uri(url);

            // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
            // NOTE(review): the Uri constructor is expected to reject scheme-less input before this
            // check runs, so the defaulting branch may be unreachable - confirm.
            string scheme = binding.Scheme;

            if (scheme == null)
            {
                url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
                Parse();

                return;
            }

            this.exchangeClass = scheme;

            // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
            string host = binding.Host;

            if (host == null)
            {
                if (scheme.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
                {
                    this.exchangeName = "";
                }
            }
            else
            {
                this.exchangeName = host;
            }

            // Extract the destination and queue name.
            string path = binding.AbsolutePath;

            if ((path == null) || path.Equals(""))
            {
                throw new UriFormatException("Destination or Queue required");
            }

            int slashOffset = path.IndexOf("/", 1);

            if (slashOffset == -1)
            {
                throw new UriFormatException("Destination required");
            }

            this.destination = path.Substring(1, slashOffset - 1);
            this.queueName = path.Substring(slashOffset + 1);

            // Uri.Query carries the leading '?'; drop it so the first option name is not polluted.
            string query = binding.Query;

            if ((query != null) && (query.Length > 0) && (query[0] == '?'))
            {
                query = query.Substring(1);
            }

            ParseOptions(options, query);

            // If the routing key is not set as an option, set it to the destination name.
            if (!ContainsOption(OPTION_ROUTING_KEY))
            {
                options[OPTION_ROUTING_KEY] = destination;
            }
        }

        /// <summary>
        /// Parses a list of <c>name='value'</c> options into <paramref name="optionMap"/>.
        /// options looks like this
        /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
        /// </summary>
        /// <param name="optionMap">Receives one entry per parsed option (name mapped to its unquoted value).</param>
        /// <param name="options">The option text without a leading '?'; null or text without '=' is ignored.</param>
        /// <exception cref="UriFormatException">An option value is not quoted or its quotes do not balance.</exception>
        public static void ParseOptions(IDictionary optionMap, string options)
        {
            // Check that there really are some options to parse.
            if ((options == null) || (options.IndexOf('=') == -1))
            {
                return;
            }

            int optionIndex = options.IndexOf('=');
            string option = options.Substring(0, optionIndex);
            int length = options.Length;
            int nestedQuotes = 0;

            // Holds the index of the final "'".
            int valueIndex = optionIndex;

            // Walk the text after '=' to find the quote that closes this option's value,
            // tracking nesting so that quoted values containing further quoted options are skipped.
            while ((nestedQuotes > 0) || (valueIndex < length))
            {
                valueIndex++;

                if (valueIndex >= length)
                {
                    break;
                }

                if (options[valueIndex] == '\'')
                {
                    if ((valueIndex + 1) < options.Length)
                    {
                        // A quote followed by a separator (or another quote) closes a level.
                        if ((options[valueIndex + 1] == '&') ||
                            (options[valueIndex + 1] == ',') ||
                            (options[valueIndex + 1] == ';') ||
                            (options[valueIndex + 1] == '\''))
                        {
                            nestedQuotes--;

                            if (nestedQuotes == 0)
                            {
                                // We've found the value of an option
                                break;
                            }
                        }
                        else
                        {
                            nestedQuotes++;
                        }
                    }
                    else
                    {
                        // We are at the end of the string
                        // Check to see if we are correctly closing quotes
                        if (options[valueIndex] == '\'')
                        {
                            nestedQuotes--;
                        }

                        break;
                    }
                }
            }

            // The value must start with a quote right after '=' and every quote must be closed.
            // The index test comes first so that options[optionIndex + 1] is always in range.
            if ((nestedQuotes != 0) ||
                (valueIndex < (optionIndex + 2)) ||
                (options[optionIndex + 1] != '\''))
            {
                throw new UriFormatException("Unterminated or unquoted value for option '" + option + "' in: " + options);
            }

            // optionIndex + 2 skips the "='" in front of the value; valueIndex is the closing quote.
            int valueStart = optionIndex + 2;
            optionMap[option] = options.Substring(valueStart, valueIndex - valueStart);

            // Anything after the closing quote and its separator is parsed as further options.
            if (valueIndex < (length - 1))
            {
                ParseOptions(optionMap, options.Substring(valueIndex + 2));
            }
        }
    }
}