/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Text.RegularExpressions;
using System.Threading;
using log4net;
using Apache.Qpid.Buffer;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Client.Util;
using Apache.Qpid.Collections;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Protocol;
namespace Apache.Qpid.Client
{
public class AmqChannel : Closeable, IChannel
{
// Logger for this class; the nested Dispatcher logs through it as well.
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
internal const int BASIC_CONTENT_TYPE = 60;
// Source of per-channel session numbers; incremented atomically in the constructor.
private static int _nextSessionNumber = 0;
// This channel's session number; it forms the first half of client-generated consumer tags.
private int _sessionNumber;
// Last flow state requested through Suspend(); exposed via IsSuspended.
private bool _suspended;
// Held by Suspend() and Rollback() while they change the channel flow.
private object _suspensionLock = new object();
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
private AMQConnection _connection;
private bool _transacted;
private AcknowledgeMode _acknowledgeMode;
private ushort _channelId;
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
// Incoming deliveries are enqueued here by MessageReceived() and dequeued by the dispatcher.
private FlowControlQueue _queue;
// Created by Start(); null until then.
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
/// <summary>
/// Set of all producers created by this session, keyed by producer id.
/// </summary>
private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
/// <summary>
/// Maps from consumer tag to the consumer instance registered under that tag.
/// </summary>
private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
// Frames recorded so that ReplayOnFailOver() can write them again.
private ArrayList _replayFrames = new ArrayList();
/// <summary>
/// The counter of the next producer id. This id is generated by the session and used only to allow the
/// producer to identify itself to the session when deregistering itself.
///
/// Access to this id is not synchronized; the original authors rely on the JMS rule that only one
/// thread of control is allowed to create producers for any given session instance.
/// </summary>
private long _nextProducerId;
/// <summary>
/// Takes unprocessed messages off the containing channel's queue and hands each one to the
/// consumer registered under the message's consumer tag. Messages without a deliver body are
/// treated as bounced and reported to the connection as an undelivered-message exception.
/// </summary>
private class Dispatcher
{
    // 0 while running, 1 once StopDispatcher() has been called. Only touched through Interlocked.
    private int _stopped = 0;
    private AmqChannel _containingChannel;

    public Dispatcher(AmqChannel containingChannel)
    {
        _containingChannel = containingChannel;
    }

    /// <summary>
    /// True once <see cref="StopDispatcher"/> has been called.
    /// </summary>
    private bool IsStopped
    {
        // CompareExchange(x, 0, 0) never changes the value; it is used as an atomic read that
        // pairs with the Interlocked.Exchange in StopDispatcher().
        get { return Interlocked.CompareExchange(ref _stopped, 0, 0) != 0; }
    }

    /// <summary>
    /// Runs the dispatcher. This is intended to be run in a separate thread.
    /// The loop ends when the dispatcher has been stopped or the queue returns null.
    /// </summary>
    public void RunDispatcher()
    {
        UnprocessedMessage message;
        while (!IsStopped && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
        {
            DispatchMessage(message);
        }
        _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId);
    }

    /// <summary>
    /// Routes a single message: delivered messages go to their consumer, anything else is
    /// reported as bounced.
    /// </summary>
    /// <param name="message">The message taken from the channel queue.</param>
    private void DispatchMessage(UnprocessedMessage message)
    {
        if (message.DeliverBody != null)
        {
            BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
            if (consumer == null)
            {
                // The value looked up is a consumer tag, not a queue name, so say so in the log.
                _logger.Warn("Received a message for consumer tag " + message.DeliverBody.ConsumerTag + " without a registered consumer - ignoring...");
            }
            else
            {
                consumer.NotifyMessage(message, _containingChannel.ChannelId);
            }
        }
        else
        {
            try
            {
                // Bounced message is processed here, away from the mina thread
                AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
                    CreateMessage(0, false, message.ContentHeader, message.Bodies);
                int errorCode = message.BounceBody.ReplyCode;
                string reason = message.BounceBody.ReplyText;
                _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
                _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
            }
            catch (Exception e)
            {
                // Best effort: a failure while reporting a bounce must not kill the dispatcher thread.
                _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
            }
        }
    }

