/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections; using System.Text.RegularExpressions; using System.Threading; using log4net; using Apache.Qpid.Buffer; using Apache.Qpid.Client.Message; using Apache.Qpid.Client.Util; using Apache.Qpid.Collections; using Apache.Qpid.Framing; using Apache.Qpid.Messaging; using Apache.Qpid.Protocol; namespace Apache.Qpid.Client { /// ///

///
CRC Card
Responsibilities Collaborations ///
Declare queues. ///
Declare exchanges. ///
Bind queues to exchanges. ///
Create messages. ///
Set up message consumers on the channel. ///
Set up message producers on the channel. ///
Commit the current transaction. ///
Roll-back the current transaction. ///
Close the channel. ///
///

public class AmqChannel : Closeable, IChannel { private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); internal const int BASIC_CONTENT_TYPE = 60; public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; public const int DEFAULT_PREFETCH_LOW_MARK = 2500; private static int _nextSessionNumber = 0; private AMQConnection _connection; private int _sessionNumber; private bool _suspended; private object _suspensionLock = new object(); // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; private bool _transacted; private AcknowledgeMode _acknowledgeMode; private ushort _channelId; private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; private FlowControlQueue _queue; private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; /// Holds all of the producers created by this channel. private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); /// Holds all of the consumers created by this channel. private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); private ArrayList _replayFrames = new ArrayList(); /// /// The counter of the _next producer id. This id is generated by the session and used only to allow the /// producer to identify itself to the session when deregistering itself. /// /// Access to this id does not require to be synchronized since according to the JMS specification only one /// thread of control is allowed to create producers for any given session instance. /// private long _nextProducerId; /// /// Initializes a new instance of the class. /// /// The connection. /// The channel id. /// if set to true [transacted]. /// The acknowledge mode. /// Default prefetch high value /// Default prefetch low value internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) : this() { _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); _connection = con; _transacted = transacted; if ( transacted ) { _acknowledgeMode = AcknowledgeMode.SessionTransacted; } else { _acknowledgeMode = acknowledgeMode; } _channelId = channelId; _defaultPrefetchHighMark = defaultPrefetchHigh; _defaultPrefetchLowMark = defaultPrefetchLow; if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) { _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, new ThresholdMethod(OnPrefetchLowMark), new ThresholdMethod(OnPrefetchHighMark)); } else { // low and upper are the same _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, null, null); } } private AmqChannel() { _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); } /// /// Acknowledge mode for messages received. /// public AcknowledgeMode AcknowledgeMode { get { CheckNotClosed(); return _acknowledgeMode; } } /// /// True if the channel should use transactions. /// public bool Transacted { get { CheckNotClosed(); return _transacted; } } /// /// Prefetch value to be used as the default for /// consumers created on this channel. /// public int DefaultPrefetch { get { return DefaultPrefetchHigh; } } /// /// Prefetch low value to be used as the default for /// consumers created on this channel. /// public int DefaultPrefetchLow { get { return _defaultPrefetchLowMark; } } /// /// Prefetch high value to be used as the default for /// consumers created on this channel. /// public int DefaultPrefetchHigh { get { return _defaultPrefetchHighMark; } } /// Indicates whether or not this channel is currently suspended. public bool IsSuspended { get { return _suspended; } } /// Provides the channels number within the the connection. public ushort ChannelId { get { return _channelId; } } /// Provides the connection that this channel runs over. public AMQConnection Connection { get { return _connection; } } /// /// Declare a new exchange. /// /// Name of the exchange /// Class of the exchange, from public void DeclareExchange(String exchangeName, String exchangeClass) { _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); } /// /// Declare a new exchange using the default exchange class. /// /// Name of the exchange public void DeleteExchange(string exchangeName) { throw new NotImplementedException(); } /// /// Declare a new queue with the specified set of arguments. /// /// Name of the queue /// True if the queue should be durable /// True if the queue should be exclusive to this channel /// True if the queue should be deleted when the channel closes public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, null); } /// /// Declare a new queue with the specified set of arguments. /// /// Name of the queue /// True if the queue should be durable /// True if the queue should be exclusive to this channel /// True if the queue should be deleted when the channel closes /// Optional arguments to Queue.Declare public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) { DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, args); } /// /// Delete a queue with the specifies arguments. /// /// Name of the queue to delete /// If true, the queue will not deleted if it has no consumers /// If true, the queue will not deleted if it has no messages /// If true, the server will not respond to the method public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); } /// /// Generate a new Unique name to use for a queue. /// /// A unique name to this channel public string GenerateUniqueName() { string result = _connection.ProtocolSession.GenerateQueueName(); return Regex.Replace(result, "[^a-z0-9_]", "_"); } /// /// Removes all messages from a queue. /// /// Name of the queue to delete /// If true, the server will not respond to the method public void PurgeQueue(string queueName, bool noWait) { DoPurgeQueue(queueName, noWait); } /// /// Bind a queue to the specified exchange. /// /// Name of queue to bind /// Name of exchange to bind to /// Routing key public void Bind(string queueName, string exchangeName, string routingKey) { DoBind(queueName, exchangeName, routingKey, new FieldTable()); } /// /// Bind a queue to the specified exchange. /// /// Name of queue to bind /// Name of exchange to bind to /// Routing key /// Table of arguments for the binding. Used to bind with a Headers Exchange public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) { DoBind(queueName, exchangeName, routingKey, (FieldTable)args); } /// /// Create a new empty message with no body. /// /// The new message public IMessage CreateMessage() { return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); } /// /// Create a new message of the specified MIME type. /// /// The mime type to create /// The new message public IMessage CreateMessage(string mimeType) { return _messageFactoryRegistry.CreateMessage(mimeType); } /// /// Creates a new message for bytes (application/octet-stream). /// /// The new message public IBytesMessage CreateBytesMessage() { return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); } /// /// Creates a new text message (text/plain) with empty content. /// /// The new message public ITextMessage CreateTextMessage() { return CreateTextMessage(String.Empty); } /// /// Creates a new text message (text/plain) with a body. /// /// Initial body of the message /// The new message public ITextMessage CreateTextMessage(string text) { ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); msg.Text = text; return msg; } /// /// Creates a new Consumer using the builder pattern. /// /// Name of queue to receive messages from /// The builder object public MessageConsumerBuilder CreateConsumerBuilder(string queueName) { return new MessageConsumerBuilder(this, queueName); } /// /// Creates a new consumer. /// /// Name of queue to receive messages from /// Low prefetch value /// High prefetch value /// If true, messages sent on this channel will not be received by this consumer /// If true, the consumer opens the queue in exclusive mode /// The new consumer public IMessageConsumer CreateConsumer(string queueName, int prefetchLow, int prefetchHigh, bool noLocal, bool exclusive) { _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, false); } /// /// Creates a new consumer. /// /// Name of queue to receive messages from /// Low prefetch value /// High prefetch value /// If true, messages sent on this channel will not be received by this consumer /// If true, the consumer opens the queue in exclusive mode /// If true, the consumer only browses and does not consume messages /// The new consumer public IMessageConsumer CreateConsumer(string queueName, int prefetchLow, int prefetchHigh, bool noLocal, bool exclusive, bool browse) { _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} browse={5}", queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse)); return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse); } /// /// Unsubscribe from a queue. /// /// Subscription name public void Unsubscribe(String name) { throw new NotImplementedException(); } /// /// Create a new message publisher using the builder pattern. /// /// The builder object public MessagePublisherBuilder CreatePublisherBuilder() { return new MessagePublisherBuilder(this); } /// /// Create a new message publisher. /// /// Name of exchange to publish to /// Routing key /// Default delivery mode /// Default TTL time of messages /// If true, sent immediately /// If true, the broker will return an error /// (as a connection exception) if the message cannot be delivered /// Default message priority /// The new message publisher public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, long timeToLive, bool immediate, bool mandatory, int priority) { _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", exchangeName, "none", routingKey)); return CreateProducerImpl(exchangeName, routingKey, deliveryMode, timeToLive, immediate, mandatory, priority); } /// /// Recover after transaction failure. /// /// The 0-8 protocol does not support this, not implemented exception will always be thrown. public void Recover() { CheckNotClosed(); CheckNotTransacted(); throw new NotImplementedException(); } /// /// Commit the transaction. /// public void Commit() { // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session try { // Acknowledge up to message last delivered (if any) for each consumer. // Need to send ack for messages delivered to consumers so far. foreach (BasicMessageConsumer consumer in _consumers.Values) { // Sends acknowledgement to server. consumer.AcknowledgeDelivered(); } // Commits outstanding messages sent and outstanding acknowledgements. _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); } catch (AMQException e) { throw new QpidException("Failed to commit", e); } } /// /// Rollback the transaction. /// public void Rollback() { lock (_suspensionLock) { CheckTransacted(); // throws IllegalOperationException if not a transacted session try { bool suspended = IsSuspended; if (!suspended) { Suspend(true); } // Reject up to message last delivered (if any) for each consumer. // Need to send reject for messages delivered to consumers so far. foreach (BasicMessageConsumer consumer in _consumers.Values) { // Sends acknowledgement to server. consumer.RejectUnacked(); } _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); if ( !suspended ) { Suspend(false); } } catch (AMQException e) { throw new QpidException("Failed to rollback", e); } } } /// /// Create a disconnected channel that will fault /// for most things, but is useful for testing /// /// A new disconnected channel public static IChannel CreateDisconnectedChannel() { return new AmqChannel(); } public override void Close() { lock (_connection.FailoverMutex) { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session lock (_closingLock) { SetClosed(); // we pass null since this is not an error case CloseProducersAndConsumers(null); try { _connection.CloseSession(this); } catch (AMQException e) { throw new QpidException("Error closing session: " + e); } finally { _connection.DeregisterSession(_channelId); } } } } /** * Called when the server initiates the closure of the session * unilaterally. * @param e the exception that caused this session to be closed. Null causes the */ public void ClosedWithException(Exception e) { lock (_connection.FailoverMutex) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request SetClosed(); AMQException amqe; if (e is AMQException) { amqe = (AMQException) e; } else { amqe = new AMQException("Closing session forcibly", e); } _connection.DeregisterSession(_channelId); CloseProducersAndConsumers(amqe); } } public void MessageReceived(UnprocessedMessage message) { if (_logger.IsDebugEnabled) { _logger.Debug("Message received in session with channel id " + _channelId); } if ( message.DeliverBody == null ) { ReturnBouncedMessage(message); } else { _queue.Enqueue(message); } } public void Dispose() { Close(); } private void SetClosed() { Interlocked.Exchange(ref _closed, CLOSED); } /// /// Close all producers or consumers. This is called either in the error case or when closing the session normally. /// the exception, may be null to indicate no error has occurred /// private void CloseProducersAndConsumers(AMQException amqe) { try { CloseProducers(); } catch (QpidException e) { _logger.Error("Error closing session: " + e, e); } try { CloseConsumers(amqe); } catch (QpidException e) { _logger.Error("Error closing session: " + e, e); } } /// /// Called to close message producers cleanly. This may or may not be as a result of an error. There is /// currently no way of propagating errors to message producers (this is a JMS limitation). /// private void CloseProducers() { _logger.Debug("Closing producers on session " + this); // we need to clone the list of producers since the close() method updates the _producers collection // which would result in a concurrent modification exception ArrayList clonedProducers = new ArrayList(_producers.Values); foreach (BasicMessageProducer prod in clonedProducers) { _logger.Debug("Closing producer " + prod); prod.Close(); } // at this point the _producers map is empty } /// /// Called to close message consumers cleanly. This may or may not be as a result of an error. /// not null if this is a result of an error occurring at the connection level private void CloseConsumers(Exception error) { if (_dispatcher != null) { _dispatcher.StopDispatcher(); } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception ArrayList clonedConsumers = new ArrayList(_consumers.Values); foreach (BasicMessageConsumer con in clonedConsumers) { if (error != null) { con.NotifyError(error); } else { con.Close(); } } // at this point the _consumers map will be empty } private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, DeliveryMode deliveryMode, long timeToLive, bool immediate, bool mandatory, int priority) { lock (_closingLock) { CheckNotClosed(); try { return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, this, GetNextProducerId(), deliveryMode, timeToLive, immediate, mandatory, priority); } catch (AMQException e) { _logger.Error("Error creating message producer: " + e, e); throw new QpidException("Error creating message producer", e); } } } /// Creates a message consumer on this channel. /// /// The name of the queue to attach the consumer to. /// The pre-fetch buffer low-water mark. /// The pre-fetch buffer high-water mark. /// The no-local flag, true means that the consumer does not receive messages sent on this channel. /// The exclusive flag, true gives this consumer exclusive receive access to the queue. /// /// The message consumer. private IMessageConsumer CreateConsumerImpl(string queueName, int prefetchLow, int prefetchHigh, bool noLocal, bool exclusive, bool browse) { lock (_closingLock) { CheckNotClosed(); BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, _messageFactoryRegistry, this, prefetchHigh, prefetchLow, exclusive, browse); try { RegisterConsumer(consumer); } catch (AMQException e) { throw new QpidException("Error registering consumer: " + e, e); } return consumer; } } private void CheckTransacted() { if (!Transacted) { throw new InvalidOperationException("Channel is not transacted"); } } private void CheckNotTransacted() { if (Transacted) { throw new InvalidOperationException("Channel is transacted"); } } internal void Start() { _dispatcher = new Dispatcher(this); Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); dispatcherThread.IsBackground = true; dispatcherThread.Start(); } internal void Stop() { Suspend(true); if (_dispatcher != null) { _dispatcher.StopDispatcher(); } } internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) { _consumers[consumerTag] = consumer; } /// /// Called by the MessageConsumer when closing, to deregister the consumer from the /// map from consumerTag to consumer instance. /// /// the consumer tag, that was broker-generated internal void DeregisterConsumer(string consumerTag) { _consumers.Remove(consumerTag); } internal void RegisterProducer(long producerId, IMessagePublisher publisher) { _producers[producerId] = publisher; } internal void DeregisterProducer(long producerId) { _producers.Remove(producerId); } private long GetNextProducerId() { return ++_nextProducerId; } /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after * failover when the client has veoted resubscription. * * The caller of this method must already hold the failover mutex. */ internal void MarkClosed() { SetClosed(); _connection.DeregisterSession(_channelId); MarkClosedProducersAndConsumers(); } private void MarkClosedProducersAndConsumers() { try { // no need for a markClosed* method in this case since there is no protocol traffic closing a producer CloseProducers(); } catch (QpidException e) { _logger.Error("Error closing session: " + e, e); } try { MarkClosedConsumers(); } catch (QpidException e) { _logger.Error("Error closing session: " + e, e); } } private void MarkClosedConsumers() { if (_dispatcher != null) { _dispatcher.StopDispatcher(); } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception ArrayList clonedConsumers = new ArrayList(_consumers.Values); foreach (BasicMessageConsumer consumer in clonedConsumers) { consumer.MarkClosed(); } // at this point the _consumers map will be empty } private void DoPurgeQueue(string queueName, bool noWait) { try { _logger.DebugFormat("PurgeQueue {0}", queueName); AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); if (noWait) _connection.ProtocolWriter.Write(purgeQueue); else _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); } catch (AMQException) { throw; } } /** * Replays frame on fail over. * * @throws AMQException */ internal void ReplayOnFailOver() { _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); foreach (AMQFrame frame in _replayFrames) { _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); _connection.ProtocolWriter.Write(frame); } } /// /// Callers must hold the failover mutex before calling this method. /// /// private void RegisterConsumer(BasicMessageConsumer consumer) { // Need to generate a consumer tag on the client so we can exploit the nowait flag. String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); consumer.ConsumerTag = tag; _consumers.Add(tag, consumer); String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, consumer.Exclusive, consumer.AcknowledgeMode, tag, consumer.Browse); } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) { _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", queueName, exchangeName, routingKey, args)); AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, routingKey, false, args); lock (_connection.FailoverMutex) { _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } // AS FIXME: wasnae me _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, routingKey, true, args)); } private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag, bool browse) { FieldTable args = new FieldTable(); if(browse) { args["x-filter-no-consume"] = true; } AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == AcknowledgeMode.NoAcknowledge, exclusive, true, args); _replayFrames.Add(basicConsume); _connection.ProtocolWriter.Write(basicConsume); return tag; } private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { try { _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); if (noWait) { _connection.ProtocolWriter.Write(queueDelete); } else { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); } // AS FIXME: wasnae me _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); } catch (AMQException) { throw; } } private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) { _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, isAutoDelete, false, (FieldTable) args); lock (_connection.FailoverMutex) { _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } // AS FIXME: wasnae me _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, isAutoDelete, true, null)); } // AMQP-level method. private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, string exchangeClass, bool passive, bool durable, bool autoDelete, bool xinternal, bool noWait, FieldTable args) { _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", _channelId, exchangeName, exchangeClass)); AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); if (noWait) { lock (_connection.FailoverMutex) { _connection.ProtocolWriter.Write(declareExchange); } // AS FIXME: wasnae me _replayFrames.Add(declareExchange); } else { throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } /** * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is * AUTO_ACK or similar. * * @param deliveryTag the tag of the last message to be acknowledged * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag */ internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); if (_logger.IsDebugEnabled) { _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } // FIXME: lock FailoverMutex here? _connection.ProtocolWriter.Write(ackFrame); } public void RejectMessage(ulong deliveryTag, bool requeue) { if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) { AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); _connection.ProtocolWriter.Write(rejectFrame); } } /// /// Handle a message that bounced from the server, creating /// the corresponding exception and notifying the connection about it /// /// Unprocessed message private void ReturnBouncedMessage(UnprocessedMessage message) { try { AbstractQmsMessage bouncedMessage = _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); int errorCode = message.BounceBody.ReplyCode; string reason = message.BounceBody.ReplyText; _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); AMQException exception; if (errorCode == AMQConstant.NO_CONSUMERS.Code) { exception = new AMQNoConsumersException(reason, bouncedMessage); } else if (errorCode == AMQConstant.NO_ROUTE.Code) { exception = new AMQNoRouteException(reason, bouncedMessage); } else { exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); } _connection.ExceptionReceived(exception); } catch (Exception ex) { _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); } } private void OnPrefetchLowMark(int count) { if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) { _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); Suspend(false); } } private void OnPrefetchHighMark(int count) { if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) { _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); Suspend(true); } } private void Suspend(bool suspend) { lock (_suspensionLock) { if (_logger.IsDebugEnabled) { _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); } _suspended = suspend; AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); } } /// A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new /// message. /// ///

The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. /// ///

///
CRC Card
Responsibilities Collaborations ///
Notify consumers of message arrivals on their queues. ///
Notify the containing connection of bounced message arrivals. ///
///

/// /// Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag. /// /// Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should /// fall through and terminate the loop, as it is a bug if it occurrs. private class Dispatcher { /// Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). private int _stopped = 0; /// The channel for which this is a dispatcher. private AmqChannel _containingChannel; /// Creates a dispatcher on the specified channel. /// /// The channel on which this is a dispatcher. public Dispatcher(AmqChannel containingChannel) { _containingChannel = containingChannel; } /// The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and /// the connection of bounced messages. public void RunDispatcher() { UnprocessedMessage message; while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) { if (message.DeliverBody != null) { BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; if (consumer == null) { _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); } else { consumer.NotifyMessage(message, _containingChannel.ChannelId); } } else { try { // Bounced message is processed here, away from the transport thread AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. CreateMessage(0, false, message.ContentHeader, message.Bodies); int errorCode = message.BounceBody.ReplyCode; string reason = message.BounceBody.ReplyText; _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); } catch (Exception e) { _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); } } } _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); } /// Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. public void StopDispatcher() { Interlocked.Exchange(ref _stopped, 1); } } } }