/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.IO;
using System.Text;
using Qpid.Framing;
using Qpid.Messaging;
using Qpid.Buffer;
namespace Qpid.Client.Message
{
public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
{
// Content type written to the content header by the constructors and returned by MimeType.
private const string MIME_TYPE = "application/octet-stream";
/// <summary>
/// The backing store for the data
/// </summary>
private MemoryStream _dataStream;
// Body size in bytes as reported by BodyLength; set when the message enters read mode, zeroed by ClearBody.
private int _bodyLength;
// Non-null only while the message is readable; CheckReadable/CheckWritable use this field as the mode flag.
private BinaryReader _reader;
// Writer over _dataStream; created for write mode (data-less construction, ClearBody) and nulled by Reset.
private BinaryWriter _writer;
/// <summary>
/// Creates an empty bytes message by delegating to the data-taking constructor with no data,
/// which leaves the message in write mode.
/// </summary>
public QpidBytesMessage()
    : this(null)
{
}
/// <summary>
/// Construct a bytes message with existing data.
/// </summary>
/// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
/// write-only mode</param>
private QpidBytesMessage(ByteBuffer data) : base(data)
{
    // superclass constructor has instantiated a content header at this point
    ContentHeaderProperties.ContentType = MIME_TYPE;
    if (data == null)
    {
        // No payload: start with an empty stream and a writer (write mode, _reader stays null).
        _dataStream = new MemoryStream();
        _writer = new BinaryWriter(_dataStream);
    }
    else
    {
        // Materialise the buffer once so the stream content and the recorded body length
        // come from the same array (previously ToByteArray() was invoked twice).
        byte[] body = data.ToByteArray();
        _dataStream = new MemoryStream(body);
        _bodyLength = body.Length;
        _reader = new BinaryReader(_dataStream);
    }
}
/// <summary>
/// Constructs a readable bytes message from a message number, a content header and body data.
/// </summary>
/// <param name="messageNbr">Message number, passed through to the base constructor.</param>
/// <param name="contentHeader">Content header whose Properties are cast to BasicContentHeaderProperties.</param>
/// <param name="data">Body data; must not be null because ToByteArray() is invoked on it unconditionally.</param>
internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
    // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
    : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
{
    ContentHeaderProperties.ContentType = MIME_TYPE;

    // Materialise the buffer once so the stream content and the recorded body length
    // come from the same array (previously ToByteArray() was invoked twice).
    byte[] body = data.ToByteArray();
    _dataStream = new MemoryStream(body);
    _bodyLength = body.Length;
    _reader = new BinaryReader(_dataStream);
}
/// <summary>
/// Discards the current body and switches the message to write mode: any active reader is
/// closed and dropped, a fresh empty stream and writer are installed, and the body length is zeroed.
/// </summary>
public override void ClearBody()
{
    bool wasReadable = _reader != null;
    if (wasReadable)
    {
        _reader.Close();
        _reader = null;
    }

    MemoryStream emptyStream = new MemoryStream();
    _dataStream = emptyStream;
    _bodyLength = 0;
    _writer = new BinaryWriter(emptyStream);
}
/// <summary>
/// Returns the message body decoded as text (see GetText).
/// </summary>
/// <returns>The body as a string, or null when there is no backing stream.</returns>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">An <see cref="IOException"/> occurred while reading the body.</exception>
public override string ToBodyString()
{
    CheckReadable();
    try
    {
        return GetText();
    }
    catch (IOException e)
    {
        // Keep the original failure as the inner exception, consistent with the other accessors in this class.
        throw new QpidException(e.ToString(), e);
    }
}
/// <summary>
/// Decodes the complete contents of the backing stream as UTF-8 text.
/// </summary>
/// <returns>The decoded text, or null when there is no backing stream.</returns>
private string GetText()
{
    if (_dataStream == null)
    {
        return null;
    }

    // ToArray() copies exactly Length bytes (not Capacity) from the start of the stream and
    // ignores Position. The previous Read()-based code started at the current position, so
    // after any earlier read it returned truncated text padded with NUL characters, and it
    // also advanced the position that the typed Read* methods depend on.
    byte[] data = _dataStream.ToArray();
    return Encoding.UTF8.GetString(data);
}
//public override byte[] Data
//{
// get
// {
// if (_dataStream == null)
// {
// return null;
// }
// else
// {
// byte[] data = new byte[_dataStream.Length];
// _dataStream.Position = 0;
// _dataStream.Read(data, 0, (int) _dataStream.Length);
// return data;
// }
// }
// set
// {
// throw new NotSupportedException("Cannot set data payload except during construction");
// }
//}
/// <summary>
/// Gets the MIME type of this message, which is always the MIME_TYPE constant of this class.
/// </summary>
public override string MimeType
{
    get { return MIME_TYPE; }
}
/// <summary>
/// Gets the recorded length of the message body in bytes.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
public long BodyLength
{
    get
    {
        CheckReadable();

        long recordedLength = _bodyLength;
        return recordedLength;
    }
}
/// <summary>
/// Verifies that the message is in read mode, i.e. that a reader is present.
/// </summary>
/// <exception cref="MessageNotReadableException">if the message is in write mode</exception>
private void CheckReadable()
{
    if (_reader != null)
    {
        return;
    }

    throw new MessageNotReadableException("You need to call reset() to make the message readable");
}
/// <summary>
/// Verifies that the message is in write mode, i.e. that no reader is present.
/// </summary>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
private void CheckWritable()
{
    bool inReadMode = _reader != null;
    if (inReadMode)
    {
        throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
    }
}
/// <summary>
/// Reads the next boolean from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public bool ReadBoolean()
{
    CheckReadable();

    bool result;
    try
    {
        result = _reader.ReadBoolean();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next unsigned byte from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public byte ReadByte()
{
    CheckReadable();

    byte result;
    try
    {
        result = _reader.ReadByte();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next signed byte from the message body and returns it widened to a short.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public short ReadSignedByte()
{
    CheckReadable();

    short widened;
    try
    {
        widened = _reader.ReadSByte();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return widened;
}
/// <summary>
/// Reads the next 16-bit signed integer from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public short ReadShort()
{
    CheckReadable();

    short result;
    try
    {
        result = _reader.ReadInt16();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next character from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public char ReadChar()
{
    CheckReadable();

    char result;
    try
    {
        result = _reader.ReadChar();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next 32-bit signed integer from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public int ReadInt()
{
    CheckReadable();

    int result;
    try
    {
        result = _reader.ReadInt32();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next 64-bit signed integer from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public long ReadLong()
{
    CheckReadable();

    long result;
    try
    {
        result = _reader.ReadInt64();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next single-precision floating point value from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public float ReadFloat()
{
    CheckReadable();

    float result;
    try
    {
        result = _reader.ReadSingle();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads the next double-precision floating point value from the message body.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public double ReadDouble()
{
    CheckReadable();

    double result;
    try
    {
        result = _reader.ReadDouble();
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return result;
}
/// <summary>
/// Reads bytes from the message body and decodes them as a UTF-8 string. The reader is asked
/// for as many bytes as the backing stream's total length; no length prefix is consulted.
/// </summary>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public string ReadUTF()
{
    CheckReadable();
    try
    {
        int requested = (int)_dataStream.Length;
        byte[] encoded = _reader.ReadBytes(requested);
        string decoded = Encoding.UTF8.GetString(encoded);
        return decoded;
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }
}
/// <summary>
/// Reads up to <c>bytes.Length</c> bytes from the message body into <paramref name="bytes"/>,
/// starting at index 0.
/// </summary>
/// <param name="bytes">Destination array; must not be null.</param>
/// <returns>The count returned by the underlying reader.</returns>
/// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null (checked before the mode check).</exception>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public int ReadBytes(byte[] bytes)
{
    if (bytes == null)
    {
        throw new ArgumentNullException("bytes");
    }

    CheckReadable();

    int capacity = bytes.Length;
    int bytesRead;
    try
    {
        bytesRead = _reader.Read(bytes, 0, capacity);
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }

    return bytesRead;
}
/// <summary>
/// Reads up to <paramref name="count"/> bytes from the message body into <paramref name="bytes"/>,
/// starting at index 0. A count larger than the array is clamped to the array length.
/// </summary>
/// <param name="bytes">Destination array; must not be null.</param>
/// <param name="count">Maximum number of bytes to read; must be non-negative.</param>
/// <returns>The count returned by the underlying reader.</returns>
/// <exception cref="MessageNotReadableException">The message is in write mode.</exception>
/// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
/// <exception cref="QpidException">The reader raised an <see cref="IOException"/>.</exception>
public int ReadBytes(byte[] bytes, int count)
{
    CheckReadable();
    if (bytes == null)
    {
        throw new ArgumentNullException("bytes");
    }
    if (count < 0)
    {
        // The single-string constructor interprets its argument as the parameter name,
        // so the parameter name and the message must be passed separately.
        throw new ArgumentOutOfRangeException("count", "count must be >= 0");
    }
    if (count > bytes.Length)
    {
        // Never ask the reader for more than the destination can hold.
        count = bytes.Length;
    }
    try
    {
        return _reader.Read(bytes, 0, count);
    }
    catch (IOException e)
    {
        throw new QpidException(e.ToString(), e);
    }
}
/// <summary>
/// Appends a boolean to the message body.
/// </summary>
/// <param name="b">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteBoolean(bool b)
{
    CheckWritable();
    try
    {
        _writer.Write(b);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends an unsigned byte to the message body.
/// </summary>
/// <param name="b">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteByte(byte b)
{
    CheckWritable();
    try
    {
        _writer.Write(b);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a 16-bit signed integer to the message body.
/// </summary>
/// <param name="i">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteShort(short i)
{
    CheckWritable();
    try
    {
        _writer.Write(i);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a character to the message body.
/// </summary>
/// <param name="c">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteChar(char c)
{
    CheckWritable();
    try
    {
        _writer.Write(c);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a single signed byte to the message body.
/// </summary>
/// <param name="value">Value to write; must lie in the signed-byte range -128..127.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> does not fit in a signed byte.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteSignedByte(short value)
{
    CheckWritable();
    if (value < sbyte.MinValue || value > sbyte.MaxValue)
    {
        // Reject rather than silently truncate: a wrapped value would corrupt the payload.
        throw new ArgumentOutOfRangeException("value", "value must fit in a signed byte (-128 to 127)");
    }
    try
    {
        // Emit exactly one byte so the value round-trips through ReadSignedByte, which reads
        // a single sbyte. Passing the short straight through selected the Int16 overload and
        // wrote two bytes.
        _writer.Write((sbyte)value);
    }
    catch (IOException e)
    {
        throw new QpidException(e.ToString(), e);
    }
}
/// <summary>
/// Appends a double-precision floating point value to the message body.
/// </summary>
/// <param name="value">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteDouble(double value)
{
    CheckWritable();
    try
    {
        _writer.Write(value);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a single-precision floating point value to the message body.
/// </summary>
/// <param name="value">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteFloat(float value)
{
    CheckWritable();
    try
    {
        _writer.Write(value);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a 32-bit signed integer to the message body.
/// </summary>
/// <param name="value">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteInt(int value)
{
    CheckWritable();
    try
    {
        _writer.Write(value);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a 64-bit signed integer to the message body.
/// </summary>
/// <param name="value">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteLong(long value)
{
    CheckWritable();
    try
    {
        _writer.Write(value);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a 32-bit signed integer to the message body (same underlying writer call as WriteInt).
/// </summary>
/// <param name="i">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void Write(int i)
{
    CheckWritable();
    try
    {
        _writer.Write(i);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a 64-bit signed integer to the message body (same underlying writer call as WriteLong).
/// </summary>
/// <param name="l">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void Write(long l)
{
    CheckWritable();
    try
    {
        _writer.Write(l);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a single-precision floating point value to the message body (same underlying writer call as WriteFloat).
/// </summary>
/// <param name="v">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void Write(float v)
{
    CheckWritable();
    try
    {
        _writer.Write(v);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a double-precision floating point value to the message body (same underlying writer call as WriteDouble).
/// </summary>
/// <param name="v">Value to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void Write(double v)
{
    CheckWritable();
    try
    {
        _writer.Write(v);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Encodes <paramref name="value"/> as UTF-8 and appends the raw bytes to the message body.
/// The bytes are written as a plain byte array; this method adds no length prefix of its own.
/// </summary>
/// <param name="value">Text to encode and write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteUTF(string value)
{
    CheckWritable();
    try
    {
        Encoding utf8 = Encoding.UTF8;
        byte[] payload = utf8.GetBytes(value);
        _writer.Write(payload);
    }
    catch (IOException ioError)
    {
        throw new QpidException(ioError.ToString(), ioError);
    }
}
/// <summary>
/// Appends the whole of <paramref name="bytes"/> to the message body.
/// </summary>
/// <param name="bytes">Bytes to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteBytes(byte[] bytes)
{
    CheckWritable();
    try
    {
        _writer.Write(bytes);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Appends a region of <paramref name="bytes"/> to the message body.
/// </summary>
/// <param name="bytes">Source array.</param>
/// <param name="offset">Index in <paramref name="bytes"/> of the first byte to write.</param>
/// <param name="length">Number of bytes to write.</param>
/// <exception cref="MessageNotWriteableException">The message is in read mode.</exception>
/// <exception cref="QpidException">The writer raised an <see cref="IOException"/>.</exception>
public void WriteBytes(byte[] bytes, int offset, int length)
{
    CheckWritable();
    try
    {
        _writer.Write(bytes, offset, length);
    }
    catch (IOException ioError)
    {
        string detail = ioError.ToString();
        throw new QpidException(detail, ioError);
    }
}
/// <summary>
/// Switches the message from write mode to read mode: pending writes are flushed, the stream
/// is rewound to the start, a reader is attached and the body length is recorded.
/// </summary>
/// <exception cref="MessageNotWriteableException">The message is already in read mode.</exception>
/// <exception cref="QpidException">An <see cref="IOException"/> occurred while switching modes.</exception>
public void Reset()
{
    CheckWritable();
    try
    {
        // Flush instead of Close: BinaryWriter.Close() also closes the underlying stream,
        // which made the BinaryReader construction below fail on a closed MemoryStream.
        _writer.Flush();
        _writer = null;

        // Writing leaves the position at the end of the data; rewind so reads start at the first byte.
        _dataStream.Position = 0;
        _reader = new BinaryReader(_dataStream);
        _bodyLength = (int)_dataStream.Length;
    }
    catch (IOException e)
    {
        throw new QpidException(e.ToString(), e);
    }
}
}
}