/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using System.Collections.Generic; using System.Collections.Specialized; using System.Threading; using Apache.NMS.Util; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS.ActiveMQ.Util; using Apache.NMS.ActiveMQ.Threads; namespace Apache.NMS.ActiveMQ { /// /// Default provider of ISession /// public class Session : ISession, IDispatcher { /// /// Private object used for synchronization, instead of public "this" /// private readonly object myLock = new object(); private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); private readonly SessionExecutor executor; private readonly TransactionContext transactionContext; private readonly Connection connection; private bool dispatchAsync; private bool exclusive; private bool retroactive; private byte priority = 0; private readonly SessionInfo info; private int consumerCounter; private int producerCounter; private long nextDeliveryId; private long lastDeliveredSequenceId; protected bool disposed = false; protected bool closed = false; protected bool closing = false; protected Atomic clearInProgress = new Atomic(); private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000); private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite); private TimeSpan requestTimeout; private readonly AcknowledgementMode acknowledgementMode; public Session(Connection connection, SessionId sessionId, AcknowledgementMode acknowledgementMode) { this.info = new SessionInfo(); this.info.SessionId = sessionId; this.connection = connection; this.connection.Oneway(this.info); this.acknowledgementMode = acknowledgementMode; this.requestTimeout = connection.RequestTimeout; this.dispatchAsync = connection.DispatchAsync; this.transactionContext = CreateTransactionContext(); this.exclusive = connection.ExclusiveConsumer; this.retroactive = connection.UseRetroactiveConsumer; Uri brokerUri = connection.BrokerUri; // Set propertieDs on session using parameters prefixed with "session." if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")")) { string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1); StringDictionary options = URISupport.ParseQuery(query); options = URISupport.GetProperties(options, "session."); URISupport.SetProperties(this, options); } this.ConsumerTransformer = connection.ConsumerTransformer; this.ProducerTransformer = connection.ProducerTransformer; this.executor = new SessionExecutor(this, this.consumers); if(connection.IsStarted) { this.Start(); } connection.AddSession(this); } ~Session() { Dispose(false); } #region Property Accessors #region Session Transaction Events // We delegate the events to the TransactionContext since it knows // what the state is for both Local and DTC transactions. public event SessionTxEventDelegate TransactionStartedListener { add { this.transactionContext.TransactionStartedListener += value; } remove { this.transactionContext.TransactionStartedListener += value; } } public event SessionTxEventDelegate TransactionCommittedListener { add { this.transactionContext.TransactionCommittedListener += value; } remove { this.transactionContext.TransactionCommittedListener += value; } } public event SessionTxEventDelegate TransactionRolledBackListener { add { this.transactionContext.TransactionRolledBackListener += value; } remove { this.transactionContext.TransactionRolledBackListener += value; } } #endregion /// /// Sets the maximum number of messages to keep around per consumer /// in addition to the prefetch window for non-durable topics until messages /// will start to be evicted for slow consumers. /// Must be > 0 to enable this feature /// public int MaximumPendingMessageLimit { set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; } } /// /// Enables or disables whether asynchronous dispatch should be used by the broker /// public bool DispatchAsync { get{ return this.dispatchAsync; } set{ this.dispatchAsync = value; } } /// /// Enables or disables exclusive consumers when using queues. An exclusive consumer means /// only one instance of a consumer is allowed to process messages on a queue to preserve order /// public bool Exclusive { get{ return this.exclusive; } set{ this.exclusive = value; } } /// /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? /// public bool Retroactive { get{ return this.retroactive; } set{ this.retroactive = value; } } /// /// Sets the default consumer priority for consumers /// public byte Priority { get{ return this.priority; } set{ this.priority = value; } } public Connection Connection { get { return this.connection; } } public SessionId SessionId { get { return info.SessionId; } } public TransactionContext TransactionContext { get { return this.transactionContext; } } public TimeSpan RequestTimeout { get { return this.requestTimeout; } set { this.requestTimeout = value; } } public bool Transacted { get { return this.IsTransacted; } } public virtual AcknowledgementMode AcknowledgementMode { get { return this.acknowledgementMode; } } public virtual bool IsClientAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; } } public virtual bool IsAutoAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; } } public virtual bool IsDupsOkAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; } } public virtual bool IsIndividualAcknowledge { get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; } } public virtual bool IsTransacted { get{ return this.acknowledgementMode == AcknowledgementMode.Transactional; } } public SessionExecutor Executor { get { return this.executor; } } public long NextDeliveryId { get { return Interlocked.Increment(ref this.nextDeliveryId); } } public long DisposeStopTimeout { get { return (long) this.disposeStopTimeout.TotalMilliseconds; } set { this.disposeStopTimeout = TimeSpan.FromMilliseconds(value); } } public long CloseStopTimeout { get { return (long) this.closeStopTimeout.TotalMilliseconds; } set { this.closeStopTimeout = TimeSpan.FromMilliseconds(value); } } private ConsumerTransformerDelegate consumerTransformer; /// /// A Delegate that is called each time a Message is dispatched to allow the client to do /// any necessary transformations on the received message before it is delivered. /// The Session instance sets the delegate on each Consumer it creates. /// public ConsumerTransformerDelegate ConsumerTransformer { get { return this.consumerTransformer; } set { this.consumerTransformer = value; } } private ProducerTransformerDelegate producerTransformer; /// /// A delegate that is called each time a Message is sent from this Producer which allows /// the application to perform any needed transformations on the Message before it is sent. /// The Session instance sets the delegate on each Producer it creates. /// public ProducerTransformerDelegate ProducerTransformer { get { return this.producerTransformer; } set { this.producerTransformer = value; } } internal Scheduler Scheduler { get { return this.connection.Scheduler; } } internal List Consumers { get { List copy = new List(); lock(consumers.SyncRoot) { foreach(MessageConsumer consumer in consumers.Values) { copy.Add(consumer); } } return copy; } } internal long LastDeliveredSequenceId { get { return this.lastDeliveredSequenceId; } } #endregion #region ISession Members public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if(this.disposed) { return; } try { // Force a Stop when we are Disposing vs a Normal Close. this.executor.Stop(this.disposeStopTimeout); Close(); } catch { // Ignore network errors. } this.disposed = true; } public virtual void Close() { if (!this.closed) { try { Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId); DoClose(); Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId); } catch (Exception ex) { Tracer.ErrorFormat("Error during session close: {0}", ex); } } } internal void DoClose() { Shutdown(); RemoveInfo removeInfo = new RemoveInfo(); removeInfo.ObjectId = this.info.SessionId; removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId; this.connection.Oneway(removeInfo); } internal void Shutdown() { Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId); if(this.closed) { return; } lock(myLock) { if(this.closed || this.closing) { return; } try { this.closing = true; // Stop all message deliveries from this Session this.executor.Stop(this.closeStopTimeout); lock(consumers.SyncRoot) { foreach(MessageConsumer consumer in consumers.Values) { consumer.FailureError = this.connection.FirstFailureError; consumer.Shutdown(); this.lastDeliveredSequenceId = Math.Max(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId); } } consumers.Clear(); lock(producers.SyncRoot) { foreach(MessageProducer producer in producers.Values) { producer.Shutdown(); } } producers.Clear(); // If in a local transaction we just roll back at this point. if (this.IsTransacted && this.transactionContext.InLocalTransaction) { try { this.transactionContext.Rollback(); } catch { } } Connection.RemoveSession(this); } catch(Exception ex) { Tracer.ErrorFormat("Error during session close: {0}", ex); } finally { this.closed = true; this.closing = false; } } } public IMessageProducer CreateProducer() { return CreateProducer(null); } public IMessageProducer CreateProducer(IDestination destination) { MessageProducer producer = null; try { ActiveMQDestination dest = null; if(destination != null) { dest = ActiveMQDestination.Transform(destination); } producer = DoCreateMessageProducer(GetNextProducerId(), dest); producer.ProducerTransformer = this.ProducerTransformer; this.AddProducer(producer); this.Connection.SyncRequest(producer.ProducerInfo); } catch(Exception) { if(producer != null) { this.RemoveProducer(producer.ProducerId); } throw; } return producer; } internal virtual MessageProducer DoCreateMessageProducer(ProducerId id, ActiveMQDestination destination) { return new MessageProducer(this, id, destination, this.RequestTimeout); } public IMessageConsumer CreateConsumer(IDestination destination) { return CreateConsumer(destination, null, false); } public IMessageConsumer CreateConsumer(IDestination destination, string selector) { return CreateConsumer(destination, selector, false); } public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) { if(destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } ActiveMQDestination dest = ActiveMQDestination.Transform(destination); int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch; if(dest.IsTopic) { prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; } else if(dest.IsQueue) { prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; } MessageConsumer consumer = null; try { consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, null, selector, prefetchSize, this.connection.PrefetchPolicy.MaximumPendingMessageLimit, noLocal); consumer.ConsumerTransformer = this.ConsumerTransformer; this.AddConsumer(consumer); this.Connection.SyncRequest(consumer.ConsumerInfo); if(this.Connection.IsStarted) { consumer.Start(); } } catch(Exception) { if(consumer != null) { this.RemoveConsumer(consumer); } throw; } return consumer; } public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) { if(destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } if (IsIndividualAcknowledge) { throw new NMSException("Cannot create a durable consumer for a session that is using " + "Individual Acknowledgement mode."); } ActiveMQDestination dest = ActiveMQDestination.Transform(destination); MessageConsumer consumer = null; try { consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch, this.connection.PrefetchPolicy.MaximumPendingMessageLimit, noLocal); consumer.ConsumerTransformer = this.ConsumerTransformer; this.AddConsumer(consumer); this.Connection.SyncRequest(consumer.ConsumerInfo); if(this.Connection.IsStarted) { consumer.Start(); } } catch(Exception) { if(consumer != null) { this.RemoveConsumer(consumer); } throw; } return consumer; } internal virtual MessageConsumer DoCreateMessageConsumer( ConsumerId id, ActiveMQDestination destination, string name, string selector, int prefetch, int maxPending, bool noLocal) { return new MessageConsumer(this, id, destination, name, selector, prefetch, maxPending, noLocal, false, this.DispatchAsync); } public void DeleteDurableConsumer(string name) { RemoveSubscriptionInfo command = new RemoveSubscriptionInfo(); command.ConnectionId = Connection.ConnectionId; command.ClientId = Connection.ClientId; command.SubcriptionName = name; this.connection.SyncRequest(command); } public IQueueBrowser CreateBrowser(IQueue queue) { return this.CreateBrowser(queue, null); } public IQueueBrowser CreateBrowser(IQueue queue, string selector) { if(queue == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } ActiveMQDestination dest = ActiveMQDestination.Transform(queue); QueueBrowser browser = null; try { browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync); } catch(Exception) { if(browser != null) { browser.Close(); } throw; } return browser; } public IQueue GetQueue(string name) { return new ActiveMQQueue(name); } public ITopic GetTopic(string name) { return new ActiveMQTopic(name); } public ITemporaryQueue CreateTemporaryQueue() { return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false); } public ITemporaryTopic CreateTemporaryTopic() { return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true); } /// /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). /// public void DeleteDestination(IDestination destination) { this.connection.DeleteDestination(destination); } public IMessage CreateMessage() { ActiveMQMessage answer = new ActiveMQMessage(); return ConfigureMessage(answer) as IMessage; } public ITextMessage CreateTextMessage() { ActiveMQTextMessage answer = new ActiveMQTextMessage(); return ConfigureMessage(answer) as ITextMessage; } public ITextMessage CreateTextMessage(string text) { ActiveMQTextMessage answer = new ActiveMQTextMessage(text); return ConfigureMessage(answer) as ITextMessage; } public IMapMessage CreateMapMessage() { return ConfigureMessage(new ActiveMQMapMessage()) as IMapMessage; } public IBytesMessage CreateBytesMessage() { return ConfigureMessage(new ActiveMQBytesMessage()) as IBytesMessage; } public IBytesMessage CreateBytesMessage(byte[] body) { ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); answer.Content = body; return ConfigureMessage(answer) as IBytesMessage; } public IStreamMessage CreateStreamMessage() { return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage; } public IObjectMessage CreateObjectMessage(object body) { ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); answer.Body = body; return ConfigureMessage(answer) as IObjectMessage; } public void Commit() { this.DoCommit(); } public void Rollback() { this.DoRollback(); } public void Recover() { CheckClosed(); if (IsTransacted) { throw new IllegalStateException("Cannot Recover a Transacted Session"); } lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Rollback(); } } } #endregion internal void DoSend(ActiveMQDestination destination, ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout) { ActiveMQMessage msg = message; if(destination.IsTemporary && !connection.IsTempDestinationActive(destination as ActiveMQTempDestination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } if(IsTransacted) { DoStartTransaction(); msg.TransactionId = TransactionContext.TransactionId; } msg.RedeliveryCounter = 0; msg.BrokerPath = null; if(this.connection.CopyMessageOnSend) { msg = (ActiveMQMessage)msg.Clone(); } msg.OnSend(); msg.ProducerId = msg.MessageId.ProducerId; if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired && !connection.AlwaysSyncSend && (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null)) { this.connection.Oneway(msg); if(producerWindow != null) { // Since we defer lots of the marshaling till we hit the wire, this // might not provide and accurate size. We may change over to doing // more aggressive marshaling, to get more accurate sizes.. this is more // important once users start using producer window flow control. producerWindow.IncreaseUsage(msg.Size()); } } else { if(sendTimeout.TotalMilliseconds > 0) { this.connection.SyncRequest(msg, sendTimeout); } else { this.connection.SyncRequest(msg); } } } internal virtual void DoCommit() { if(!IsTransacted) { throw new InvalidOperationException( "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } this.TransactionContext.Commit(); } internal virtual void DoRollback() { if(!IsTransacted) { throw new InvalidOperationException( "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } this.TransactionContext.Rollback(); } /// /// Ensures that a transaction is started /// internal virtual void DoStartTransaction() { if(IsTransacted && !TransactionContext.InTransaction) { this.TransactionContext.Begin(); } } public void AddConsumer(MessageConsumer consumer) { if(!this.closing) { ConsumerId id = consumer.ConsumerId; // Registered with Connection before we register at the broker. consumers[id] = consumer; connection.AddDispatcher(id, this); } } public void RemoveConsumer(MessageConsumer consumer) { connection.RemoveDispatcher(consumer.ConsumerId); if(!this.closing) { consumers.Remove(consumer.ConsumerId); } connection.RemoveDispatcher(consumer); } public void AddProducer(MessageProducer producer) { if(!this.closing) { ProducerId id = producer.ProducerId; this.producers[id] = producer; this.connection.AddProducer(id, producer); } } public void RemoveProducer(ProducerId objectId) { connection.RemoveProducer(objectId); if(!this.closing) { producers.Remove(objectId); } } public ConsumerId GetNextConsumerId() { ConsumerId id = new ConsumerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref consumerCounter); return id; } public ProducerId GetNextProducerId() { ProducerId id = new ProducerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref producerCounter); return id; } public void Stop() { if(this.executor != null) { this.executor.Stop(); } } public void Start() { lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Start(); } } if(this.executor != null) { this.executor.Start(); } } public bool Started { get { return this.executor != null ? this.executor.Running : false; } } internal void Redispatch(IDispatcher dispatcher, MessageDispatchChannel channel) { MessageDispatch[] messages = channel.RemoveAll(); foreach (MessageDispatch dispatch in messages) { this.connection.RollbackDuplicate(dispatcher, dispatch.Message); } System.Array.Reverse(messages); foreach(MessageDispatch message in messages) { this.executor.ExecuteFirst(message); } } public void Dispatch(MessageDispatch dispatch) { if(this.executor != null) { this.executor.Execute(dispatch); } } internal void ClearMessagesInProgress(ref long transportInterruptionProcessingComplete) { if(this.executor != null) { this.executor.ClearMessagesInProgress(); } if (this.consumers.Count == 0) { return; } // Because we are called from inside the Transport Reconnection logic // we spawn the Consumer clear to another Thread so that we can avoid // any lock contention that might exist between the consumer and the // connection that is reconnecting. Use the Connection Scheduler so // that the clear calls are done one at a time to avoid further // contention on the Connection and Session resources. if (clearInProgress.CompareAndSet(false, true)) { lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.InProgressClearRequired(); Interlocked.Increment(ref transportInterruptionProcessingComplete); Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0); } } // Clear after all consumer have had their ClearMessagesInProgress method called. Scheduler.ExecuteAfterDelay(ResetClearInProgressFlag, clearInProgress, 0); } } private static void ClearMessages(object value) { MessageConsumer consumer = value as MessageConsumer; if(Tracer.IsDebugEnabled) { Tracer.Debug("Performing Async Clear of In Progress Messages on Consumer: " + consumer.ConsumerId); } consumer.ClearMessagesInProgress(); } private static void ResetClearInProgressFlag(object value) { Atomic clearInProgress = value as Atomic; clearInProgress.Value = false; } internal void Acknowledge() { lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Acknowledge(); } } } private ActiveMQMessage ConfigureMessage(ActiveMQMessage message) { message.Connection = this.connection; if(this.IsTransacted) { // Allows Acknowledge to be called in a transaction with no effect per JMS Spec. message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge); } return message; } internal void SendAck(MessageAck ack) { this.SendAck(ack, false); } internal void SendAck(MessageAck ack, bool lazy) { if(Tracer.IsDebugEnabled) { Tracer.Debug("Session sending Ack: " + ack); } if(lazy || connection.SendAcksAsync || this.IsTransacted ) { this.connection.Oneway(ack); } else { this.connection.SyncRequest(ack); } } protected virtual TransactionContext CreateTransactionContext() { return new TransactionContext(this); } private void CheckClosed() { if (closed) { throw new IllegalStateException("Session is Closed"); } } /// /// Prevents message from throwing an exception if a client calls Acknoweldge on /// a message that is part of a transaction either being produced or consumed. The /// JMS Spec indicates that users should be able to call Acknowledge with no effect /// if the message is in a transaction. /// /// /// A /// private static void DoNothingAcknowledge(ActiveMQMessage message) { } class SessionCloseSynchronization : ISynchronization { private readonly Session session; public SessionCloseSynchronization(Session session) { this.session = session; } public void BeforeEnd() { } public void AfterCommit() { Tracer.Debug("SessionCloseSynchronization AfterCommit called for Session: " + session.SessionId); session.DoClose(); } public void AfterRollback() { Tracer.Debug("SessionCloseSynchronization AfterRollback called for Session: " + session.SessionId); session.DoClose(); } } internal bool IsInUse(ActiveMQTempDestination dest) { lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers.Values) { if(consumer.IsInUse(dest)) { return true; } } } return false; } } }