/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;
using log4net;
using Qpid.Client.Message;
using Qpid.Messaging;

namespace Qpid.Client
{
    /// <summary>
    /// Publishes messages to a fixed exchange and routing key by delegating to
    /// <c>AmqChannel.BasicPublish</c> on the channel it was constructed with.
    /// The producer registers itself with that channel on construction and
    /// deregisters itself when closed.
    /// </summary>
    public class BasicMessageProducer : Closeable, IMessagePublisher
    {
        protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));

        /// <summary>
        /// If true, messages will not get a timestamp.
        /// </summary>
        private bool _disableTimestamps;

        /// <summary>
        /// Priority of messages created by this producer.
        /// </summary>
        private int _messagePriority;

        /// <summary>
        /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
        /// </summary>
        private long _timeToLive;

        /// <summary>
        /// Delivery mode used for this producer.
        /// </summary>
        private DeliveryMode _deliveryMode;

        /// <summary>
        /// Value of the "immediate" publish flag supplied at construction.
        /// </summary>
        private bool _immediate;

        /// <summary>
        /// Value of the "mandatory" publish flag supplied at construction.
        /// </summary>
        private bool _mandatory;

        /// <summary>
        /// Name of the exchange every message from this producer is published to.
        /// </summary>
        private string _exchangeName;

        /// <summary>
        /// Routing key every message from this producer is published with.
        /// </summary>
        private string _routingKey;

        /// <summary>
        /// Default encoding used for messages produced by this producer.
        /// Stored by the <see cref="Encoding"/> setter; not read anywhere in this class.
        /// </summary>
        private string _encoding;

        /// <summary>
        /// Default MIME type used for messages produced by this producer.
        /// Stored by the <see cref="MimeType"/> setter; not read anywhere in this class.
        /// </summary>
        private string _mimeType;

        /// <summary>
        /// True if this producer was created from a transacted session.
        /// Stored at construction; not read anywhere in this class.
        /// </summary>
        private bool _transacted;

        /// <summary>
        /// Channel id supplied at construction; not read anywhere in this class.
        /// </summary>
        private ushort _channelId;

        /// <summary>
        /// This is an id generated by the session and is used to tie individual producers to the session. This means we
        /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
        /// to the session so that when an error is propagated to the session it can close the producer (meaning that
        /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
        /// </summary>
        private long _producerId;

        /// <summary>
        /// The session used to create this producer.
        /// </summary>
        private AmqChannel _channel;

        /// <summary>
        /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue.
        /// </summary>
        protected const bool DEFAULT_IMMEDIATE = false;

        /// <summary>
        /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
        /// connected to the exchange for the message.
        /// </summary>
        protected const bool DEFAULT_MANDATORY = true;

        /// <summary>
        /// Creates a producer bound to the given exchange and routing key and registers it with
        /// <paramref name="channel"/> under <paramref name="producerId"/>.
        /// </summary>
        /// <param name="exchangeName">Exchange that messages are published to.</param>
        /// <param name="routingKey">Routing key that messages are published with.</param>
        /// <param name="transacted">Whether the creating session is transacted.</param>
        /// <param name="channelId">Id of the owning channel.</param>
        /// <param name="channel">Channel used to publish; must not be null because it is invoked immediately.</param>
        /// <param name="producerId">Id used to register and later deregister this producer with the channel.</param>
        /// <param name="deliveryMode">Initial delivery mode.</param>
        /// <param name="timeToLive">Initial time to live in milliseconds (not validated here; checked when sending).</param>
        /// <param name="immediate">"Immediate" flag used when publishing.</param>
        /// <param name="mandatory">"Mandatory" flag used when publishing, unless overridden per call.</param>
        /// <param name="priority">Initial message priority (not validated here).</param>
        public BasicMessageProducer(string exchangeName, string routingKey,
            bool transacted,
            ushort channelId,
            AmqChannel channel,
            long producerId,
            DeliveryMode deliveryMode,
            long timeToLive,
            bool immediate,
            bool mandatory,
            int priority)
        {
            _exchangeName = exchangeName;
            _routingKey = routingKey;
            _transacted = transacted;
            _channelId = channelId;
            _channel = channel;
            _producerId = producerId;
            _deliveryMode = deliveryMode;
            _timeToLive = timeToLive;
            _immediate = immediate;
            _mandatory = mandatory;
            _messagePriority = priority;

            _channel.RegisterProducer(producerId, this);
        }

        #region IMessagePublisher Members

        /// <summary>
        /// Delivery mode applied by the <c>Send</c> overloads that do not take an explicit delivery mode.
        /// Both accessors call <c>CheckNotClosed()</c> first.
        /// </summary>
        public DeliveryMode DeliveryMode
        {
            get
            {
                CheckNotClosed();
                return _deliveryMode;
            }
            set
            {
                CheckNotClosed();
                _deliveryMode = value;
            }
        }

        /// <summary>
        /// Name of the exchange this producer publishes to.
        /// </summary>
        public string ExchangeName
        {
            get { return _exchangeName; }
        }

        /// <summary>
        /// Routing key this producer publishes with.
        /// </summary>
        public string RoutingKey
        {
            get { return _routingKey; }
        }

        /// <summary>
        /// Not implemented: both accessors always throw.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public bool DisableMessageID
        {
            get
            {
                throw new NotImplementedException("The method or operation is not implemented.");
            }
            set
            {
                throw new NotImplementedException("The method or operation is not implemented.");
            }
        }

        /// <summary>
        /// If true, messages will not get a timestamp. The value is forwarded to the channel on every publish.
        /// Both accessors call <c>CheckNotClosed()</c> first.
        /// </summary>
        public bool DisableMessageTimestamp
        {
            get
            {
                CheckNotClosed();
                return _disableTimestamps;
            }
            set
            {
                CheckNotClosed();
                _disableTimestamps = value;
            }
        }

        /// <summary>
        /// Priority applied by the <c>Send</c> overloads that do not take an explicit priority.
        /// Both accessors call <c>CheckNotClosed()</c> first.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The assigned value is outside the range 0 to 9.</exception>
        public int Priority
        {
            get
            {
                CheckNotClosed();
                return _messagePriority;
            }
            set
            {
                CheckNotClosed();
                if (value < 0 || value > 9)
                {
                    // The single-string constructor would treat the text as the parameter name,
                    // so pass the parameter name and the message separately.
                    throw new ArgumentOutOfRangeException("value",
                        "Priority of " + value + " is illegal. Value must be in range 0 to 9");
                }
                _messagePriority = value;
            }
        }

        /// <summary>
        /// Marks this producer as closed and deregisters it from its channel.
        /// Calling it again after the producer is already closed does nothing.
        /// </summary>
        public override void Close()
        {
            // Exchange returns the previous state; only the call that actually flips the
            // state to CLOSED goes on to log and deregister.
            if (Interlocked.Exchange(ref _closed, CLOSED) == CLOSED)
            {
                return;
            }

            _logger.Info("Closing producer " + this);
            _channel.DeregisterProducer(_producerId);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> using the supplied delivery mode, priority and time to live,
        /// and the mandatory/immediate flags this producer was constructed with.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to <c>AbstractQmsMessage</c>.</param>
        /// <param name="deliveryMode">Delivery mode for this message.</param>
        /// <param name="priority">Priority for this message (passed through unvalidated).</param>
        /// <param name="timeToLive">Time to live in milliseconds.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeToLive"/> is negative or larger than <see cref="UInt32.MaxValue"/>.
        /// </exception>
        public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
        {
            CheckNotClosed();
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority,
                ToWireTimeToLive(timeToLive), _mandatory, _immediate);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> using this producer's current delivery mode, priority,
        /// time to live and mandatory/immediate flags.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to <c>AbstractQmsMessage</c>.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The configured time to live is negative or larger than <see cref="UInt32.MaxValue"/>.
        /// </exception>
        public void Send(IMessage msg)
        {
            CheckNotClosed();
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority,
                ToWireTimeToLive(_timeToLive), _mandatory, _immediate);
        }

        // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
        // to facilitate publishing messages to potentially non-existent recipients.
        /// <summary>
        /// Publishes <paramref name="msg"/> with an explicit "mandatory" flag; every other setting
        /// comes from this producer's current configuration.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to <c>AbstractQmsMessage</c>.</param>
        /// <param name="mandatory">Value of the "mandatory" flag for this publish only.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The configured time to live is negative or larger than <see cref="UInt32.MaxValue"/>.
        /// </exception>
        public void Send(IMessage msg, bool mandatory)
        {
            CheckNotClosed();
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority,
                ToWireTimeToLive(_timeToLive), mandatory, _immediate);
        }

        /// <summary>
        /// Time to live, in milliseconds, applied by the <c>Send</c> overloads that do not take an explicit value.
        /// Both accessors call <c>CheckNotClosed()</c> first.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The assigned value is negative or larger than <see cref="UInt32.MaxValue"/>.
        /// </exception>
        public long TimeToLive
        {
            get
            {
                CheckNotClosed();
                return _timeToLive;
            }
            set
            {
                CheckNotClosed();
                CheckTimeToLive("value", value);
                _timeToLive = value;
            }
        }

        #endregion

        /// <summary>
        /// Verifies that a time to live fits the unsigned 32-bit value handed to the channel.
        /// </summary>
        /// <param name="paramName">Parameter name reported in the exception.</param>
        /// <param name="timeToLive">Candidate time to live in milliseconds.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeToLive"/> is negative or larger than <see cref="UInt32.MaxValue"/>.
        /// </exception>
        private static void CheckTimeToLive(string paramName, long timeToLive)
        {
            if (timeToLive < 0)
            {
                throw new ArgumentOutOfRangeException(paramName,
                    "Time to live must be non-negative - supplied value was " + timeToLive);
            }
            if (timeToLive > uint.MaxValue)
            {
                throw new ArgumentOutOfRangeException(paramName,
                    "Time to live must not exceed " + uint.MaxValue + " - supplied value was " + timeToLive);
            }
        }

        /// <summary>
        /// Converts a time to live to the unsigned form passed to the channel, rejecting values that
        /// an unchecked cast would silently wrap or truncate.
        /// </summary>
        private static uint ToWireTimeToLive(long timeToLive)
        {
            CheckTimeToLive("timeToLive", timeToLive);
            return (uint)timeToLive;
        }

        /// <summary>
        /// Forwards a publish request to the channel, adding this producer's timestamp setting.
        /// </summary>
        private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
        {
            _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
        }

        /// <summary>
        /// Sets the default MIME type for this producer (write-only).
        /// The setter calls <c>CheckNotClosed()</c> first.
        /// </summary>
        public string MimeType
        {
            set
            {
                CheckNotClosed();
                _mimeType = value;
            }
        }

        /// <summary>
        /// Sets the default encoding for this producer (write-only).
        /// The setter calls <c>CheckNotClosed()</c> first.
        /// </summary>
        public string Encoding
        {
            set
            {
                CheckNotClosed();
                _encoding = value;
            }
        }

        /// <summary>
        /// Closes the producer; equivalent to calling <see cref="Close"/>.
        /// </summary>
        public void Dispose()
        {
            Close();
        }
    }
}