/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using System.Collections.Specialized; using System.Threading; using Apache.NMS.Stomp.Commands; using Apache.NMS.Stomp.Threads; using Apache.NMS.Stomp.Transport; using Apache.NMS.Stomp.Util; using Apache.NMS.Util; namespace Apache.NMS.Stomp { /// /// Represents a connection with a message broker /// public class Connection : IConnection { private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private readonly Uri brokerUri; private ITransport transport; private readonly ConnectionInfo info; private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private bool asyncSend = false; private bool alwaysSyncSend = false; private bool copyMessageOnSend = true; private bool sendAcksAsync = false; private bool dispatchAsync = true; private string transformation = null; private IRedeliveryPolicy redeliveryPolicy; private PrefetchPolicy prefetchPolicy = new PrefetchPolicy(); private bool userSpecifiedClientID; private TimeSpan requestTimeout; private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable()); private readonly object myLock = new object(); private readonly Atomic connected = new Atomic(false); private readonly Atomic closed = new Atomic(false); private readonly Atomic closing = new Atomic(false); private readonly Atomic transportFailed = new Atomic(false); private Exception firstFailureError = null; private int sessionCounter = 0; private int temporaryDestinationCounter = 0; private int localTransactionCounter; private readonly Atomic started = new Atomic(false); private ConnectionMetaData metaData = null; private bool disposed = false; private readonly IdGenerator clientIdGenerator; private CountDownLatch transportInterruptionProcessingComplete; private readonly MessageTransformation messageTransformation; private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor(); public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator) { this.brokerUri = connectionUri; this.clientIdGenerator = clientIdGenerator; this.transport = transport; this.transport.Command = new CommandHandler(OnCommand); this.transport.Exception = new ExceptionHandler(OnTransportException); this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted); this.transport.Resumed = new ResumedHandler(OnTransportResumed); ConnectionId id = new ConnectionId(); id.Value = CONNECTION_ID_GENERATOR.GenerateId(); this.info = new ConnectionInfo(); this.info.ConnectionId = id; this.info.Host = brokerUri.Host; this.messageTransformation = new StompMessageTransformation(this); } ~Connection() { Dispose(false); } /// /// A delegate that can receive transport level exceptions. /// public event ExceptionListener ExceptionListener; /// /// An asynchronous listener that is notified when a Fault tolerant connection /// has been interrupted. /// public event ConnectionInterruptedListener ConnectionInterruptedListener; /// /// An asynchronous listener that is notified when a Fault tolerant connection /// has been resumed. /// public event ConnectionResumedListener ConnectionResumedListener; #region Properties public String UserName { get { return this.info.UserName; } set { this.info.UserName = value; } } public String Password { get { return this.info.Password; } set { this.info.Password = value; } } /// /// This property indicates whether or not async send is enabled. /// public bool AsyncSend { get { return asyncSend; } set { asyncSend = value; } } /// /// This property sets the acknowledgment mode for the connection. /// The URI parameter connection.ackmode can be set to a string value /// that maps to the enumeration value. /// public string AckMode { set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); } } /// /// This property forces all messages that are sent to be sent synchronously overriding /// any usage of the AsyncSend flag. This can reduce performance in some cases since the /// only messages we normally send synchronously are Persistent messages not sent in a /// transaction. This options guarantees that no send will return until the broker has /// acknowledge receipt of the message /// public bool AlwaysSyncSend { get { return alwaysSyncSend; } set { alwaysSyncSend = value; } } /// /// This property indicates whether Message's should be copied before being sent via /// one of the Connection's send methods. Copying the Message object allows the user /// to resuse the Object over for another send. If the message isn't copied performance /// can improve but the user must not reuse the Object as it may not have been sent /// before they reset its payload. /// public bool CopyMessageOnSend { get { return copyMessageOnSend; } set { copyMessageOnSend = value; } } /// /// This property indicates whether or not async sends are used for /// message acknowledgement messages. Sending Acks async can improve /// performance but may decrease reliability. /// public bool SendAcksAsync { get { return sendAcksAsync; } set { sendAcksAsync = value; } } /// /// synchronously or asynchronously by the broker. Set to false for a slow /// consumer and true for a fast consumer. /// public bool DispatchAsync { get { return this.dispatchAsync; } set { this.dispatchAsync = value; } } /// /// Sets the default Transformation attribute applied to Consumers. If a consumer /// is to receive Map messages from the Broker then the user should set the "jms-map-xml" /// transformation on the consumer so that all MapMessages are sent as XML. /// public string Transformation { get { return this.transformation; } set { this.transformation = value; } } public IConnectionMetaData MetaData { get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } } public Uri BrokerUri { get { return brokerUri; } } public ITransport ITransport { get { return transport; } set { this.transport = value; } } public bool TransportFailed { get { return this.transportFailed.Value; } } public Exception FirstFailureError { get { return this.firstFailureError; } } public TimeSpan RequestTimeout { get { return this.requestTimeout; } set { this.requestTimeout = value; } } public AcknowledgementMode AcknowledgementMode { get { return acknowledgementMode; } set { this.acknowledgementMode = value; } } public string ClientId { get { return info.ClientId; } set { if(this.connected.Value) { throw new NMSException("You cannot change the ClientId once the Connection is connected"); } this.info.ClientId = value; this.userSpecifiedClientID = true; CheckConnected(); } } /// /// The Default Client Id used if the ClientId property is not set explicity. /// public string DefaultClientId { set { this.info.ClientId = value; this.userSpecifiedClientID = true; } } public ConnectionId ConnectionId { get { return info.ConnectionId; } } /// /// Get/or set the redelivery policy for this connection. /// public IRedeliveryPolicy RedeliveryPolicy { get { return this.redeliveryPolicy; } set { this.redeliveryPolicy = value; } } public PrefetchPolicy PrefetchPolicy { get { return this.prefetchPolicy; } set { this.prefetchPolicy = value; } } internal MessageTransformation MessageTransformation { get { return this.messageTransformation; } } private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer { get { return this.consumerTransformer; } set { this.consumerTransformer = value; } } private ProducerTransformerDelegate producerTransformer; public ProducerTransformerDelegate ProducerTransformer { get { return this.producerTransformer; } set { this.producerTransformer = value; } } #endregion /// /// Starts asynchronous message delivery of incoming messages for this connection. /// Synchronous delivery is unaffected. /// public void Start() { CheckConnected(); if(started.CompareAndSet(false, true)) { lock(sessions.SyncRoot) { foreach(Session session in sessions) { session.Start(); } } } } /// /// This property determines if the asynchronous message delivery of incoming /// messages has been started for this connection. /// public bool IsStarted { get { return started.Value; } } /// /// Temporarily stop asynchronous delivery of inbound messages for this connection. /// The sending of outbound messages is unaffected. /// public void Stop() { CheckConnected(); if(started.CompareAndSet(true, false)) { lock(sessions.SyncRoot) { foreach(Session session in sessions) { session.Stop(); } } } } /// /// Creates a new session to work on this connection /// public ISession CreateSession() { return CreateSession(acknowledgementMode); } /// /// Creates a new session to work on this connection /// public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode) { SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode); Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync); // Set properties on session using parameters prefixed with "session." if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")")) { // Since the Uri class will return the end of a Query string found in a Composite // URI we must ensure that we trim that off before we proceed. string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1); StringDictionary options = URISupport.ParseQuery(query); options = URISupport.GetProperties(options, "session."); URISupport.SetProperties(session, options); } session.ConsumerTransformer = this.ConsumerTransformer; session.ProducerTransformer = this.ProducerTransformer; if(IsStarted) { session.Start(); } sessions.Add(session); return session; } internal void RemoveSession(Session session) { if(!this.closing.Value) { sessions.Remove(session); } } internal void addDispatcher(ConsumerId id, IDispatcher dispatcher) { this.dispatchers.Add(id, dispatcher); } internal void removeDispatcher(ConsumerId id) { this.dispatchers.Remove(id); } public void Close() { lock(myLock) { if(this.closed.Value) { return; } try { Tracer.Info("Closing Connection."); this.closing.Value = true; lock(sessions.SyncRoot) { foreach(Session session in sessions) { session.DoClose(); } } sessions.Clear(); if(connected.Value) { ShutdownInfo shutdowninfo = new ShutdownInfo(); transport.Oneway(shutdowninfo); } Tracer.Info("Disposing of the Transport."); transport.Dispose(); } catch(Exception ex) { Tracer.ErrorFormat("Error during connection close: {0}", ex); } finally { this.transport = null; this.closed.Value = true; this.connected.Value = false; this.closing.Value = false; } } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if(disposed) { return; } if(disposing) { // Dispose managed code here. } try { // For now we do not distinguish between Dispose() and Close(). // In theory Dispose should possibly be lighter-weight and perform a (faster) // disorderly close. Close(); } catch { // Ignore network errors. } disposed = true; } // Implementation methods /// /// Performs a synchronous request-response with the broker /// /// public Response SyncRequest(Command command) { try { return SyncRequest(command, this.RequestTimeout); } catch(Exception ex) { throw NMSExceptionSupport.Create(ex); } } public Response SyncRequest(Command command, TimeSpan requestTimeout) { CheckConnected(); try { Response response = transport.Request(command, requestTimeout); if(response is ExceptionResponse) { ExceptionResponse exceptionResponse = (ExceptionResponse) response; BrokerError brokerError = exceptionResponse.Exception; throw new BrokerException(brokerError); } return response; } catch(Exception ex) { throw NMSExceptionSupport.Create(ex); } } public void Oneway(Command command) { CheckConnected(); try { transport.Oneway(command); } catch(Exception ex) { throw NMSExceptionSupport.Create(ex); } } protected void CheckConnected() { if(closed.Value) { throw new ConnectionClosedException(); } if(!connected.Value) { if(!this.userSpecifiedClientID) { this.info.ClientId = this.clientIdGenerator.GenerateId(); } connected.Value = true; // now lets send the connection and see if we get an ack/nak if(null == SyncRequest(info)) { closed.Value = true; connected.Value = false; throw new ConnectionClosedException(); } } } /// /// Handle incoming commands /// /// An ITransport /// A Command protected void OnCommand(ITransport commandTransport, Command command) { if(command.IsMessageDispatch) { // We wait if the Connection is still processing interruption // code to reset the MessageConsumers. WaitForTransportInterruptionProcessingToComplete(); DispatchMessage((MessageDispatch) command); } else if(command.IsWireFormatInfo) { // Ignore for now, might need to save if off later. } else if(command.IsKeepAliveInfo) { // Ignore only the InactivityMonitor cares about this one. } else if(command.IsErrorCommand) { if(!closing.Value && !closed.Value) { ConnectionError connectionError = (ConnectionError) command; BrokerError brokerError = connectionError.Exception; string message = "Broker connection error."; string cause = ""; if(null != brokerError) { message = brokerError.Message; if(null != brokerError.Cause) { cause = brokerError.Cause.Message; } } OnException(new NMSConnectionException(message, cause)); } } else { Tracer.Error("Unknown command: " + command); } } protected void DispatchMessage(MessageDispatch dispatch) { lock(dispatchers.SyncRoot) { if(dispatchers.Contains(dispatch.ConsumerId)) { IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId]; // Can be null when a consumer has sent a MessagePull and there was // no available message at the broker to dispatch. if(dispatch.Message != null) { dispatch.Message.ReadOnlyBody = true; dispatch.Message.ReadOnlyProperties = true; dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter; } dispatcher.Dispatch(dispatch); return; } } Tracer.Error("No such consumer active: " + dispatch.ConsumerId); } protected void OnTransportException(ITransport sender, Exception exception) { this.OnException(exception); } internal void OnAsyncException(Exception error) { if(!this.closed.Value && !this.closing.Value) { if(this.ExceptionListener != null) { if(!(error is NMSException)) { error = NMSExceptionSupport.Create(error); } NMSException e = (NMSException)error; // Called in another thread so that processing can continue // here, ensures no lock contention. executor.QueueUserWorkItem(AsyncCallExceptionListener, e); } else { Tracer.Debug("Async exception with no exception listener: " + error); } } } private void AsyncCallExceptionListener(object error) { NMSException exception = error as NMSException; this.ExceptionListener(exception); } internal void OnException(Exception error) { // Will fire an exception listener callback if there's any set. OnAsyncException(error); if(!this.closing.Value && !this.closed.Value) { // Perform the actual work in another thread to avoid lock contention // and allow the caller to continue on in its error cleanup. executor.QueueUserWorkItem(AsyncOnExceptionHandler, error); } } private void AsyncOnExceptionHandler(object error) { Exception cause = error as Exception; MarkTransportFailed(cause); try { this.transport.Dispose(); } catch(Exception ex) { Tracer.Debug("Caught Exception While disposing of Transport: " + ex); } IList sessionsCopy = null; lock(this.sessions.SyncRoot) { sessionsCopy = new ArrayList(this.sessions); } // Use a copy so we don't concurrently modify the Sessions list if the // client is closing at the same time. foreach(Session session in sessionsCopy) { try { session.Dispose(); } catch(Exception ex) { Tracer.Debug("Caught Exception While disposing of Sessions: " + ex); } } } protected void OnTransportInterrupted(ITransport sender) { Tracer.Debug("Transport has been Interrupted."); this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count); if(Tracer.IsDebugEnabled) { Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count); } foreach(Session session in this.sessions) { session.ClearMessagesInProgress(); } if(this.ConnectionInterruptedListener != null && !this.closing.Value) { try { this.ConnectionInterruptedListener(); } catch { } } } protected void OnTransportResumed(ITransport sender) { Tracer.Debug("Transport has resumed normal operation."); if(this.ConnectionResumedListener != null && !this.closing.Value) { try { this.ConnectionResumedListener(); } catch { } } } internal void OnSessionException(Session sender, Exception exception) { if(ExceptionListener != null) { try { ExceptionListener(exception); } catch { sender.Close(); } } } private void MarkTransportFailed(Exception error) { this.transportFailed.Value = true; if(this.firstFailureError == null) { this.firstFailureError = error; } } /// /// Creates a new temporary destination name /// public String CreateTemporaryDestinationName() { return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter); } /// /// Creates a new local transaction ID /// public TransactionId CreateLocalTransactionId() { TransactionId id = new TransactionId(); id.ConnectionId = ConnectionId; id.Value = Interlocked.Increment(ref localTransactionCounter); return id; } protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode) { SessionInfo answer = new SessionInfo(); SessionId sessionId = new SessionId(); sessionId.ConnectionId = info.ConnectionId.Value; sessionId.Value = Interlocked.Increment(ref sessionCounter); answer.SessionId = sessionId; return answer; } private void WaitForTransportInterruptionProcessingToComplete() { CountDownLatch cdl = this.transportInterruptionProcessingComplete; if(cdl != null) { if(!closed.Value && cdl.Remaining > 0) { Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " + "processing (" + cdl.Remaining + ") to complete.."); cdl.await(TimeSpan.FromSeconds(10)); } } } internal void TransportInterruptionProcessingComplete() { CountDownLatch cdl = this.transportInterruptionProcessingComplete; if(cdl != null) { cdl.countDown(); } } } }