/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using Apache.Qpid.Buffer;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Messaging;
using Apache.Qpid.Framing;
namespace Apache.Qpid.Client
{
/// <summary>
/// Publishes messages to a fixed exchange / routing key over an <see cref="AmqChannel"/>.
/// The producer registers itself with the channel on construction and deregisters on
/// <see cref="Close"/>.
/// </summary>
public class BasicMessageProducer : Closeable, IMessagePublisher
{
    protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));

    /// <summary>
    /// If true, messages will not get a timestamp.
    /// </summary>
    private bool _disableTimestamps;

    /// <summary>
    /// Priority of messages created by this producer.
    /// </summary>
    private int _messagePriority;

    /// <summary>
    /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
    /// </summary>
    private long _timeToLive;

    /// <summary>
    /// Delivery mode used for this producer.
    /// </summary>
    private DeliveryMode _deliveryMode;

    /// <summary>
    /// Default value of the "immediate" flag passed to the publish frame.
    /// </summary>
    private bool _immediate;

    /// <summary>
    /// Default value of the "mandatory" flag passed to the publish frame.
    /// </summary>
    private bool _mandatory;

    /// <summary>
    /// Name of the exchange every message from this producer is published to.
    /// </summary>
    private string _exchangeName;

    /// <summary>
    /// Routing key every message from this producer is published with.
    /// </summary>
    private string _routingKey;

    /// <summary>
    /// Default encoding used for messages produced by this producer.
    /// </summary>
    private string _encoding;

    /// <summary>
    /// Default MIME type used for messages produced by this producer.
    /// </summary>
    private string _mimeType;

    /// <summary>
    /// True if this producer was created from a transacted session
    /// </summary>
    private bool _transacted;

    /// <summary>
    /// Channel id supplied at construction; used for the content header and content body frames.
    /// </summary>
    private ushort _channelId;

    /// <summary>
    /// This is an id generated by the session and is used to tie individual producers to the session. This means we
    /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
    /// to the session so that when an error is propagated to the session it can close the producer (meaning that
    /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
    /// </summary>
    private long _producerId;

    /// <summary>
    /// The session used to create this producer
    /// </summary>
    private AmqChannel _channel;

    /// <summary>
    /// Creates a producer bound to the given exchange and routing key and registers it
    /// with <paramref name="channel"/> under <paramref name="producerId"/>.
    /// </summary>
    /// <param name="exchangeName">Exchange to publish to.</param>
    /// <param name="routingKey">Routing key to publish with.</param>
    /// <param name="transacted">Whether the owning session is transacted.</param>
    /// <param name="channelId">Channel id used when building content frames.</param>
    /// <param name="channel">Owning channel; must not be null.</param>
    /// <param name="producerId">Id under which the producer is registered with the channel.</param>
    /// <param name="deliveryMode">Default delivery mode.</param>
    /// <param name="timeToLive">Default time to live, in milliseconds.</param>
    /// <param name="immediate">Default "immediate" publish flag.</param>
    /// <param name="mandatory">Default "mandatory" publish flag.</param>
    /// <param name="priority">Default message priority.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public BasicMessageProducer(string exchangeName, string routingKey,
        bool transacted,
        ushort channelId,
        AmqChannel channel,
        long producerId,
        DeliveryMode deliveryMode,
        long timeToLive,
        bool immediate,
        bool mandatory,
        int priority)
    {
        // Fail fast with a descriptive exception instead of a NullReferenceException
        // at the registration call below.
        if ( channel == null )
        {
            throw new ArgumentNullException("channel");
        }

        _exchangeName = exchangeName;
        _routingKey = routingKey;
        _transacted = transacted;
        _channelId = channelId;
        _channel = channel;
        _producerId = producerId;
        _deliveryMode = deliveryMode;
        _timeToLive = timeToLive;
        _immediate = immediate;
        _mandatory = mandatory;
        _messagePriority = priority;

        // Registration is the last step so the channel only ever sees a fully initialised producer.
        _channel.RegisterProducer(producerId, this);
    }

    #region IMessagePublisher Members

    /// <summary>
    /// Default delivery mode applied by the Send overloads that do not take one explicitly.
    /// </summary>
    public DeliveryMode DeliveryMode
    {
        get
        {
            CheckNotClosed();
            return _deliveryMode;
        }
        set
        {
            CheckNotClosed();
            _deliveryMode = value;
        }
    }

    /// <summary>
    /// Name of the exchange this producer publishes to.
    /// </summary>
    public string ExchangeName
    {
        get { return _exchangeName; }
    }

    /// <summary>
    /// Routing key this producer publishes with.
    /// </summary>
    public string RoutingKey
    {
        get { return _routingKey; }
    }

    /// <summary>
    /// Not implemented; both accessors throw.
    /// </summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public bool DisableMessageID
    {
        get
        {
            throw new NotImplementedException("The method or operation is not implemented.");
        }
        set
        {
            throw new NotImplementedException("The method or operation is not implemented.");
        }
    }

    /// <summary>
    /// If true, sent messages are not given a timestamp and their expiration is set to 0.
    /// </summary>
    public bool DisableMessageTimestamp
    {
        get
        {
            CheckNotClosed();
            return _disableTimestamps;
        }
        set
        {
            CheckNotClosed();
            _disableTimestamps = value;
        }
    }

    /// <summary>
    /// Default message priority; must be in the range 0 to 9.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is outside 0 to 9.</exception>
    public int Priority
    {
        get
        {
            CheckNotClosed();
            return _messagePriority;
        }
        set
        {
            CheckNotClosed();
            ValidatePriority("value", value);
            _messagePriority = value;
        }
    }

    /// <summary>
    /// Marks the producer as closed and deregisters it from its channel.
    /// </summary>
    public override void Close()
    {
        _logger.Debug("Closing producer " + this);
        Interlocked.Exchange(ref _closed, CLOSED);
        _channel.DeregisterProducer(_producerId);
    }

    /// <summary>
    /// Sends a message using the supplied delivery mode, priority and time to live, and the
    /// producer's default mandatory/immediate flags.
    /// </summary>
    /// <param name="msg">Message to send; must be an <c>AbstractQmsMessage</c>.</param>
    /// <param name="deliveryMode">Delivery mode for this message.</param>
    /// <param name="priority">Priority for this message, 0 to 9.</param>
    /// <param name="timeToLive">Time to live in milliseconds; 0 means the message does not expire.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="priority"/> is outside 0 to 9 or <paramref name="timeToLive"/> is negative.
    /// </exception>
    public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
    {
        CheckNotClosed();

        // Apply the same rules as the Priority / TimeToLive setters so that out-of-range
        // values are rejected instead of being silently truncated further down.
        ValidatePriority("priority", priority);
        ValidateTimeToLive("timeToLive", timeToLive);

        SendImpl(
            _exchangeName,
            _routingKey,
            (AbstractQmsMessage)msg,
            deliveryMode,
            priority,
            timeToLive,
            _mandatory,
            _immediate
            );
    }

    /// <summary>
    /// Sends a message using all of the producer's defaults.
    /// </summary>
    /// <param name="msg">Message to send; must be an <c>AbstractQmsMessage</c>.</param>
    public void Send(IMessage msg)
    {
        CheckNotClosed();
        SendImpl(
            _exchangeName,
            _routingKey,
            (AbstractQmsMessage)msg,
            _deliveryMode,
            _messagePriority,
            _timeToLive,
            _mandatory,
            _immediate
            );
    }

    // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
    // to facilitate publishing messages to potentially non-existent recipients.
    /// <summary>
    /// Sends a message using the producer's defaults but an explicit "mandatory" flag.
    /// </summary>
    /// <param name="msg">Message to send; must be an <c>AbstractQmsMessage</c>.</param>
    /// <param name="mandatory">Value of the "mandatory" publish flag for this message.</param>
    public void Send(IMessage msg, bool mandatory)
    {
        CheckNotClosed();
        SendImpl(
            _exchangeName,
            _routingKey,
            (AbstractQmsMessage)msg,
            _deliveryMode,
            _messagePriority,
            _timeToLive,
            mandatory,
            _immediate
            );
    }

    /// <summary>
    /// Default time to live in milliseconds; must be non-negative.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is negative.</exception>
    public long TimeToLive
    {
        get
        {
            CheckNotClosed();
            return _timeToLive;
        }
        set
        {
            CheckNotClosed();
            ValidateTimeToLive("value", value);
            _timeToLive = value;
        }
    }

    #endregion

    /// <summary>
    /// Default MIME type for messages produced by this producer.
    /// </summary>
    public string MimeType
    {
        get
        {
            CheckNotClosed();
            return _mimeType;
        }
        set
        {
            CheckNotClosed();
            _mimeType = value;
        }
    }

    /// <summary>
    /// Default encoding for messages produced by this producer.
    /// </summary>
    public string Encoding
    {
        get
        {
            CheckNotClosed();
            return _encoding;
        }
        set
        {
            CheckNotClosed();
            _encoding = value;
        }
    }

    /// <summary>
    /// Closes the producer; equivalent to <see cref="Close"/>.
    /// </summary>
    public void Dispose()
    {
        Close();
    }

    #region Argument validation

    /// <summary>
    /// Throws if <paramref name="priority"/> is outside the range 0 to 9.
    /// </summary>
    private static void ValidatePriority(string paramName, int priority)
    {
        if ( priority < 0 || priority > 9 )
        {
            // Three-argument overload: the single-string overload would treat the text as the parameter name.
            throw new ArgumentOutOfRangeException(paramName, priority,
                "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
        }
    }

    /// <summary>
    /// Throws if <paramref name="timeToLive"/> is negative.
    /// </summary>
    private static void ValidateTimeToLive(string paramName, long timeToLive)
    {
        if ( timeToLive < 0 )
        {
            throw new ArgumentOutOfRangeException(paramName, timeToLive,
                "Time to live must be non-negative - supplied value was " + timeToLive);
        }
    }

    #endregion // Argument validation

    #region Message Publishing

    /// <summary>
    /// Stamps the message properties, builds the publish, content header and content body
    /// frames and writes them to the connection as one composite block.
    /// </summary>
    /// <param name="timeToLive">
    /// Time to live; values that are not positive result in an expiration of 0.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, long timeToLive, bool mandatory, bool immediate)
    {
        if ( message == null )
        {
            // Named after the parameter of the public Send overloads that callers actually see.
            throw new ArgumentNullException("msg");
        }

        // todo: handle session access ticket
        // NOTE(review): this frame uses _channel.ChannelId while the header/body frames use
        // _channelId - presumably the same value; confirm.
        AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
            _channel.ChannelId, 0, exchangeName,
            routingKey, mandatory, immediate
            );

        // fix message properties
        if ( !_disableTimestamps )
        {
            message.Timestamp = DateTime.UtcNow.Ticks;
        }
        if ( !_disableTimestamps && timeToLive > 0 )
        {
            // NOTE(review): Timestamp is taken from DateTime ticks while timeToLive is
            // documented in milliseconds - confirm the intended unit of Expiration.
            message.Expiration = message.Timestamp + timeToLive;
        }
        else
        {
            // No expiry applies: clear any value left over from a previous send of this message.
            message.Expiration = 0;
        }
        message.DeliveryMode = deliveryMode;
        message.Priority = (byte)priority;

        ByteBuffer payload = message.Data;
        // A message without a payload is sent as header-only with a body size of 0.
        // NOTE(review): the header size uses Limit while the bodies are cut from Remaining;
        // these agree only if the buffer position is 0 - confirm against callers.
        int payloadLength = (payload == null) ? 0 : payload.Limit;
        ContentBody[] contentBodies = CreateContentBodies(payload);

        // Slot 0 = publish frame, slot 1 = content header, remaining slots = content bodies.
        AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
        for ( int i = 0; i < contentBodies.Length; i++ )
        {
            frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
        }
        if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
        {
            _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
        }

        // weight argument of zero indicates no child content headers, just bodies
        AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
            _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
            message.ContentHeaderProperties, (uint)payloadLength
            );
        if ( _logger.IsDebugEnabled )
        {
            _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
        }

        frames[0] = publishFrame;
        frames[1] = contentHeaderFrame;
        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);

        // The write is performed while holding the connection's failover mutex.
        lock ( _channel.Connection.FailoverMutex )
        {
            _channel.Connection.ProtocolWriter.Write(compositeFrame);
        }
    }

    /// <summary>
    /// Returns the largest number of payload bytes a single body frame may carry.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The connection's maximum frame size leaves no room for payload.
    /// </exception>
    private int GetFramePayloadMax()
    {
        // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
        // (0xCE byte).
        int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
        if ( framePayloadMax <= 0 )
        {
            // Without this guard the frame count below would divide by zero or go negative.
            throw new InvalidOperationException(
                "Maximum frame size of " + _channel.Connection.MaximumFrameSize + " is too small to carry a content body");
        }
        return framePayloadMax;
    }

    /// <summary>
    /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
    /// maximum frame size.
    /// </summary>
    /// <param name="payload">Message payload; may be null.</param>
    /// <returns>The array of content bodies; empty (never null) when there is nothing to send.</returns>
    private ContentBody[] CreateContentBodies(ByteBuffer payload)
    {
        if ( payload == null || payload.Remaining == 0 )
        {
            return new ContentBody[0];
        }

        int framePayloadMax = GetFramePayloadMax();
        int frameCount = CalculateContentBodyFrames(payload);
        ContentBody[] bodies = new ContentBody[frameCount];
        for ( int i = 0; i < frameCount; i++ )
        {
            // Every body takes a full frame's worth except possibly the last one.
            // NOTE(review): relies on the ContentBody constructor consuming `length` bytes
            // from the buffer so that Remaining shrinks each iteration - confirm.
            int length = Math.Min(payload.Remaining, framePayloadMax);
            bodies[i] = new ContentBody(payload, (uint)length);
        }
        return bodies;
    }

    /// <summary>
    /// Computes how many body frames are needed to carry the remaining bytes of the payload.
    /// </summary>
    /// <param name="payload">Message payload; may be null.</param>
    /// <returns>0 for a null or empty payload, otherwise the remaining length divided by the
    /// per-frame payload maximum, rounded up.</returns>
    private int CalculateContentBodyFrames(ByteBuffer payload)
    {
        if ( (payload == null) || (payload.Remaining == 0) )
        {
            return 0;
        }

        int dataLength = payload.Remaining;
        int framePayloadMax = GetFramePayloadMax();
        int fullFrames = dataLength / framePayloadMax;
        // One extra, partially filled frame if the data does not divide evenly.
        int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
        return fullFrames + lastFrame;
    }

    #endregion // Message Publishing
}
}