    /// <summary>
    /// Asks the dispatcher loop to exit. The flag is checked before each dequeue.
    /// </summary>
    public void StopDispatcher()
    {
        Interlocked.Exchange(ref _stopped, 1);
    }
}
/// <summary>
/// Initializes a new instance of the <see cref="AmqChannel"/> class bound to a connection.
/// </summary>
/// <param name="con">The connection.</param>
/// <param name="channelId">The channel id.</param>
/// <param name="transacted">If true the channel is transacted and the acknowledge mode is forced to SessionTransacted.</param>
/// <param name="acknowledgeMode">The acknowledge mode used when the channel is not transacted.</param>
/// <param name="defaultPrefetchHigh">Default prefetch high value.</param>
/// <param name="defaultPrefetchLow">Default prefetch low value.</param>
internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
    : this()
{
    _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
    _connection = con;
    _channelId = channelId;
    _transacted = transacted;
    _acknowledgeMode = transacted ? AcknowledgeMode.SessionTransacted : acknowledgeMode;
    _defaultPrefetchHighMark = defaultPrefetchHigh;
    _defaultPrefetchLowMark = defaultPrefetchLow;

    if (_acknowledgeMode != AcknowledgeMode.NoAcknowledge)
    {
        // Low and high marks are the same and no threshold callbacks are installed.
        _queue = new FlowControlQueue(
            _defaultPrefetchHighMark, _defaultPrefetchHighMark,
            null, null
            );
    }
    else
    {
        // NoAcknowledge: the queue calls back when the low and high marks are crossed.
        _queue = new FlowControlQueue(
            _defaultPrefetchLowMark, _defaultPrefetchHighMark,
            new ThresholdMethod(OnPrefetchLowMark),
            new ThresholdMethod(OnPrefetchHighMark)
            );
    }
}
/// <summary>
/// Creates a channel that only has a default message factory registry; no connection or
/// queue is set up.
/// </summary>
private AmqChannel()
{
    this._messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
}
/// <summary>
/// Create a disconnected channel that will fault
/// for most things, but is useful for testing.
/// </summary>
/// <returns>A new disconnected channel.</returns>
public static IChannel CreateDisconnectedChannel()
{
    AmqChannel disconnected = new AmqChannel();
    return disconnected;
}
/// <summary>
/// Creates a message through the factory registered for "application/octet-stream".
/// </summary>
/// <returns>The new message, cast to <see cref="IBytesMessage"/>.</returns>
public IBytesMessage CreateBytesMessage()
{
    IMessage created = _messageFactoryRegistry.CreateMessage("application/octet-stream");
    return (IBytesMessage)created;
}
/// <summary>
/// Creates a default message; currently this is a bytes message.
/// </summary>
/// <returns>A new "application/octet-stream" message.</returns>
public IMessage CreateMessage()
{
    // TODO: this is supposed to create a message consisting only of message headers
    IBytesMessage bytesMessage = (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
    return bytesMessage;
}
/// <summary>
/// Creates a message through the factory registered for the given MIME type.
/// </summary>
/// <param name="mimeType">MIME type passed to the message factory registry.</param>
/// <returns>The message produced by the registry.</returns>
public IMessage CreateMessage(string mimeType)
{
    IMessage created = _messageFactoryRegistry.CreateMessage(mimeType);
    return created;
}
/// <summary>
/// Creates a text message whose text is the empty string.
/// </summary>
/// <returns>A new text message.</returns>
public ITextMessage CreateTextMessage()
{
    ITextMessage emptyTextMessage = CreateTextMessage(String.Empty);
    return emptyTextMessage;
}
/// <summary>
/// Creates a "text/plain" message and sets its text.
/// </summary>
/// <param name="text">Text assigned to the new message.</param>
/// <returns>The new text message.</returns>
public ITextMessage CreateTextMessage(string text)
{
    ITextMessage textMessage = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
    textMessage.Text = text;
    return textMessage;
}
/// <summary>
/// Whether this channel was created as transacted. CheckNotClosed() runs before the value is read.
/// </summary>
public bool Transacted
{
    get
    {
        CheckNotClosed();
        bool isTransacted = _transacted;
        return isTransacted;
    }
}
/// <summary>
/// The acknowledge mode in effect for this channel. CheckNotClosed() runs before the value is read.
/// </summary>
public AcknowledgeMode AcknowledgeMode
{
    get
    {
        CheckNotClosed();
        AcknowledgeMode mode = _acknowledgeMode;
        return mode;
    }
}
/// <summary>
/// Commits the current transaction: asks every registered consumer to acknowledge the last
/// message it delivered, then sends tx.commit and waits for TxCommitOkBody as the response type.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not transacted.</exception>
/// <exception cref="QpidException">An AMQException occurred while acknowledging or committing.</exception>
public void Commit()
{
    // FIXME: Fail over safety. Needs FailoverSupport?
    CheckNotClosed();
    CheckTransacted(); // throws InvalidOperationException if not a transacted session
    try
    {
        // Work on a snapshot: consumers register and deregister themselves in _consumers, and
        // enumerating the live collection while that happens throws InvalidOperationException.
        // The copy is taken under SyncRoot because the synchronized wrapper does not protect enumeration.
        ArrayList consumersSnapshot;
        lock (_consumers.SyncRoot)
        {
            consumersSnapshot = new ArrayList(_consumers.Values);
        }

        // Acknowledge up to message last delivered (if any) for each consumer.
        // Need to send ack for messages delivered to consumers so far.
        foreach (BasicMessageConsumer consumer in consumersSnapshot)
        {
            // Sends acknowledgement to server.
            consumer.AcknowledgeLastDelivered();
        }

        // Commits outstanding messages sent and outstanding acknowledgements.
        _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody));
    }
    catch (AMQException e)
    {
        throw new QpidException("Failed to commit", e);
    }
}
/// <summary>
/// Rolls back the current transaction. If the channel is not already suspended it is suspended
/// for the duration of the tx.rollback exchange and resumed afterwards.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not transacted.</exception>
/// <exception cref="QpidException">An AMQException occurred during the rollback.</exception>
public void Rollback()
{
    lock (_suspensionLock)
    {
        CheckTransacted(); // throws InvalidOperationException if not a transacted session
        try
        {
            // Remember the flow state on entry so that it is only restored if this method changed it.
            bool wasSuspended = IsSuspended;
            if (!wasSuspended)
            {
                Suspend(true);
            }

            // todo: rollback dispatcher when TX support is added
            //if ( _dispatcher != null )
            //   _dispatcher.Rollback();

            _connection.ConvenientProtocolWriter.SyncWrite(
                TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));

            if (!wasSuspended)
            {
                Suspend(false);
            }
        }
        catch (AMQException e)
        {
            throw new QpidException("Failed to rollback", e);
        }
    }
}
/// <summary>
/// Closes the channel: marks it closed, closes all producers and consumers, asks the
/// connection to close the session and finally deregisters the channel id from the connection.
/// </summary>
/// <exception cref="QpidException">
/// The connection reported an AMQException while closing the session. The original exception
/// is kept as the inner exception.
/// </exception>
public override void Close()
{
    lock (_connection.FailoverMutex)
    {
        // We must close down all producers and consumers in an orderly fashion. This is the only method
        // that can be called from a different thread of control from the one controlling the session
        lock (_closingLock)
        {
            SetClosed();

            // we pass null since this is not an error case
            CloseProducersAndConsumers(null);

            try
            {
                _connection.CloseSession(this);
            }
            catch (AMQException e)
            {
                // Keep the cause as the inner exception so its stack trace is not lost.
                throw new QpidException("Error closing session: " + e, e);
            }
            finally
            {
                // The session is deregistered whether or not the close handshake succeeded.
                _connection.DeregisterSession(_channelId);
            }
        }
    }
}
/// <summary>
/// Atomically stores CLOSED into the closed flag.
/// </summary>
private void SetClosed()
{
    Interlocked.Exchange(ref this._closed, CLOSED);
}
/// <summary>
/// Close all producers and consumers. This is called either in the error case or when closing
/// the session normally. A QpidException from either step is logged and not rethrown, so a
/// failure while closing producers does not prevent consumers from being closed.
/// </summary>
/// <param name="amqe">The exception, may be null to indicate no error has occurred.</param>
private void CloseProducersAndConsumers(AMQException amqe)
{
    // Step 1: producers.
    try
    {
        CloseProducers();
    }
    catch (QpidException producerFailure)
    {
        _logger.Error("Error closing session: " + producerFailure, producerFailure);
    }

    // Step 2: consumers, which are told about the error (if any).
    try
    {
        CloseConsumers(amqe);
    }
    catch (QpidException consumerFailure)
    {
        _logger.Error("Error closing session: " + consumerFailure, consumerFailure);
    }
}
/// <summary>
/// Called when the server initiates the closure of the session unilaterally. Marks the channel
/// closed, deregisters it from the connection and closes producers and consumers, passing the
/// error on to the consumers.
/// </summary>
/// <param name="e">
/// The exception that caused this session to be closed. Anything that is not an AMQException
/// (including null) is wrapped in a new AMQException.
/// </param>
public void ClosedWithException(Exception e)
{
    lock (_connection.FailoverMutex)
    {
        SetClosed();

        // An AMQException has an error code and message already and will be passed in when closure occurs as a
        // result of a channel close request
        AMQException closeCause = e as AMQException;
        if (closeCause == null)
        {
            closeCause = new AMQException("Closing session forcibly", e);
        }

        _connection.DeregisterSession(_channelId);
        CloseProducersAndConsumers(closeCause);
    }
}
/// <summary>
/// Called to close message producers cleanly. This may or may not be as a result of an error. There is
/// currently no way of propagating errors to message producers (this is a JMS limitation).
/// </summary>
private void CloseProducers()
{
    _logger.Debug("Closing producers on session " + this);

    // Iterate over a copy: per the original note, closing a producer updates the _producers
    // collection, which would break enumeration of the live collection.
    ArrayList producersSnapshot = new ArrayList(_producers.Values);
    for (int index = 0; index < producersSnapshot.Count; index++)
    {
        BasicMessageProducer producer = (BasicMessageProducer)producersSnapshot[index];
        _logger.Debug("Closing producer " + producer);
        producer.Close();
    }
    // at this point the _producers map is empty
}
/// <summary>
/// Called to close message consumers cleanly. This may or may not be as a result of an error.
/// The dispatcher, if one was started, is asked to stop first.
/// </summary>
/// <param name="error">Not null if this is a result of an error occurring at the connection level.</param>
private void CloseConsumers(Exception error)
{
    Dispatcher dispatcher = _dispatcher;
    if (dispatcher != null)
    {
        dispatcher.StopDispatcher();
    }

    // Iterate over a copy: per the original note, closing a consumer updates the _consumers
    // collection, which would break enumeration of the live collection.
    ArrayList consumersSnapshot = new ArrayList(_consumers.Values);
    for (int index = 0; index < consumersSnapshot.Count; index++)
    {
        BasicMessageConsumer consumer = (BasicMessageConsumer)consumersSnapshot[index];
        if (error == null)
        {
            consumer.Close();
        }
        else
        {
            consumer.NotifyError(error);
        }
    }
    // at this point the _consumers map will be empty
}
/// <summary>
/// Not implemented. The closed and transacted checks run first, then
/// <see cref="NotImplementedException"/> is always thrown.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is transacted.</exception>
/// <exception cref="NotImplementedException">Always, once the checks have passed.</exception>
public void Recover()
{
    CheckNotClosed();
    // Throws InvalidOperationException if this IS a transacted session.
    CheckNotTransacted();

    // TODO: This cannot be implemented using 0.8 semantics
    throw new NotImplementedException();
}
/// <summary>
/// Not implemented.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public void Run()
{
    throw new NotImplementedException();
}
/// <summary>
/// Logs the request and creates a message publisher through <see cref="CreateProducerImpl"/>.
/// All arguments are forwarded unchanged.
/// </summary>
/// <returns>The new publisher.</returns>
public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
                                         long timeToLive, bool immediate, bool mandatory, int priority)
{
    string logLine = string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
                                   exchangeName, "none", routingKey);
    _logger.Debug(logLine);

    IMessagePublisher publisher = CreateProducerImpl(exchangeName, routingKey, deliveryMode,
                                                     timeToLive, immediate, mandatory, priority);
    return publisher;
}
/// <summary>
/// Creates a <c>BasicMessageProducer</c> for this channel while holding the closing lock, so
/// that it cannot interleave with <see cref="Close"/>. Each producer receives a fresh producer id.
/// </summary>
/// <returns>The new producer.</returns>
/// <exception cref="QpidException">An AMQException occurred while constructing the producer.</exception>
public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
                                            DeliveryMode deliveryMode,
                                            long timeToLive, bool immediate, bool mandatory, int priority)
{
    lock (_closingLock)
    {
        CheckNotClosed();

        try
        {
            long producerId = GetNextProducerId();
            BasicMessageProducer producer = new BasicMessageProducer(
                exchangeName, routingKey, _transacted, _channelId,
                this, producerId,
                deliveryMode, timeToLive, immediate, mandatory, priority);
            return producer;
        }
        catch (AMQException creationFailure)
        {
            _logger.Error("Error creating message producer: " + creationFailure, creationFailure);
            throw new QpidException("Error creating message producer", creationFailure);
        }
    }
}
/// <summary>
/// Logs the request and creates a consumer through <c>CreateConsumerImpl</c>.
/// All arguments are forwarded unchanged.
/// </summary>
/// <returns>The new consumer.</returns>
public IMessageConsumer CreateConsumer(string queueName,
                                       int prefetchLow,
                                       int prefetchHigh,
                                       bool noLocal,
                                       bool exclusive,
                                       bool durable,
                                       string subscriptionName)
{
    string logLine = String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}",
                                   queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName);
    _logger.Debug(logLine);

    IMessageConsumer consumer = CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName);
    return consumer;
}
/// <summary>
/// Creates a <c>BasicMessageConsumer</c> on the given queue and registers it with this channel
/// while holding the closing lock.
/// </summary>
/// <returns>The registered consumer.</returns>
/// <exception cref="NotImplementedException">
/// <paramref name="durable"/> is true or <paramref name="subscriptionName"/> is not null.
/// </exception>
/// <exception cref="QpidException">An AMQException occurred while registering the consumer.</exception>
private IMessageConsumer CreateConsumerImpl(string queueName,
                                            int prefetchLow,
                                            int prefetchHigh,
                                            bool noLocal,
                                            bool exclusive,
                                            bool durable,
                                            string subscriptionName)
{
    bool isPlainSubscription = !durable && subscriptionName == null;
    if (!isPlainSubscription)
    {
        throw new NotImplementedException(); // TODO: durable subscriptions.
    }

    lock (_closingLock)
    {
        CheckNotClosed();

        BasicMessageConsumer newConsumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
                                                                    _messageFactoryRegistry, this,
                                                                    prefetchHigh, prefetchLow, exclusive);
        try
        {
            RegisterConsumer(newConsumer);
        }
        catch (AMQException registrationFailure)
        {
            throw new QpidException("Error registering consumer: " + registrationFailure, registrationFailure);
        }

        return newConsumer;
    }
}
/// <summary>
/// Not implemented.
/// </summary>
/// <param name="name">Unused.</param>
/// <exception cref="NotImplementedException">Always.</exception>
public void Unsubscribe(String name)
{
    // FIXME
    throw new NotImplementedException();
}
/// <summary>
/// Guard for operations that require a transacted channel.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is not transacted.</exception>
private void CheckTransacted()
{
    bool isTransacted = Transacted;
    if (isTransacted)
    {
        return;
    }
    throw new InvalidOperationException("Channel is not transacted");
}
/// <summary>
/// Guard for operations that are not allowed on a transacted channel.
/// </summary>
/// <exception cref="InvalidOperationException">The channel is transacted.</exception>
private void CheckNotTransacted()
{
    bool isTransacted = Transacted;
    if (!isTransacted)
    {
        return;
    }
    throw new InvalidOperationException("Channel is transacted");
}
/// <summary>
/// Entry point for messages arriving on this channel. Messages with a deliver body are queued
/// for the dispatcher; messages without one are handled immediately as bounced messages.
/// </summary>
/// <param name="message">The received message.</param>
public void MessageReceived(UnprocessedMessage message)
{
    if (_logger.IsDebugEnabled)
    {
        _logger.Debug("Message received in session with channel id " + _channelId);
    }

    if (message.DeliverBody != null)
    {
        _queue.Enqueue(message);
        return;
    }

    ReturnBouncedMessage(message);
}
/// <summary>
/// The default prefetch value; same as <see cref="DefaultPrefetchHigh"/>.
/// </summary>
public int DefaultPrefetch
{
    get
    {
        int highMark = DefaultPrefetchHigh;
        return highMark;
    }
}
/// <summary>
/// The default prefetch low mark this channel was created with.
/// </summary>
public int DefaultPrefetchLow
{
    get
    {
        return this._defaultPrefetchLowMark;
    }
}
/// <summary>
/// The default prefetch high mark this channel was created with.
/// </summary>
public int DefaultPrefetchHigh
{
    get
    {
        return this._defaultPrefetchHighMark;
    }
}
/// <summary>
/// The flow state most recently requested through Suspend(): true when suspended.
/// </summary>
public bool IsSuspended
{
    get
    {
        return this._suspended;
    }
}
/// <summary>
/// The id of this channel.
/// </summary>
public ushort ChannelId
{
    get { return this._channelId; }
}
/// <summary>
/// The connection this channel belongs to.
/// </summary>
public AMQConnection Connection
{
    get { return this._connection; }
}
/// <summary>
/// Creates a new dispatcher for this channel and starts it on a background thread.
/// </summary>
internal void Start()
{
    Dispatcher dispatcher = new Dispatcher(this);
    _dispatcher = dispatcher;

    ThreadStart dispatchLoop = new ThreadStart(dispatcher.RunDispatcher);
    Thread worker = new Thread(dispatchLoop);
    // Background thread: it does not keep the process alive.
    worker.IsBackground = true;
    worker.Start();
}
/// <summary>
/// Suspends the channel flow and then asks the dispatcher, if one was started, to stop.
/// </summary>
internal void Stop()
{
    Suspend(true);

    Dispatcher dispatcher = _dispatcher;
    if (dispatcher == null)
    {
        return;
    }
    dispatcher.StopDispatcher();
}
/// <summary>
/// Stores the consumer under the given tag, replacing any consumer already stored under it.
/// </summary>
/// <param name="consumerTag">Key in the consumer map.</param>
/// <param name="consumer">Consumer to store.</param>
internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer)
{
    this._consumers[consumerTag] = consumer;
}
/// <summary>
/// Called by the MessageConsumer when closing, to deregister the consumer from the
/// map from consumerTag to consumer instance.
/// </summary>
/// <param name="consumerTag">The consumer tag to remove.</param>
internal void DeregisterConsumer(string consumerTag)
{
    this._consumers.Remove(consumerTag);
}
/// <summary>
/// Stores the publisher under the given producer id, replacing any existing entry.
/// </summary>
/// <param name="producerId">Key in the producer map.</param>
/// <param name="publisher">Publisher to store.</param>
internal void RegisterProducer(long producerId, IMessagePublisher publisher)
{
    this._producers[producerId] = publisher;
}
/// <summary>
/// Removes the entry stored under the given producer id, if any.
/// </summary>
/// <param name="producerId">Key in the producer map.</param>
internal void DeregisterProducer(long producerId)
{
    this._producers.Remove(producerId);
}
/// <summary>
/// Advances the producer id counter by one and returns the new value. Not synchronized.
/// </summary>
/// <returns>The next producer id.</returns>
private long GetNextProducerId()
{
    _nextProducerId = _nextProducerId + 1;
    return _nextProducerId;
}
/// <summary>
/// Disposes the channel by closing it; equivalent to calling <see cref="Close"/>.
/// </summary>
public void Dispose()
{
    this.Close();
}
/// <summary>
/// Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
/// failover when the client has vetoed resubscription.
///
/// The caller of this method must already hold the failover mutex.
/// </summary>
internal void MarkClosed()
{
    // 1. Flag the channel itself as closed.
    SetClosed();
    // 2. Remove it from the connection.
    _connection.DeregisterSession(_channelId);
    // 3. Invalidate everything created from it.
    MarkClosedProducersAndConsumers();
}
/// <summary>
/// Closes all producers and marks all consumers closed. A QpidException from either step is
/// logged and not rethrown, so a failure on producers does not prevent consumers from being handled.
/// </summary>
private void MarkClosedProducersAndConsumers()
{
    // Step 1: producers.
    try
    {
        // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
        CloseProducers();
    }
    catch (QpidException producerFailure)
    {
        _logger.Error("Error closing session: " + producerFailure, producerFailure);
    }

    // Step 2: consumers.
    try
    {
        MarkClosedConsumers();
    }
    catch (QpidException consumerFailure)
    {
        _logger.Error("Error closing session: " + consumerFailure, consumerFailure);
    }
}
/// <summary>
/// Asks the dispatcher, if one was started, to stop and then calls MarkClosed() on every
/// registered consumer.
/// </summary>
private void MarkClosedConsumers()
{
    Dispatcher dispatcher = _dispatcher;
    if (dispatcher != null)
    {
        dispatcher.StopDispatcher();
    }

    // Iterate over a copy: per the original note, closing a consumer updates the _consumers
    // collection, which would break enumeration of the live collection.
    ArrayList consumersSnapshot = new ArrayList(_consumers.Values);
    for (int index = 0; index < consumersSnapshot.Count; index++)
    {
        BasicMessageConsumer consumer = (BasicMessageConsumer)consumersSnapshot[index];
        consumer.MarkClosed();
    }
    // at this point the _consumers map will be empty
}
/// <summary>
/// Sends a queue purge request for the named queue.
/// </summary>
/// <param name="queueName">Name of the queue to purge.</param>
/// <param name="noWait">If true the request is written without waiting for a purge-ok response.</param>
public void PurgeQueue(string queueName, bool noWait)
{
    this.DoPurgeQueue(queueName, noWait);
}
/// <summary>
/// Builds a queue.purge frame for this channel and sends it. With <paramref name="noWait"/>
/// the frame is simply written; otherwise it is sent through SyncWrite with QueuePurgeOkBody
/// as the expected response type. Any AMQException propagates to the caller unchanged.
/// </summary>
/// <param name="queueName">Name of the queue to purge.</param>
/// <param name="noWait">Whether to skip waiting for the response.</param>
private void DoPurgeQueue(string queueName, bool noWait)
{
    _logger.DebugFormat("PurgeQueue {0}", queueName);

    AMQFrame purgeFrame = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
    if (!noWait)
    {
        _connection.ConvenientProtocolWriter.SyncWrite(purgeFrame, typeof(QueuePurgeOkBody));
        return;
    }

    _connection.ProtocolWriter.Write(purgeFrame);
}
/// <summary>
/// Replays frames on fail over: every frame recorded in the replay list is written to the
/// connection again, in the order it was recorded.
/// </summary>
internal void ReplayOnFailOver()
{
    string header = string.Format("Replaying frames for channel {0}", _channelId);
    _logger.Debug(header);

    foreach (AMQFrame recordedFrame in _replayFrames)
    {
        string frameLine = string.Format("Replaying frame=[{0}]", recordedFrame);
        _logger.Debug(frameLine);
        _connection.ProtocolWriter.Write(recordedFrame);
    }
}
/// <summary>
/// Starts consuming from the consumer's queue, assigns the generated tag to the consumer and
/// adds it to the consumer map under that tag.
/// Callers must hold the failover mutex before calling this method.
/// </summary>
/// <param name="consumer">The consumer to register.</param>
void RegisterConsumer(BasicMessageConsumer consumer)
{
    String generatedTag = ConsumeFromQueue(consumer.QueueName,
                                           consumer.NoLocal,
                                           consumer.Exclusive,
                                           consumer.AcknowledgeMode);

    consumer.ConsumerTag = generatedTag;
    _consumers.Add(generatedTag, consumer);
}
/// <summary>
/// Binds a queue to an exchange with a routing key and extra arguments.
/// </summary>
/// <param name="queueName">Queue to bind.</param>
/// <param name="exchangeName">Exchange to bind to.</param>
/// <param name="routingKey">Routing key of the binding.</param>
/// <param name="args">Binding arguments; cast to <c>FieldTable</c>.</param>
public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
{
    FieldTable bindingArguments = (FieldTable)args;
    DoBind(queueName, exchangeName, routingKey, bindingArguments);
}
/// <summary>
/// Binds a queue to an exchange with a routing key, using an empty argument table.
/// </summary>
/// <param name="queueName">Queue to bind.</param>
/// <param name="exchangeName">Exchange to bind to.</param>
/// <param name="routingKey">Routing key of the binding.</param>
public void Bind(string queueName, string exchangeName, string routingKey)
{
    FieldTable noArguments = new FieldTable();
    DoBind(queueName, exchangeName, routingKey, noArguments);
}
/// <summary>
/// Builds a queue.bind frame for this channel, records it for replay on failover and writes
/// it to the connection while holding the failover mutex.
/// </summary>
/// <param name="queueName">Queue to bind.</param>
/// <param name="exchangeName">Exchange to bind to.</param>
/// <param name="routingKey">Routing key of the binding.</param>
/// <param name="args">Binding arguments.</param>
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
{
    string logLine = string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
                                   queueName, exchangeName, routingKey, args);
    _logger.Debug(logLine);

    AMQFrame bindFrame = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                      queueName, exchangeName,
                                                      routingKey, true, args);
    _replayFrames.Add(bindFrame);

    lock (_connection.FailoverMutex)
    {
        _connection.ProtocolWriter.Write(bindFrame);
    }
}
/// <summary>
/// Generates a consumer tag of the form "sessionNumber-consumerNumber", builds a basic.consume
/// frame with it, records the frame for replay on failover and writes it to the connection.
/// </summary>
/// <param name="queueName">Queue to consume from.</param>
/// <param name="noLocal">Passed through to the consume frame.</param>
/// <param name="exclusive">Passed through to the consume frame.</param>
/// <param name="acknowledgeMode">The frame's no-ack flag is set when this is NoAcknowledge.</param>
/// <returns>The generated consumer tag.</returns>
private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
{
    // Need to generate a consumer tag on the client so we can exploit the nowait flag.
    int consumerNumber = _nextConsumerNumber++;
    String consumerTag = string.Format("{0}-{1}", _sessionNumber, consumerNumber);

    bool noAck = (acknowledgeMode == AcknowledgeMode.NoAcknowledge);
    AMQFrame consumeFrame = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
                                                            queueName, consumerTag, noLocal,
                                                            noAck,
                                                            exclusive, true, new FieldTable());

    _replayFrames.Add(consumeFrame);
    _connection.ProtocolWriter.Write(consumeFrame);

    return consumerTag;
}
/// <summary>
/// Not implemented.
/// </summary>
/// <param name="exchangeName">Unused.</param>
/// <exception cref="NotImplementedException">Always.</exception>
public void DeleteExchange(string exchangeName)
{
    // FIXME
    throw new NotImplementedException();
}
/// <summary>
/// Sends a queue delete request for the named queue.
/// </summary>
/// <param name="queueName">Name of the queue to delete.</param>
/// <param name="ifUnused">Passed through to the delete frame.</param>
/// <param name="ifEmpty">Passed through to the delete frame.</param>
/// <param name="noWait">If true the request is written without waiting for a delete-ok response.</param>
public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
    this.DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
}
/// <summary>
/// Builds a queue.delete frame for this channel, records it for replay on failover and sends
/// it. With <paramref name="noWait"/> the frame is simply written; otherwise it is sent through
/// SyncWrite with QueueDeleteOkBody as the expected response type. Any AMQException propagates
/// to the caller unchanged.
/// </summary>
/// <param name="queueName">Name of the queue to delete.</param>
/// <param name="ifUnused">Passed through to the delete frame.</param>
/// <param name="ifEmpty">Passed through to the delete frame.</param>
/// <param name="noWait">Whether to skip waiting for the response.</param>
private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
    string logLine = string.Format("DeleteQueue name={0}", queueName);
    _logger.Debug(logLine);

    AMQFrame deleteFrame = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
                                                          queueName,
                                                          ifUnused,
                                                          ifEmpty,
                                                          noWait);
    _replayFrames.Add(deleteFrame);

    if (!noWait)
    {
        _connection.ConvenientProtocolWriter.SyncWrite(deleteFrame, typeof(QueueDeleteOkBody));
        return;
    }

    _connection.ProtocolWriter.Write(deleteFrame);
}
/// <summary>
/// Creates a consumer builder bound to this channel and the given queue.
/// </summary>
/// <param name="queueName">Queue the builder is created for.</param>
/// <returns>A new <c>MessageConsumerBuilder</c>.</returns>
public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
{
    MessageConsumerBuilder builder = new MessageConsumerBuilder(this, queueName);
    return builder;
}
/// <summary>
/// Creates a publisher builder bound to this channel.
/// </summary>
/// <returns>A new <c>MessagePublisherBuilder</c>.</returns>
public MessagePublisherBuilder CreatePublisherBuilder()
{
    MessagePublisherBuilder builder = new MessagePublisherBuilder(this);
    return builder;
}
/// <summary>
/// Asks the protocol session for a generated queue name and replaces every character that is
/// not a lowercase ASCII letter, a digit or an underscore with an underscore.
/// </summary>
/// <returns>The sanitized name.</returns>
public string GenerateUniqueName()
{
    string generated = _connection.ProtocolSession.GenerateQueueName();
    Regex disallowedCharacters = new Regex("[^a-z0-9_]");
    return disallowedCharacters.Replace(generated, "_");
}
/// <summary>
/// Declares a queue with the given properties.
/// </summary>
/// <param name="queueName">Name of the queue.</param>
/// <param name="isDurable">Passed through to the declare frame.</param>
/// <param name="isExclusive">Passed through to the declare frame.</param>
/// <param name="isAutoDelete">Passed through to the declare frame.</param>
public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
{
    this.DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
}
/// <summary>
/// Builds a queue.declare frame for this channel, records it for replay on failover and
/// writes it to the connection while holding the failover mutex.
/// </summary>
/// <param name="queueName">Name of the queue.</param>
/// <param name="isDurable">Passed through to the declare frame.</param>
/// <param name="isExclusive">Passed through to the declare frame.</param>
/// <param name="isAutoDelete">Passed through to the declare frame.</param>
private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
{
    string logLine = string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
                                   queueName, isDurable, isExclusive, isAutoDelete);
    _logger.Debug(logLine);

    AMQFrame declareFrame = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName,
                                                            false, isDurable, isExclusive,
                                                            isAutoDelete, true, null);
    _replayFrames.Add(declareFrame);

    lock (_connection.FailoverMutex)
    {
        _connection.ProtocolWriter.Write(declareFrame);
    }
}
/// <summary>
/// Declares an exchange on this channel. The declaration is sent with nowait set and with the
/// passive, durable, auto-delete and internal flags all false and no arguments.
/// </summary>
/// <param name="exchangeName">Name of the exchange.</param>
/// <param name="exchangeClass">Exchange class passed through to the declare frame.</param>
public void DeclareExchange(String exchangeName, String exchangeClass)
{
    // Log label fixed: it previously read "vame=".
    _logger.Debug(string.Format("DeclareExchange name={0} exchangeClass={1}", exchangeName, exchangeClass));

    DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
}
/// <summary>
/// AMQP-level method. Builds an exchange.declare frame, records it for replay on failover and
/// writes it to the connection while holding the failover mutex. Only nowait declarations are supported.
/// </summary>
/// <param name="channelId">Channel id placed in the frame.</param>
/// <param name="ticket">Passed through to the declare frame.</param>
/// <param name="exchangeName">Name of the exchange.</param>
/// <param name="exchangeClass">Passed through to the declare frame.</param>
/// <param name="passive">Passed through to the declare frame.</param>
/// <param name="durable">Passed through to the declare frame.</param>
/// <param name="autoDelete">Passed through to the declare frame.</param>
/// <param name="xinternal">Passed through to the declare frame.</param>
/// <param name="noWait">Must be true.</param>
/// <param name="args">Passed through to the declare frame; may be null.</param>
/// <exception cref="NotImplementedException"><paramref name="noWait"/> is false.</exception>
private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
                             string exchangeClass, bool passive, bool durable,
                             bool autoDelete, bool xinternal, bool noWait, FieldTable args)
{
    // Log the channel id that actually goes into the frame (the parameter, not the field).
    _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
                                channelId, exchangeName, exchangeClass));

    if (!noWait)
    {
        // Reject the unsupported mode before anything is recorded: a frame that is never
        // sent must not end up in the replay list and be written on failover.
        throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
        // _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
    }

    AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(
        channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args);

    _replayFrames.Add(declareExchange);

    lock (_connection.FailoverMutex)
    {
        _connection.ProtocolWriter.Write(declareExchange);
    }
}
/// <summary>
/// Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
/// a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
/// AUTO_ACK or similar.
/// </summary>
/// <param name="deliveryTag">The tag of the last message to be acknowledged.</param>
/// <param name="multiple">
/// If true will acknowledge all messages up to and including the one specified by the delivery tag.
/// </param>
internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
    AMQFrame acknowledgement = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);

    bool debugEnabled = _logger.IsDebugEnabled;
    if (debugEnabled)
    {
        _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
    }

    // FIXME: lock FailoverMutex here?
    _connection.ProtocolWriter.Write(acknowledgement);
}
/// <summary>
/// Handle a message that bounced from the server, creating
/// the corresponding exception and notifying the connection about it.
/// The exception type depends on the reply code: NO_CONSUMERS, NO_ROUTE, or a generic
/// undelivered exception for anything else. Any failure while doing so is logged and swallowed.
/// </summary>
/// <param name="message">Unprocessed message.</param>
private void ReturnBouncedMessage(UnprocessedMessage message)
{
    try
    {
        AbstractQmsMessage returnedMessage =
            _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies);

        int replyCode = message.BounceBody.ReplyCode;
        string replyText = message.BounceBody.ReplyText;
        _logger.Debug("Message returned with error code " + replyCode + " (" + replyText + ")");

        AMQException bounceException;
        if (replyCode == AMQConstant.NO_CONSUMERS.Code)
        {
            bounceException = new AMQNoConsumersException(replyText, returnedMessage);
        }
        else if (replyCode == AMQConstant.NO_ROUTE.Code)
        {
            bounceException = new AMQNoRouteException(replyText, returnedMessage);
        }
        else
        {
            bounceException = new AMQUndeliveredException(replyCode, replyText, returnedMessage);
        }

        _connection.ExceptionReceived(bounceException);
    }
    catch (Exception failure)
    {
        _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", failure);
    }
}
/// <summary>
/// Low-mark threshold callback installed on the queue in NoAcknowledge mode: resumes the channel flow.
/// Does nothing in any other acknowledge mode.
/// </summary>
/// <param name="count">Current value reported by the queue; only used in the log message.</param>
private void OnPrefetchLowMark(int count)
{
    if (_acknowledgeMode != AcknowledgeMode.NoAcknowledge)
    {
        return;
    }

    _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
    Suspend(false);
}
/// <summary>
/// High-mark threshold callback installed on the queue in NoAcknowledge mode: suspends the channel flow.
/// Does nothing in any other acknowledge mode.
/// </summary>
/// <param name="count">Current value reported by the queue; only used in the log message.</param>
private void OnPrefetchHighMark(int count)
{
    if (_acknowledgeMode != AcknowledgeMode.NoAcknowledge)
    {
        return;
    }

    _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
    Suspend(true);
}
/// <summary>
/// Records the requested flow state and sends a channel.flow frame to match, using SyncWrite
/// with ChannelFlowOkBody as the expected response type. Runs under the suspension lock.
/// </summary>
/// <param name="suspend">True to suspend the flow, false to resume it.</param>
private void Suspend(bool suspend)
{
    lock (_suspensionLock)
    {
        if (_logger.IsDebugEnabled)
        {
            string stateName = suspend ? "suspended" : "unsuspended";
            _logger.Debug("Setting channel flow : " + stateName);
        }

        _suspended = suspend;

        // The frame carries the "flow active" flag, which is the inverse of "suspended".
        bool flowActive = !suspend;
        AMQFrame flowFrame = ChannelFlowBody.CreateAMQFrame(_channelId, flowActive);
        Connection.ConvenientProtocolWriter.SyncWrite(flowFrame, typeof(ChannelFlowOkBody));
    }
}
}
}