/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Collections;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;

namespace Apache.Qpid.Client
{
    /// <summary>
    /// Consumer bound to a single queue on an <see cref="AmqChannel"/>. Messages handed to
    /// <see cref="NotifyMessage"/> are either dispatched to the registered <see cref="OnMessage"/>
    /// listener or placed on an internal queue for the blocking <c>Receive</c> methods.
    /// Acknowledgement is performed around delivery according to the channel's acknowledge mode.
    /// </summary>
    public class BasicMessageConsumer : Closeable, IMessageConsumer
    {
        private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageConsumer));

        private bool _noLocal;

        /// <summary>
        /// Holds the exclusive status flag for the consumer's access to its queue.
        /// </summary>
        private bool _exclusive;

        /// <summary>
        /// Gets the exclusive flag supplied when this consumer was constructed.
        /// </summary>
        public bool Exclusive
        {
            get { return _exclusive; }
        }

        private bool _browse;

        /// <summary>
        /// Gets the browse flag supplied when this consumer was constructed.
        /// </summary>
        public bool Browse
        {
            get { return _browse; }
        }

        /// <summary>
        /// Gets or sets the no-local flag of this consumer.
        /// </summary>
        public bool NoLocal
        {
            get { return _noLocal; }
            set { _noLocal = value; }
        }

        private AcknowledgeMode _acknowledgeMode;

        /// <summary>
        /// Gets the acknowledge mode, captured from the owning channel at construction time.
        /// </summary>
        public AcknowledgeMode AcknowledgeMode
        {
            get { return _acknowledgeMode; }
        }

        private MessageReceivedDelegate _messageListener;

        private bool IsMessageListenerSet
        {
            get { return _messageListener != null; }
        }

        /// <summary>
        /// The consumer tag allows us to close the consumer by sending a Basic.Cancel method to the
        /// broker.
        /// </summary>
        private string _consumerTag;

        /// <summary>
        /// We need to know the channel id when constructing frames.
        /// </summary>
        private ushort _channelId;

        private readonly string _queueName;

        /// <summary>
        /// Protects the setting of a message listener and the <c>_receiving</c> flag.
        /// </summary>
        private readonly object _syncLock = new object();

        /// <summary>
        /// We store the high water prefetch field in order to be able to reuse it when
        /// resubscribing in the event of failover.
        /// </summary>
        private int _prefetchHigh;

        /// <summary>
        /// We store the low water prefetch field in order to be able to reuse it when
        /// resubscribing in the event of failover.
        /// </summary>
        private int _prefetchLow;

        /// <summary>
        /// When true indicates that either a message listener is set or that
        /// a blocking receive call is in progress.
        /// </summary>
        private bool _receiving;

        /// <summary>
        /// Used in the blocking receive methods to receive a message from
        /// the channel thread.
        /// </summary>
        private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue();

        private MessageFactoryRegistry _messageFactory;

        private AmqChannel _channel;

        //
        // Tag of last message delivered, which should be acknowledged on commit in transaction mode.
        //
        //private long _lastDeliveryTag;

        /// <summary>
        /// Explicit list of all received but un-acked messages in a transaction. Used to ensure
        /// acking is completed when the transaction is committed. Only allocated when the
        /// acknowledge mode is <c>SessionTransacted</c>; null otherwise.
        /// </summary>
        private LinkedList<long> _receivedDeliveryTags;

        /// <summary>
        /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode.
        /// </summary>
        private int _outstanding;

        /// <summary>
        /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
        /// Enabled when the number of outstanding messages is &gt;= _prefetchHigh and disabled
        /// when it is &lt;= _prefetchLow.
        /// </summary>
        private bool _dups_ok_acknowledge_send;

        /// <summary>
        /// Creates a consumer for the named queue on the given channel.
        /// </summary>
        /// <param name="channelId">Id of the channel, used when building frames.</param>
        /// <param name="queueName">Name of the queue being consumed.</param>
        /// <param name="noLocal">Initial value of the <see cref="NoLocal"/> flag.</param>
        /// <param name="messageFactory">Factory used to turn raw deliveries into messages.</param>
        /// <param name="channel">Owning channel; also the source of the acknowledge mode.</param>
        /// <param name="prefetchHigh">High water prefetch mark.</param>
        /// <param name="prefetchLow">Low water prefetch mark.</param>
        /// <param name="exclusive">Value reported by <see cref="Exclusive"/>.</param>
        /// <param name="browse">Value reported by <see cref="Browse"/>.</param>
        internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
                                      MessageFactoryRegistry messageFactory, AmqChannel channel,
                                      int prefetchHigh, int prefetchLow, bool exclusive, bool browse)
        {
            _channelId = channelId;
            _queueName = queueName;
            _noLocal = noLocal;
            _messageFactory = messageFactory;
            _channel = channel;
            _acknowledgeMode = _channel.AcknowledgeMode;
            _prefetchHigh = prefetchHigh;
            _prefetchLow = prefetchLow;
            _exclusive = exclusive;
            _browse = browse;

            // Delivery tags only need tracking when acks are deferred until commit/rollback.
            if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
            {
                _receivedDeliveryTags = new LinkedList<long>();
            }
        }

        #region IMessageConsumer Members

        /// <summary>
        /// Gets or sets the asynchronous message listener. Setting a non-null listener marks the
        /// consumer as receiving; setting null clears that state.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// On set, if a listener is already installed and the consumer is marked as receiving.
        /// </exception>
        public MessageReceivedDelegate OnMessage
        {
            get
            {
                return _messageListener;
            }
            set
            {
                CheckNotClosed();

                lock (_syncLock)
                {
                    // If someone is already receiving
                    if (_messageListener != null && _receiving)
                    {
                        throw new InvalidOperationException("Another thread is already receiving...");
                    }

                    _messageListener = value;

                    _receiving = (_messageListener != null);

                    if (_receiving)
                    {
                        _logger.Debug("Message listener set for queue with name " + _queueName);
                    }
                }
            }
        }

        /// <summary>
        /// Blocking receive: takes the next item from the internal queue and post-delivers it.
        /// </summary>
        /// <param name="delay">
        /// Wait limit forwarded unchanged to the internal queue's <c>Dequeue</c>
        /// (presumably milliseconds - confirm against ConsumerProducerQueue).
        /// </param>
        /// <returns>The dequeued message, or null if the queue yielded no item.</returns>
        /// <exception cref="InvalidOperationException">
        /// If a listener is installed or another receive call is already in progress.
        /// </exception>
        /// <exception cref="QpidException">If an error was passed through the queue.</exception>
        public IMessage Receive(long delay)
        {
            CheckNotClosed();

            lock (_syncLock)
            {
                // If someone is already receiving
                if (_receiving)
                {
                    throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
                }

                _receiving = true;
            }

            try
            {
                object o = _messageQueue.Dequeue(delay);

                return ReturnMessageOrThrowAndPostDeliver(o);
            }
            finally
            {
                // Always release the receiving flag, even when the dequeued item was an error.
                lock (_syncLock)
                {
                    _receiving = false;
                }
            }
        }

        /// <summary>
        /// Unwraps the dequeued object and, when it is a message, runs post-delivery
        /// acknowledgement handling on it.
        /// </summary>
        private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
        {
            IMessage m = ReturnMessageOrThrow(o);
            if (m != null)
            {
                PostDeliver(m);
            }
            return m;
        }

        /// <summary>
        /// Equivalent to <see cref="Receive(long)"/> with <see cref="Timeout.Infinite"/>.
        /// </summary>
        public IMessage Receive()
        {
            return Receive(Timeout.Infinite);
        }

        /// <summary>
        /// Equivalent to <see cref="Receive(long)"/> with a delay of zero.
        /// </summary>
        public IMessage ReceiveNoWait()
        {
            return Receive(0);
        }

        #endregion

        /// <summary>
        /// We can get back either a message or an exception from the queue. This method examines
        /// the argument and deals with it by throwing (if an exception) or returning it (in any
        /// other case).
        /// </summary>
        /// <param name="o">The object taken off the queue.</param>
        /// <returns>The argument cast to <see cref="IMessage"/> (null stays null).</returns>
        /// <exception cref="QpidException">
        /// If the argument is an exception; it is always wrapped, with the original set as the
        /// inner exception.
        /// </exception>
        private IMessage ReturnMessageOrThrow(object o)
        {
            // Errors are passed via the queue too since there is no other way of interrupting
            // a blocked dequeue.
            if (o is Exception)
            {
                Exception e = (Exception) o;
                throw new QpidException("Message consumer forcibly closed due to error: " + e, e);
            }
            else
            {
                return (IMessage) o;
            }
        }

        #region IDisposable Members

        /// <summary>
        /// Closes the consumer; see <see cref="Close"/>.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        #endregion

        /// <summary>
        /// Marks the consumer closed, sends a Basic.Cancel for its consumer tag and waits for the
        /// Basic.CancelOk reply, then deregisters the consumer from its channel. Does nothing if
        /// the consumer is already closed.
        /// </summary>
        /// <exception cref="QpidException">
        /// If the cancel exchange fails; the underlying <c>AMQException</c> is attached as the
        /// inner exception. The consumer is deregistered regardless.
        /// </exception>
        public override void Close()
        {
            if (_closed == CLOSED)
            {
                return;
            }

            // FIXME: Don't we need FailoverSupport here (as we have SyncWrite), i.e. rather than just locking FailoverMutex
            lock (_channel.Connection.FailoverMutex)
            {
                lock (_closingLock)
                {
                    Interlocked.Exchange(ref _closed, CLOSED);

                    AMQFrame cancelFrame = BasicCancelBody.CreateAMQFrame(_channelId, _consumerTag, false);

                    try
                    {
                        _channel.Connection.ConvenientProtocolWriter.SyncWrite(
                            cancelFrame, typeof(BasicCancelOkBody));
                    }
                    catch (AMQException e)
                    {
                        _logger.Error("Error closing consumer: " + e, e);
                        // Keep the cause attached so callers can see the original stack trace.
                        throw new QpidException("Error closing consumer: " + e, e);
                    }
                    finally
                    {
                        DeregisterConsumer();
                    }
                }
            }
        }

        /// <summary>
        /// Called when a message has arrived for this consumer. Handles both the case of a
        /// message listener and that of a synchronous receive caller. Any exception raised while
        /// building or dispatching the message is logged and swallowed.
        /// </summary>
        /// <param name="messageFrame">The raw unprocessed message.</param>
        /// <param name="channelId">Channel on which this message was sent (currently unused).</param>
        internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId)
        {
            if (_logger.IsDebugEnabled)
            {
                _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
            }

            try
            {
                AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(
                    (long)messageFrame.DeliverBody.DeliveryTag,
                    messageFrame.DeliverBody.Redelivered,
                    messageFrame.ContentHeader,
                    messageFrame.Bodies);

                if (_logger.IsDebugEnabled)
                {
                    _logger.Debug("Message is of type: " + jmsMessage.GetType().Name);
                }

                PreDeliver(jmsMessage);

                // Read the field once so the null test and the invocation see the same delegate
                // even if the listener is cleared concurrently.
                MessageReceivedDelegate listener = _messageListener;
                if (listener != null)
                {
#if __MonoCS__
                    listener(jmsMessage);
#else
                    listener.Invoke(jmsMessage);
#endif
                    PostDeliver(jmsMessage);
                }
                else
                {
                    // No listener: hand over to a (possibly future) blocking Receive call,
                    // which performs the post-delivery step itself.
                    _messageQueue.Enqueue(jmsMessage);
                }
            }
            catch (Exception e)
            {
                _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME
            }
        }

        /// <summary>
        /// Marks the consumer closed because of an error and deregisters it. When no listener is
        /// installed, the error is placed on the internal queue so that a blocking receive call
        /// rethrows it.
        /// </summary>
        /// <param name="cause">The error that invalidated this consumer.</param>
        internal void NotifyError(Exception cause)
        {
            lock (_syncLock)
            {
                SetClosed();

                // We have no way of propagating the exception to a message listener, so we only
                // deal with the case where a synchronous receive may be waiting for a message.
                if (_messageListener == null)
                {
                    _messageQueue.Enqueue(cause);
                    _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
                }
                DeregisterConsumer();
            }
        }

        /// <summary>
        /// Atomically flags this consumer as closed.
        /// </summary>
        private void SetClosed()
        {
            Interlocked.Exchange(ref _closed, CLOSED);
        }

        /// <summary>
        /// Perform cleanup to deregister this consumer. This occurs when closing the consumer in
        /// both the clean case and in the case of an error occurring.
        /// </summary>
        internal void DeregisterConsumer()
        {
            _channel.DeregisterConsumer(_consumerTag);
        }

        /// <summary>
        /// Gets or sets the consumer tag used to cancel and deregister this consumer.
        /// </summary>
        public string ConsumerTag
        {
            get
            {
                return _consumerTag;
            }
            set
            {
                _consumerTag = value;
            }
        }

        /// <summary>
        /// Called when you need to invalidate a consumer. Used for example when failover has
        /// occurred and the client has vetoed automatic resubscription.
        /// The caller must hold the failover mutex.
        /// </summary>
        internal void MarkClosed()
        {
            SetClosed();
            DeregisterConsumer();
        }

        /// <summary>
        /// Gets the name of the queue this consumer reads from.
        /// </summary>
        public string QueueName
        {
            get { return _queueName; }
        }

        /// <summary>
        /// Acknowledges every tracked delivery tag individually and clears the list. Used when
        /// committing. Does nothing when tags are not being tracked (non-transacted modes).
        /// </summary>
        internal void AcknowledgeDelivered()
        {
            // The list only exists in SessionTransacted mode.
            if (_receivedDeliveryTags == null)
            {
                return;
            }

            foreach (long tag in _receivedDeliveryTags)
            {
                _channel.AcknowledgeMessage((ulong)tag, false);
            }
            _receivedDeliveryTags.Clear();
        }

        /// <summary>
        /// Rejects every tracked delivery tag (passing true as the second argument to the
        /// channel's RejectMessage) and clears the list. Does nothing when tags are not being
        /// tracked (non-transacted modes).
        /// </summary>
        internal void RejectUnacked()
        {
            // The list only exists in SessionTransacted mode.
            if (_receivedDeliveryTags == null)
            {
                return;
            }

            foreach (long tag in _receivedDeliveryTags)
            {
                _channel.RejectMessage((ulong)tag, true);
            }
            _receivedDeliveryTags.Clear();
        }

        /// <summary>
        /// Acknowledgement handling performed before a message is handed to the application.
        /// </summary>
        /// <param name="msg">The message about to be delivered.</param>
        private void PreDeliver(AbstractQmsMessage msg)
        {
            switch (AcknowledgeMode)
            {
                case AcknowledgeMode.PreAcknowledge:
                    _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
                    break;

                case AcknowledgeMode.ClientAcknowledge:
                    // Attach the channel so that when the user acknowledges the message it can
                    // send out the appropriate frame.
                    msg.Channel = _channel;
                    break;
            }
        }

        /// <summary>
        /// Acknowledgement handling performed after a message has been handed to the application.
        /// </summary>
        /// <param name="m">The delivered message; must be an <c>AbstractQmsMessage</c>.</param>
        private void PostDeliver(IMessage m)
        {
            AbstractQmsMessage msg = (AbstractQmsMessage) m;
            switch (AcknowledgeMode)
            {
                case AcknowledgeMode.DupsOkAcknowledge:
                    if (++_outstanding >= _prefetchHigh)
                    {
                        _dups_ok_acknowledge_send = true;
                    }
                    if (_outstanding <= _prefetchLow)
                    {
                        _dups_ok_acknowledge_send = false;
                    }
                    if (_dups_ok_acknowledge_send)
                    {
                        _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
                        // The ack above is cumulative, so nothing is outstanding any more.
                        // Without this reset the counter only ever grows, the low-water test
                        // can never fire again, and every later message is acked one by one.
                        _outstanding = 0;
                    }
                    break;

                case AcknowledgeMode.AutoAcknowledge:
                    _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
                    break;

                case AcknowledgeMode.SessionTransacted:
                    // Remember the tag; it is acked or rejected when the transaction ends.
                    _receivedDeliveryTags.AddLast(msg.DeliveryTag);
                    break;
            }
        }
    }
}