/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
{
///
/// Default provider of ISession
///
public class Session : ISession
{
///
/// Private object used for synchronization, instead of public "this"
///
private readonly object myLock = new object();
private int consumerCounter;
private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private readonly DispatchingThread dispatchingThread;
private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
private readonly SessionInfo info;
private int producerCounter;
internal bool startedAsyncDelivery = false;
private bool disposed = false;
private bool closed = false;
private bool closing = false;
private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
this.AsyncSend = connection.AsyncSend;
this.requestTimeout = connection.RequestTimeout;
this.PrefetchSize = 1000;
this.transactionContext = new TransactionContext(this);
this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
}
~Session()
{
Dispose(false);
}
///
/// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
/// until acknowledgements are received.
///
public int PrefetchSize;
///
/// Sets the maximum number of messages to keep around per consumer
/// in addition to the prefetch window for non-durable topics until messages
/// will start to be evicted for slow consumers.
/// Must be > 0 to enable this feature
///
public int MaximumPendingMessageLimit;
///
/// Enables or disables whether asynchronous dispatch should be used by the broker
///
public bool DispatchAsync;
///
/// Enables or disables exclusive consumers when using queues. An exclusive consumer means
/// only one instance of a consumer is allowed to process messages on a queue to preserve order
///
public bool Exclusive;
///
/// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
///
public bool Retroactive;
///
/// Sets the default consumer priority for consumers
///
public byte Priority;
///
/// This property indicates whether or not async send is enabled.
///
public bool AsyncSend;
private Connection connection;
public Connection Connection
{
get { return this.connection; }
}
public SessionId SessionId
{
get { return info.SessionId; }
}
private TransactionContext transactionContext;
public TransactionContext TransactionContext
{
get { return this.transactionContext; }
}
#region ISession Members
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if(this.disposed)
{
return;
}
if(disposing)
{
// Dispose managed code here.
}
try
{
Close();
}
catch
{
// Ignore network errors.
}
this.disposed = true;
}
public void Close()
{
lock(myLock)
{
if(this.closed)
{
return;
}
try
{
this.closing = true;
StopAsyncDelivery();
lock(consumers.SyncRoot)
{
foreach(MessageConsumer consumer in consumers.Values)
{
consumer.Close();
}
}
consumers.Clear();
lock(producers.SyncRoot)
{
foreach(MessageProducer producer in producers.Values)
{
producer.Close();
}
}
producers.Clear();
Connection.RemoveSession(this);
}
catch(Exception ex)
{
Tracer.ErrorFormat("Error during session close: {0}", ex);
}
finally
{
this.connection = null;
this.closed = true;
this.closing = false;
}
}
}
public IMessageProducer CreateProducer()
{
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination)
{
ProducerInfo command = CreateProducerInfo(destination);
ProducerId producerId = command.ProducerId;
MessageProducer producer = null;
try
{
producer = new MessageProducer(this, command);
producers[producerId] = producer;
this.DoSend(command);
}
catch(Exception)
{
if(producer != null)
{
producer.Close();
}
throw;
}
return producer;
}
public IMessageConsumer CreateConsumer(IDestination destination)
{
return CreateConsumer(destination, null, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
return CreateConsumer(destination, selector, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
command.NoLocal = noLocal;
command.AcknowledgementMode = this.AcknowledgementMode;
ConsumerId consumerId = command.ConsumerId;
MessageConsumer consumer = null;
try
{
consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
consumers[consumerId] = consumer;
this.DoSend(command);
return consumer;
}
catch(Exception)
{
if(consumer != null)
{
consumer.Close();
}
throw;
}
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
ConsumerId consumerId = command.ConsumerId;
command.SubscriptionName = name;
command.NoLocal = noLocal;
MessageConsumer consumer = null;
try
{
consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
consumers[consumerId] = consumer;
this.DoSend(command);
}
catch(Exception)
{
if(consumer != null)
{
consumer.Close();
}
throw;
}
return consumer;
}
public void DeleteDurableConsumer(string name)
{
RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
command.ConnectionId = Connection.ConnectionId;
command.ClientId = Connection.ClientId;
command.SubcriptionName = name;
this.DoSend(command);
}
public IQueue GetQueue(string name)
{
return new ActiveMQQueue(name);
}
public ITopic GetTopic(string name)
{
return new ActiveMQTopic(name);
}
public ITemporaryQueue CreateTemporaryQueue()
{
ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
CreateTemporaryDestination(answer);
return answer;
}
public ITemporaryTopic CreateTemporaryTopic()
{
ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
CreateTemporaryDestination(answer);
return answer;
}
///
/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
///
public void DeleteDestination(IDestination destination)
{
DestinationInfo command = new DestinationInfo();
command.ConnectionId = Connection.ConnectionId;
command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
command.Destination = destination;
this.DoSend(command);
}
public IMessage CreateMessage()
{
ActiveMQMessage answer = new ActiveMQMessage();
Configure(answer);
return answer;
}
public ITextMessage CreateTextMessage()
{
ActiveMQTextMessage answer = new ActiveMQTextMessage();
Configure(answer);
return answer;
}
public ITextMessage CreateTextMessage(string text)
{
ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
Configure(answer);
return answer;
}
public IMapMessage CreateMapMessage()
{
return new ActiveMQMapMessage();
}
public IBytesMessage CreateBytesMessage()
{
return new ActiveMQBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
answer.Content = body;
return answer;
}
public IObjectMessage CreateObjectMessage(object body)
{
ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
answer.Body = body;
return answer;
}
public void Commit()
{
if(!Transacted)
{
throw new InvalidOperationException(
"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+ this.AcknowledgementMode);
}
this.TransactionContext.Commit();
}
public void Rollback()
{
if(!Transacted)
{
throw new InvalidOperationException(
"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+ this.AcknowledgementMode);
}
this.TransactionContext.Rollback();
// lets ensure all the consumers redeliver any rolled back messages
lock(consumers.SyncRoot)
{
foreach(MessageConsumer consumer in consumers.Values)
{
consumer.RedeliverRolledBackMessages();
}
}
}
// Properties
private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
public TimeSpan RequestTimeout
{
get { return this.requestTimeout; }
set { this.requestTimeout = value; }
}
public bool Transacted
{
get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
}
private AcknowledgementMode acknowledgementMode;
public AcknowledgementMode AcknowledgementMode
{
get { return this.acknowledgementMode; }
}
#endregion
private void dispatchingThread_ExceptionListener(Exception exception)
{
if(null != Connection)
{
try
{
Connection.OnSessionException(this, exception);
}
catch
{
}
}
}
protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
{
DestinationInfo command = new DestinationInfo();
command.ConnectionId = Connection.ConnectionId;
command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
command.Destination = tempDestination;
this.DoSend(command);
}
public void DoSend(Command message)
{
this.DoSend(message, this.RequestTimeout);
}
public void DoSend(Command message, TimeSpan requestTimeout)
{
if(AsyncSend)
{
Connection.OneWay(message);
}
else
{
Connection.SyncRequest(message, requestTimeout);
}
}
///
/// Ensures that a transaction is started
///
public void DoStartTransaction()
{
if(Transacted)
{
this.TransactionContext.Begin();
}
}
public void DisposeOf(ConsumerId objectId)
{
Connection.DisposeOf(objectId);
if(!this.closing)
{
consumers.Remove(objectId);
}
}
public void DisposeOf(ProducerId objectId)
{
Connection.DisposeOf(objectId);
if(!this.closing)
{
producers.Remove(objectId);
}
}
public bool DispatchMessage(ConsumerId consumerId, Message message)
{
bool dispatched = false;
MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
if(consumer != null)
{
consumer.Dispatch((ActiveMQMessage) message);
dispatched = true;
}
return dispatched;
}
///
/// Private method called by the dispatcher thread in order to perform
/// asynchronous delivery of queued (inbound) messages.
///
private void DispatchAsyncMessages()
{
// lets iterate through each consumer created by this session
// ensuring that they have all pending messages dispatched
lock(consumers.SyncRoot)
{
foreach(MessageConsumer consumer in consumers.Values)
{
consumer.DispatchAsyncMessages();
}
}
}
protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref consumerCounter);
answer.ConsumerId = id;
answer.Destination = ActiveMQDestination.Transform(destination);
answer.Selector = selector;
answer.PrefetchSize = this.PrefetchSize;
answer.Priority = this.Priority;
answer.Exclusive = this.Exclusive;
answer.DispatchAsync = this.DispatchAsync;
answer.Retroactive = this.Retroactive;
// If the destination contained a URI query, then use it to set public properties
// on the ConsumerInfo
ActiveMQDestination amqDestination = destination as ActiveMQDestination;
if(amqDestination != null && amqDestination.Options != null)
{
URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
}
return answer;
}
protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
{
ProducerInfo answer = new ProducerInfo();
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref producerCounter);
answer.ProducerId = id;
answer.Destination = ActiveMQDestination.Transform(destination);
// If the destination contained a URI query, then use it to set public
// properties on the ProducerInfo
ActiveMQDestination amqDestination = destination as ActiveMQDestination;
if(amqDestination != null && amqDestination.Options != null)
{
URISupport.SetProperties(answer, amqDestination.Options, "producer.");
}
return answer;
}
///
/// Configures the message command
///
protected void Configure(ActiveMQMessage message)
{
}
internal void StopAsyncDelivery()
{
if(startedAsyncDelivery)
{
this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler;
dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds);
startedAsyncDelivery = false;
}
}
internal void StartAsyncDelivery()
{
if(!startedAsyncDelivery)
{
this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler;
dispatchingThread.Start();
startedAsyncDelivery = true;
}
}
internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
{
dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
}
}
}