/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Threading;
using log4net;
using Qpid.Client.Message;
using Qpid.Messaging;
namespace Qpid.Client
{
public class BasicMessageProducer : Closeable, IMessagePublisher
{
protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
///
/// If true, messages will not get a timestamp.
///
private bool _disableTimestamps;
///
/// Priority of messages created by this producer.
///
private int _messagePriority;
///
/// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
///
private long _timeToLive;
///
/// Delivery mode used for this producer.
///
private DeliveryMode _deliveryMode;
private bool _immediate;
private bool _mandatory;
string _exchangeName;
string _routingKey;
///
/// Default encoding used for messages produced by this producer.
///
private string _encoding;
///
/// Default encoding used for message produced by this producer.
///
private string _mimeType;
///
/// True if this producer was created from a transacted session
///
private bool _transacted;
private ushort _channelId;
///
/// This is an id generated by the session and is used to tie individual producers to the session. This means we
/// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
/// to the session so that when an error is propagated to the session it can close the producer (meaning that
/// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
///
private long _producerId;
///
/// The session used to create this producer
///
private AmqChannel _channel;
///
/// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
///
protected const bool DEFAULT_IMMEDIATE = false;
///
/// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
/// connected to the exchange for the message
///
protected const bool DEFAULT_MANDATORY = true;
public BasicMessageProducer(string exchangeName, string routingKey,
bool transacted,
ushort channelId,
AmqChannel channel,
long producerId,
DeliveryMode deliveryMode,
long timeToLive,
bool immediate,
bool mandatory,
int priority)
{
_exchangeName = exchangeName;
_routingKey = routingKey;
_transacted = transacted;
_channelId = channelId;
_channel = channel;
_producerId = producerId;
_deliveryMode = deliveryMode;
_timeToLive = timeToLive;
_immediate = immediate;
_mandatory = mandatory;
_messagePriority = priority;
_channel.RegisterProducer(producerId, this);
}
#region IMessagePublisher Members
public DeliveryMode DeliveryMode
{
get
{
CheckNotClosed();
return _deliveryMode;
}
set
{
CheckNotClosed();
_deliveryMode = value;
}
}
public string ExchangeName
{
get { return _exchangeName; }
}
public string RoutingKey
{
get { return _routingKey; }
}
public bool DisableMessageID
{
get
{
throw new Exception("The method or operation is not implemented.");
}
set
{
throw new Exception("The method or operation is not implemented.");
}
}
public bool DisableMessageTimestamp
{
get
{
CheckNotClosed();
return _disableTimestamps;
}
set
{
CheckNotClosed();
_disableTimestamps = value;
}
}
public int Priority
{
get
{
CheckNotClosed();
return _messagePriority;
}
set
{
CheckNotClosed();
if (value < 0 || value > 9)
{
throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
}
_messagePriority = value;
}
}
public override void Close()
{
_logger.Info("Closing producer " + this);
Interlocked.Exchange(ref _closed, CLOSED);
_channel.DeregisterProducer(_producerId);
}
public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
{
CheckNotClosed();
SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
DEFAULT_IMMEDIATE);
}
public void Send(IMessage msg)
{
CheckNotClosed();
SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
// This is a short-term hack (knowing that this code will be re-vamped sometime soon)
// to facilitate publishing messages to potentially non-existent recipients.
public void Send(IMessage msg, bool mandatory)
{
CheckNotClosed();
SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
mandatory, DEFAULT_IMMEDIATE);
}
public long TimeToLive
{
get
{
CheckNotClosed();
return _timeToLive;
}
set
{
CheckNotClosed();
if (value < 0)
{
throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
}
_timeToLive = value;
}
}
#endregion
private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
{
_channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
}
public string MimeType
{
set
{
CheckNotClosed();
_mimeType = value;
}
}
public string Encoding
{
set
{
CheckNotClosed();
_encoding = value;
}
}
public void Dispose()
{
Close();
}
}
}