/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using System.Collections.Specialized; using System.Threading; using Apache.NMS.Stomp.Commands; using Apache.NMS.Stomp.Util; using Apache.NMS.Util; namespace Apache.NMS.Stomp { /// /// Default provider of ISession /// public class Session : ISession, IDispatcher { /// /// Private object used for synchronization, instead of public "this" /// private readonly object myLock = new object(); private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); private readonly SessionExecutor executor; private readonly TransactionContext transactionContext; private Connection connection; private bool dispatchAsync; private bool exclusive; private bool retroactive; private byte priority; private readonly SessionInfo info; private int consumerCounter; private int producerCounter; private int nextDeliveryId; private bool disposed = false; private bool closed = false; private bool closing = false; private TimeSpan requestTimeout; private readonly AcknowledgementMode acknowledgementMode; public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode, bool dispatchAsync) { this.connection = connection; this.info = info; this.acknowledgementMode = acknowledgementMode; this.requestTimeout = connection.RequestTimeout; this.dispatchAsync = dispatchAsync; if(acknowledgementMode == AcknowledgementMode.Transactional) { this.transactionContext = new TransactionContext(this); } else if(acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge) { this.acknowledgementMode = AcknowledgementMode.AutoAcknowledge; } this.executor = new SessionExecutor(this, this.consumers); } ~Session() { Dispose(false); } #region Property Accessors /// /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers /// until acknowledgements are received. /// public int PrefetchSize { set{ this.connection.PrefetchPolicy.SetAll(value); } } /// /// Sets the maximum number of messages to keep around per consumer /// in addition to the prefetch window for non-durable topics until messages /// will start to be evicted for slow consumers. /// Must be > 0 to enable this feature /// public int MaximumPendingMessageLimit { set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; } } /// /// Enables or disables whether asynchronous dispatch should be used by the broker /// public bool DispatchAsync { get{ return this.dispatchAsync; } set{ this.dispatchAsync = value; } } /// /// Enables or disables exclusive consumers when using queues. An exclusive consumer means /// only one instance of a consumer is allowed to process messages on a queue to preserve order /// public bool Exclusive { get{ return this.exclusive; } set{ this.exclusive = value; } } /// /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? /// public bool Retroactive { get{ return this.retroactive; } set{ this.retroactive = value; } } /// /// Sets the default consumer priority for consumers /// public byte Priority { get{ return this.priority; } set{ this.priority = value; } } public Connection Connection { get { return this.connection; } } public SessionId SessionId { get { return info.SessionId; } } public TransactionContext TransactionContext { get { return this.transactionContext; } } public TimeSpan RequestTimeout { get { return this.requestTimeout; } set { this.requestTimeout = value; } } public bool Transacted { get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; } } public AcknowledgementMode AcknowledgementMode { get { return this.acknowledgementMode; } } public bool IsClientAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; } } public bool IsAutoAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; } } public bool IsDupsOkAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; } } public bool IsIndividualAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; } } public bool IsTransacted { get { return this.acknowledgementMode == AcknowledgementMode.Transactional; } } public SessionExecutor Executor { get { return this.executor; } } public long NextDeliveryId { get { return Interlocked.Increment(ref this.nextDeliveryId); } } private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer { get { return this.consumerTransformer; } set { this.consumerTransformer = value; } } private ProducerTransformerDelegate producerTransformer; public ProducerTransformerDelegate ProducerTransformer { get { return this.producerTransformer; } set { this.producerTransformer = value; } } #endregion #region ISession Members public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if(this.disposed) { return; } if(disposing) { // Dispose managed code here. } try { Close(); } catch { // Ignore network errors. } this.disposed = true; } public void Close() { lock(myLock) { if(this.closed) { return; } try { Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString()); DoClose(); Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString()); } catch(Exception ex) { Tracer.ErrorFormat("Error during session close: {0}", ex); } finally { this.connection = null; this.closed = true; this.closing = false; } } } internal void DoClose() { lock(myLock) { if(this.closed) { return; } try { this.closing = true; // Stop all message deliveries from this Session Stop(); lock(consumers.SyncRoot) { foreach(MessageConsumer consumer in consumers.Values) { consumer.FailureError = this.connection.FirstFailureError; consumer.DoClose(); } } consumers.Clear(); lock(producers.SyncRoot) { foreach(MessageProducer producer in producers.Values) { producer.DoClose(); } } producers.Clear(); // If in a transaction roll it back if(this.IsTransacted && this.transactionContext.InTransaction) { try { this.transactionContext.Rollback(); } catch { } } Connection.RemoveSession(this); } catch(Exception ex) { Tracer.ErrorFormat("Error during session close: {0}", ex); } finally { this.closed = true; this.closing = false; } } } public IMessageProducer CreateProducer() { return CreateProducer(null); } public IMessageProducer CreateProducer(IDestination destination) { ProducerInfo command = CreateProducerInfo(destination); ProducerId producerId = command.ProducerId; MessageProducer producer = null; try { producer = new MessageProducer(this, command); producer.ProducerTransformer = this.ProducerTransformer; producers[producerId] = producer; } catch(Exception) { if(producer != null) { producer.Close(); } throw; } return producer; } public IMessageConsumer CreateConsumer(IDestination destination) { return CreateConsumer(destination, null, false); } public IMessageConsumer CreateConsumer(IDestination destination, string selector) { return CreateConsumer(destination, selector, false); } public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) { if (destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } ConsumerInfo command = CreateConsumerInfo(destination, selector); command.NoLocal = noLocal; ConsumerId consumerId = command.ConsumerId; MessageConsumer consumer = null; // Registered with Connection before we register at the broker. connection.addDispatcher(consumerId, this); try { consumer = new MessageConsumer(this, command); consumer.ConsumerTransformer = this.ConsumerTransformer; consumers[consumerId] = consumer; if(this.Started) { consumer.Start(); } // lets register the consumer first in case we start dispatching messages immediately this.Connection.SyncRequest(command); return consumer; } catch(Exception) { if(consumer != null) { consumer.Close(); } throw; } } public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) { if (destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } ConsumerInfo command = CreateConsumerInfo(destination, selector); ConsumerId consumerId = command.ConsumerId; command.SubscriptionName = name; command.NoLocal = noLocal; command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch; MessageConsumer consumer = null; // Registered with Connection before we register at the broker. connection.addDispatcher(consumerId, this); try { consumer = new MessageConsumer(this, command); consumer.ConsumerTransformer = this.ConsumerTransformer; consumers[consumerId] = consumer; if(this.Started) { consumer.Start(); } this.connection.SyncRequest(command); } catch(Exception) { if(consumer != null) { consumer.Close(); } throw; } return consumer; } public void DeleteDurableConsumer(string name) { RemoveSubscriptionInfo command = new RemoveSubscriptionInfo(); command.ConnectionId = Connection.ConnectionId; command.ClientId = Connection.ClientId; command.SubscriptionName = name; this.connection.SyncRequest(command); } public IQueueBrowser CreateBrowser(IQueue queue) { throw new NotSupportedException("Not supported with Stomp Protocol"); } public IQueueBrowser CreateBrowser(IQueue queue, string selector) { throw new NotSupportedException("Not supported with Stomp Protocol"); } public IQueue GetQueue(string name) { return new Commands.Queue(name); } public ITopic GetTopic(string name) { return new Commands.Topic(name); } public ITemporaryQueue CreateTemporaryQueue() { TempQueue answer = new TempQueue(Connection.CreateTemporaryDestinationName()); return answer; } public ITemporaryTopic CreateTemporaryTopic() { TempTopic answer = new TempTopic(Connection.CreateTemporaryDestinationName()); return answer; } /// /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). /// public void DeleteDestination(IDestination destination) { throw new NotSupportedException("Stomp Cannot delete Destinations"); } public IMessage CreateMessage() { Message answer = new Message(); return ConfigureMessage(answer) as IMessage; } public ITextMessage CreateTextMessage() { TextMessage answer = new TextMessage(); return ConfigureMessage(answer) as ITextMessage; } public ITextMessage CreateTextMessage(string text) { TextMessage answer = new TextMessage(text); return ConfigureMessage(answer) as ITextMessage; } public IMapMessage CreateMapMessage() { MapMessage answer = new MapMessage(); return ConfigureMessage(answer) as IMapMessage; } public IBytesMessage CreateBytesMessage() { return ConfigureMessage(new BytesMessage()) as IBytesMessage; } public IBytesMessage CreateBytesMessage(byte[] body) { BytesMessage answer = new BytesMessage(); answer.Content = body; return ConfigureMessage(answer) as IBytesMessage; } public IStreamMessage CreateStreamMessage() { throw new NotSupportedException("No Object Message in Stomp"); } public IObjectMessage CreateObjectMessage(object body) { throw new NotSupportedException("No Object Message in Stomp"); } public void Commit() { if(!Transacted) { throw new InvalidOperationException( "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } this.TransactionContext.Commit(); } public void Rollback() { if(!Transacted) { throw new InvalidOperationException( "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } this.TransactionContext.Rollback(); } #endregion public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout ) { Message msg = message; if(Transacted) { DoStartTransaction(); msg.TransactionId = TransactionContext.TransactionId; } msg.RedeliveryCounter = 0; if(this.connection.CopyMessageOnSend) { msg = (Message)msg.Clone(); } msg.OnSend(); msg.ProducerId = msg.MessageId.ProducerId; if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired && !connection.AlwaysSyncSend && (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null)) { this.connection.Oneway(msg); } else { if(sendTimeout.TotalMilliseconds > 0) { this.connection.SyncRequest(msg, sendTimeout); } else { this.connection.SyncRequest(msg); } } } /// /// Ensures that a transaction is started /// public void DoStartTransaction() { if(Transacted) { this.TransactionContext.Begin(); } } public void DisposeOf(ConsumerId objectId) { connection.removeDispatcher(objectId); if(!this.closing) { consumers.Remove(objectId); } } public void DisposeOf(ProducerId objectId) { if(!this.closing) { producers.Remove(objectId); } } protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) { ConsumerInfo answer = new ConsumerInfo(); ConsumerId id = new ConsumerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref consumerCounter); answer.ConsumerId = id; answer.Destination = Destination.Transform(destination); answer.Selector = selector; answer.Priority = this.Priority; answer.Exclusive = this.Exclusive; answer.DispatchAsync = this.DispatchAsync; answer.Retroactive = this.Retroactive; answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit; answer.AckMode = this.AcknowledgementMode; if(destination is ITopic || destination is ITemporaryTopic) { answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; } else if(destination is IQueue || destination is ITemporaryQueue) { answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; } // If the destination contained a URI query, then use it to set public properties // on the ConsumerInfo Destination amqDestination = destination as Destination; if(amqDestination != null && amqDestination.Options != null) { StringDictionary options = URISupport.GetProperties(amqDestination.Options, "consumer."); URISupport.SetProperties(answer, options); } return answer; } protected virtual ProducerInfo CreateProducerInfo(IDestination destination) { ProducerInfo answer = new ProducerInfo(); ProducerId id = new ProducerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref producerCounter); answer.ProducerId = id; answer.Destination = Destination.Transform(destination); // If the destination contained a URI query, then use it to set public // properties on the ProducerInfo Destination amqDestination = destination as Destination; if(amqDestination != null && amqDestination.Options != null) { StringDictionary options = URISupport.GetProperties(amqDestination.Options, "producer."); URISupport.SetProperties(answer, options); } return answer; } public void Stop() { if(this.executor != null) { this.executor.Stop(); } } public void Start() { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Start(); } if(this.executor != null) { this.executor.Start(); } } public bool Started { get { return this.executor != null ? this.executor.Running : false; } } internal void Redispatch(MessageDispatchChannel channel) { MessageDispatch[] messages = channel.RemoveAll(); System.Array.Reverse(messages); foreach(MessageDispatch message in messages) { if(Tracer.IsDebugEnabled) { Tracer.DebugFormat("Resending Message Dispatch: ", message.ToString()); } this.executor.ExecuteFirst(message); } } public void Dispatch(MessageDispatch dispatch) { if(this.executor != null) { if(Tracer.IsDebugEnabled) { Tracer.DebugFormat("Send Message Dispatch: ", dispatch.ToString()); } this.executor.Execute(dispatch); } } internal void ClearMessagesInProgress() { if( this.executor != null ) { this.executor.ClearMessagesInProgress(); } if(Transacted) { this.transactionContext.ResetTransactionInProgress(); } lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.InProgressClearRequired(); ThreadPool.QueueUserWorkItem(ClearMessages, consumer); } } } private void ClearMessages(object value) { MessageConsumer consumer = value as MessageConsumer; if(Tracer.IsDebugEnabled) { Tracer.Debug("Performing Async Clear of In Progress Messages for Consumer: " + consumer.ConsumerId); } consumer.ClearMessagesInProgress(); } internal void Acknowledge() { lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Acknowledge(); } } } private Message ConfigureMessage(Message message) { message.Connection = this.connection; if(this.IsTransacted) { // Allows Acknowledge to be called in a transaction with no effect per JMS Spec. message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge); } return message; } internal void SendAck(MessageAck ack) { this.SendAck(ack, false); } internal void SendAck(MessageAck ack, bool lazy) { if(lazy || connection.SendAcksAsync || this.IsTransacted ) { this.connection.Oneway(ack); } else { this.connection.SyncRequest(ack); } } /// /// Prevents message from throwing an exception if a client calls Acknoweldge on /// a message that is part of a transaction either being produced or consumed. The /// JMS Spec indicates that users should be able to call Acknowledge with no effect /// if the message is in a transaction. /// /// /// A /// private void DoNothingAcknowledge(Message message) { } } }