/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.ServiceModel; using System.ServiceModel.Channels; using System.Text; using Apache.NMS.Util; namespace Apache.NMS.WCF { /// /// Server-side listener for sessionless input channels. /// public class NmsInputChannelListener : ChannelListenerBase { #region Constructors /// /// Initializes a new instance of the class. /// /// The binding element. /// The context. internal NmsInputChannelListener(NmsTransportBindingElement transportElement, BindingContext context) : base(context.Binding) { _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize); MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove(); _messageEncoderFactory = (messageEncoderBindingElement != null) ? messageEncoderBindingElement.CreateMessageEncoderFactory() : NmsConstants.DefaultMessageEncoderFactory; _channelQueue = new InputQueue(); _currentChannelLock = new object(); _destinationName = transportElement.Destination; _destinationType = transportElement.DestinationType; _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress); Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName); } #endregion #region Public properties /// /// Gets the message encoder factory. /// /// The message encoder factory. public MessageEncoderFactory MessageEncoderFactory { get { return _messageEncoderFactory; } } /// /// Gets or sets the destination. /// /// The destination. public string Destination { get { return _destinationName; } set { _destinationName = value; } } /// /// Gets or sets the type of the destination. /// /// The type of the destination. public DestinationType DestinationType { get { return _destinationType; } set { _destinationType = value; } } #endregion #region Implementation of CommunicationObject /// /// Inserts processing on a communication object after it transitions to the closing state /// due to the invocation of a synchronous abort operation. /// /// /// Abort can be called at any time, so we can't assume that we've been Opened successfully /// (and thus may not have any listen sockets). /// protected override void OnAbort() { OnClose(TimeSpan.Zero); } /// /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation. /// /// The that specifies how long the on close operation has to complete before timing out. /// is less than zero. protected override void OnClose(TimeSpan timeout) { lock(ThisLock) { if(_consumer != null) { Tracer.Debug("Listener is terminating consumer..."); _consumer.Close(); _consumer.Dispose(); Tracer.Debug("Listener has terminated consumer"); } if(_session != null) { Tracer.Debug("Listener is terminating session..."); _session.Close(); Tracer.Debug("Listener has terminated session"); } if(_connection != null) { Tracer.Debug("Listener is terminating connection..."); _connection.Stop(); _connection.Close(); _connection.Dispose(); Tracer.Debug("Listener has terminated connection"); } _channelQueue.Close(); } } /// /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation. /// /// /// The that references the asynchronous on close operation. /// /// The that specifies how long the on close operation has to complete before timing out. /// The delegate that receives notification of the completion of the asynchronous on close operation. /// An object, specified by the application, that contains state information associated with the asynchronous on close operation. /// is less than zero. protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { OnClose(timeout); return new CompletedAsyncResult(callback, state); } /// /// Completes an asynchronous operation on the close of a communication object. /// /// The that is returned by a call to the method. protected override void OnEndClose(IAsyncResult result) { CompletedAsyncResult.End(result); } /// /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time. /// /// The that specifies how long the on open operation has to complete before timing out. /// is less than zero. /// The interval of time specified by that was allotted for the operation was exceeded before the operation was completed. protected override void OnOpen(TimeSpan timeout) { if(Uri == null) { throw new InvalidOperationException("Uri must be set before ChannelListener is opened."); } NmsChannelHelper.ValidateTimeout(timeout); } /// /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation. /// /// /// The that references the asynchronous on open operation. /// /// The that specifies how long the on open operation has to complete before timing out. /// The delegate that receives notification of the completion of the asynchronous on open operation. /// An object, specified by the application, that contains state information associated with the asynchronous on open operation. /// is less than zero. protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { NmsChannelHelper.ValidateTimeout(timeout); OnOpen(timeout); return new CompletedAsyncResult(callback, state); } /// /// Completes an asynchronous operation on the open of a communication object. /// /// The that is returned by a call to the method. protected override void OnEndOpen(IAsyncResult result) { CompletedAsyncResult.End(result); } #endregion #region Implementation of ChannelListenerBase /// /// When implemented in derived class, gets the URI on which the channel listener listens for an incoming channel. /// /// /// The on which the channel listener listens for incoming channels. /// public override Uri Uri { get { return _uri; } } /// /// When overridden in a derived class, provides a point of extensibility when waiting for a channel to arrive. /// /// /// true if the method completed before the interval of time specified by the expired; otherwise false. /// /// The that specifies how long the on wait for a channel operation has to complete before timing out. protected override bool OnWaitForChannel(TimeSpan timeout) { NmsChannelHelper.ValidateTimeout(timeout); return _channelQueue.WaitForItem(timeout); } /// /// When implemented in a derived class, provides a point of extensibility when starting to wait for a channel to arrive. /// /// /// The that references the asynchronous on begin wait operation. /// /// The that specifies how long the on begin wait operation has to complete before timing out. /// The delegate that receives the notification of the asynchronous operation on begin wait completion. /// An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation. protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) { NmsChannelHelper.ValidateTimeout(timeout); return _channelQueue.BeginWaitForItem(timeout, callback, state); } /// /// When implemented in a derived class, provides a point of extensibility when ending the waiting for a channel to arrive. /// /// /// true if the method completed before the timeout expired; otherwise false. /// /// The returned by a call to the method. protected override bool OnEndWaitForChannel(IAsyncResult result) { return _channelQueue.EndWaitForItem(result); } /// /// When implemented in a derived class, provides an extensibility point when accepting a channel. /// /// /// The accepted. /// /// The that specifies how long the accept channel operation has to complete before timing out. protected override IInputChannel OnAcceptChannel(TimeSpan timeout) { Tracer.Debug("Accepting channel"); NmsChannelHelper.ValidateTimeout(timeout); if(!IsDisposed) { EnsureChannelAvailable(); } IInputChannel channel; if(_channelQueue.Dequeue(timeout, out channel)) { return channel; } throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout)); } /// /// When implemented in a derived class, provides an asynchronous extensibility point when beginning to accept a channel. /// /// /// The that references the asynchronous accept channel operation. /// /// The that specifies how long the accept channel operation has to complete before timing out. /// The delegate that receives the notification of the asynchronous completion of the accept channel operation. /// An object, specified by the application, that contains state information associated with the asynchronous accept channel operation. protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) { NmsChannelHelper.ValidateTimeout(timeout); if(!IsDisposed) { EnsureChannelAvailable(); } return _channelQueue.BeginDequeue(timeout, callback, state); } /// /// When implemented in a derived class, provides an asynchronous extensibility point when completing the acceptance a channel. /// /// /// The accepted by the listener. /// /// The returned by a call to the method. protected override IInputChannel OnEndAcceptChannel(IAsyncResult result) { IInputChannel channel; if(_channelQueue.EndDequeue(result, out channel)) { return channel; } throw new TimeoutException(); } #endregion /// /// Dispatches the callback. /// /// The state. internal void DispatchCallback(object state) { Dispatch((Message) state); } /// /// Matches an incoming message to its waiting listener, /// using the FilterTable to dispatch the message to the correct /// listener. If no listener is waiting for the message, it is silently /// discarded. /// internal void Dispatch(Message message) { if(message == null) { return; } try { NmsInputChannel newChannel; bool channelCreated = CreateOrRetrieveChannel(out newChannel); Tracer.Debug("Dispatching incoming message"); newChannel.Dispatch(message); if(channelCreated) { //Hand the channel off to whomever is waiting for AcceptChannel() to complete Tracer.Debug("Handing off channel"); _channelQueue.EnqueueAndDispatch(newChannel); } } catch(Exception e) { Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString()); } } /// /// Creates or retrieves the channel. /// /// The channel. private bool CreateOrRetrieveChannel(out NmsInputChannel newChannel) { bool channelCreated = false; if((newChannel = _currentChannel) == null) { lock(_currentChannelLock) { if((newChannel = _currentChannel) == null) { newChannel = CreateNmsChannel(Uri); newChannel.Closed += OnChannelClosed; _currentChannel = newChannel; channelCreated = true; } } } return channelCreated; } /// /// Called when the channel is closed. /// /// The sender. /// The instance containing the event data. private void OnChannelClosed(object sender, EventArgs args) { NmsInputChannel channel = (NmsInputChannel) sender; lock(_currentChannelLock) { if(channel == _currentChannel) { _currentChannel = null; } } } /// /// Creates the that will wait for inbound messages. /// /// The URI for the message queue. private NmsInputChannel CreateNmsChannel(Uri uri) { _connection = OpenConnection(uri); _session = OpenSession(_connection); _destination = SessionUtil.GetDestination(_session, Destination, DestinationType); _consumer = CreateConsumer(_session, _destination); EndpointAddress address = new EndpointAddress(uri); return new NmsInputChannel(this, address); } /// /// Opens the connection to the message broker. /// /// The URI. /// An active connection to the ActiveMQ message broker specified by the URI; /// exceptions will be caught by the attached ExceptionListener. private IConnection OpenConnection(Uri uri) { IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri); connection.ExceptionListener += OnExceptionThrown; connection.Start(); Tracer.Debug("Connection open"); return connection; } /// /// Opens a session to communicate with a message queue. /// /// The connection to the ActiveMQ message broker. /// A session. /// the has not yet /// been started. private ISession OpenSession(IConnection connection) { if(!connection.IsStarted) { throw new InvalidOperationException("The connection has not yet been opened"); } Tracer.Debug("Opening session..."); ISession session = connection.CreateSession(); Tracer.Debug("Session open"); return session; } /// /// Creates the consumer of messages received on the . /// /// The session. /// The destination. /// A consumer for any messages received during the session; /// messages will be consumed by the attached Listener. private IMessageConsumer CreateConsumer(ISession session, IDestination destination) { Tracer.Debug("Creating message listener..."); IMessageConsumer consumer = session.CreateConsumer(destination); consumer.Listener += OnReceiveMessage; Tracer.Debug("Created message listener"); return consumer; } /// /// Event handler that processes a received message. /// /// The message. private void OnReceiveMessage(IMessage message) { Tracer.Debug("Decoding message"); string soapMsg = ((ITextMessage) message).Text; byte[] buffer = Encoding.ASCII.GetBytes(soapMsg); int dataLength = buffer.Length; byte[] data1 = _bufferManager.TakeBuffer(dataLength); Array.Copy(buffer, data1, dataLength); ArraySegment data = new ArraySegment(data1, 0, dataLength); byte[] msgContents = new byte[data.Count]; Array.Copy(data.Array, data.Offset, msgContents, 0, msgContents.Length); Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager); Tracer.Debug(msg); Dispatch(msg); } /// /// Called when an exception is thrown by the ActiveMQ listener. /// /// /// s will be caught and logged; all other exceptions will /// be thrown back up to the WCF service. /// /// The exception that was thrown. private void OnExceptionThrown(Exception exception) { if(exception is NMSException) { Tracer.ErrorFormat("{0} thrown : {1}\n{2}", exception.GetType().Name, exception.Message, exception.StackTrace); return; } // TODO: can we recover from the exception? Do we convert to WCF exceptions? throw exception; } /// /// Guarantees that a channel is attached to this listener. /// private void EnsureChannelAvailable() { NmsInputChannel newChannel; if(CreateOrRetrieveChannel(out newChannel)) { _channelQueue.EnqueueAndDispatch(newChannel); } } #region Private members private readonly Uri _uri; private IConnection _connection; private ISession _session; private IDestination _destination; private IMessageConsumer _consumer; private readonly InputQueue _channelQueue; private NmsInputChannel _currentChannel; private readonly object _currentChannelLock; private readonly MessageEncoderFactory _messageEncoderFactory; private readonly BufferManager _bufferManager; private string _destinationName; private DestinationType _destinationType; #endregion } }