/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Text;
using log4net;
using Qpid.Framing;
using Qpid.Messaging;
using Qpid.Buffer;
namespace Qpid.Client.Message
{
public abstract class AbstractQmsMessage : AMQMessage, IMessage
{
private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
protected bool _redelivered;
protected ByteBuffer _data;
protected bool _readableMessage = false;
#region new_java_ctrs
protected AbstractQmsMessage(ByteBuffer data)
: base(new BasicContentHeaderProperties())
{
_data = data;
if (_data != null)
{
_data.acquire();
}
_readableMessage = (data != null);
}
protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
: this(contentHeader, deliveryTag)
{
_data = data;
if (_data != null)
{
_data.acquire();
}
_readableMessage = data != null;
}
protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
{
}
#endregion
public string MessageId
{
get
{
if (ContentHeaderProperties.MessageId == null)
{
ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
set
{
ContentHeaderProperties.MessageId = value;
}
}
public long Timestamp
{
get
{
// TODO: look at ulong/long choice
return (long) ContentHeaderProperties.Timestamp;
}
set
{
ContentHeaderProperties.Timestamp = (ulong) value;
}
}
protected void CheckReadable()
{
if (!_readableMessage)
{
throw new MessageNotReadableException("You need to call reset() to make the message readable");
}
}
public byte[] CorrelationIdAsBytes
{
get
{
return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId);
}
set
{
ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
}
}
public string CorrelationId
{
get
{
return ContentHeaderProperties.CorrelationId;
}
set
{
ContentHeaderProperties.ContentType = value;
}
}
struct Dest
{
public string ExchangeName;
public string RoutingKey;
public Dest(string exchangeName, string routingKey)
{
ExchangeName = exchangeName;
RoutingKey = routingKey;
}
}
public string ReplyToExchangeName
{
get
{
Dest dest = ReadReplyToHeader();
return dest.ExchangeName;
}
set
{
Dest dest = ReadReplyToHeader();
dest.ExchangeName = value;
WriteReplyToHeader(dest);
}
}
public string ReplyToRoutingKey
{
get
{
Dest dest = ReadReplyToHeader();
return dest.RoutingKey;
}
set
{
Dest dest = ReadReplyToHeader();
dest.RoutingKey = value;
WriteReplyToHeader(dest);
}
}
private Dest ReadReplyToHeader()
{
string replyToEncoding = ContentHeaderProperties.ReplyTo;
if (replyToEncoding == null)
{
return new Dest();
}
else
{
string routingKey;
string exchangeName = GetExchangeName(replyToEncoding, out routingKey);
return new Dest(exchangeName, routingKey);
}
}
private void WriteReplyToHeader(Dest dest)
{
string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
ContentHeaderProperties.ReplyTo = encodedDestination;
}
private static string GetExchangeName(string replyToEncoding, out string routingKey)
{
string[] split = replyToEncoding.Split(new char[':']);
if (_log.IsDebugEnabled)
{
_log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding));
_log.Debug(string.Format("split = {0}", split));
_log.Debug(string.Format("split.Length = {0}", split.Length));
}
if (split.Length == 1)
{
// Using an alternative split implementation here since it appears that string.Split
// is broken in .NET. It doesn't split when the first character is the delimiter.
// Here we check for the first character being the delimiter. This handles the case
// where ExchangeName is empty (i.e. sends will be to the default exchange).
if (replyToEncoding[0] == ':')
{
split = new string[2];
split[0] = null;
split[1] = replyToEncoding.Substring(1);
if (_log.IsDebugEnabled)
{
_log.Debug("Alternative split method...");
_log.Debug(string.Format("split = {0}", split));
_log.Debug(string.Format("split.Length = {0}", split.Length));
}
}
}
if (split.Length != 2)
{
throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
}
string exchangeName = split[0];
routingKey = split[1];
return exchangeName;
}
public DeliveryMode DeliveryMode
{
get
{
byte b = ContentHeaderProperties.DeliveryMode;
switch (b)
{
case 1:
return DeliveryMode.NonPersistent;
case 2:
return DeliveryMode.Persistent;
default:
throw new QpidException("Illegal value for delivery mode in content header properties");
}
}
set
{
ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
}
}
public bool Redelivered
{
get
{
return _redelivered;
}
set
{
_redelivered = value;
}
}
public string Type
{
get
{
return MimeType;
}
set
{
//MimeType = value;
}
}
public long Expiration
{
get
{
return ContentHeaderProperties.Expiration;
}
set
{
ContentHeaderProperties.Expiration = (uint) value;
}
}
public int Priority
{
get
{
return ContentHeaderProperties.Priority;
}
set
{
ContentHeaderProperties.Priority = (byte) value;
}
}
// FIXME: implement
public string ContentType
{
get { throw new NotImplementedException(); }
set { throw new NotImplementedException(); }
}
// FIXME: implement
public string ContentEncoding
{
get { throw new NotImplementedException(); }
set { throw new NotImplementedException(); }
}
public void Acknowledge()
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
{
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
}
public IHeaders Headers
{
get { return new QpidHeaders(this); }
}
public abstract void ClearBodyImpl();
public void ClearBody()
{
ClearBodyImpl();
_readableMessage = false;
}
///
/// Get a String representation of the body of the message. Used in the
/// toString() method which outputs this before message properties.
///
///
public abstract string ToBodyString();
///
/// Return the raw byte array that is used to populate the frame when sending
/// the message.
///
/// a byte array of message data
public ByteBuffer Data
{
get
{
if (_data != null)
{
if (!_readableMessage)
{
_data.flip();
}
else
{
// Make sure we rewind the data just in case any method has moved the
// position beyond the start.
_data.rewind();
}
}
return _data;
}
set
{
_data = value;
}
}
public abstract string MimeType
{
get;
}
public override string ToString()
{
try
{
StringBuilder buf = new StringBuilder("Body:\n");
buf.Append(ToBodyString());
buf.Append("\nQmsTimestamp: ").Append(Timestamp);
buf.Append("\nQmsExpiration: ").Append(Expiration);
buf.Append("\nQmsPriority: ").Append(Priority);
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
buf.Append("\nAMQ message number: ").Append(DeliveryTag);
buf.Append("\nProperties:");
if (ContentHeaderProperties.Headers == null)
{
buf.Append("");
}
else
{
buf.Append(Headers.ToString());
}
return buf.ToString();
}
catch (Exception e)
{
return e.ToString();
}
}
public IFieldTable UnderlyingMessagePropertiesMap
{
get
{
return ContentHeaderProperties.Headers;
}
set
{
ContentHeaderProperties.Headers = (FieldTable)value;
}
}
public FieldTable PopulateHeadersFromMessageProperties()
{
if (ContentHeaderProperties.Headers == null)
{
return null;
}
else
{
//
// We need to convert every property into a String representation
// Note that type information is preserved in the property name
//
FieldTable table = new FieldTable();
foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
{
string propertyName = (string) entry.Key;
if (propertyName == null)
{
continue;
}
else
{
table[propertyName] = entry.Value.ToString();
}
}
return table;
}
}
public BasicContentHeaderProperties ContentHeaderProperties
{
get
{
return (BasicContentHeaderProperties) _contentHeaderProperties;
}
}
protected virtual void Reset()
{
_readableMessage = true;
}
public bool IsReadable
{
get { return _readableMessage; }
}
public bool isWritable
{
get { return !_readableMessage; }
}
}
}