/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;
using Apache.NMS.Util;
namespace Apache.NMS.WCF
{
///
/// Server-side listener for sessioned input channels.
///
public class NmsInputSessionChannelListener : ChannelListenerBase, IChannel
{
#region Constructors
///
/// Initializes a new instance of the class.
///
/// The binding element.
/// The context.
internal NmsInputSessionChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
: base(context.Binding)
{
_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize);
MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove();
_messageEncoderFactory = (messageEncoderBindingElement != null)
? messageEncoderBindingElement.CreateMessageEncoderFactory()
: NmsConstants.DefaultMessageEncoderFactory;
_channelQueue = new InputQueue();
_currentChannelLock = new object();
_destinationName = transportElement.Destination;
_destinationType = transportElement.DestinationType;
_uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
}
#endregion
#region Public properties
///
/// Gets the message encoder factory.
///
/// The message encoder factory.
public MessageEncoderFactory MessageEncoderFactory
{
get { return _messageEncoderFactory; }
}
///
/// Gets or sets the destination.
///
/// The destination.
public string Destination
{
get { return _destinationName; }
set { _destinationName = value; }
}
///
/// Gets or sets the type of the destination.
///
/// The type of the destination.
public DestinationType DestinationType
{
get { return _destinationType; }
set { _destinationType = value; }
}
#endregion
#region Implementation of CommunicationObject
///
/// Inserts processing on a communication object after it transitions to the closing state
/// due to the invocation of a synchronous abort operation.
///
///
/// Abort can be called at any time, so we can't assume that we've been Opened successfully
/// (and thus may not have any listen sockets).
///
protected override void OnAbort()
{
OnClose(TimeSpan.Zero);
}
///
/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
///
/// The that specifies how long the on close operation has to complete before timing out.
/// is less than zero.
protected override void OnClose(TimeSpan timeout)
{
lock(ThisLock)
{
if(_consumer != null)
{
Tracer.Debug("Listener is terminating consumer...");
_consumer.Close();
_consumer.Dispose();
Tracer.Debug("Listener has terminated consumer");
}
if(_session != null)
{
Tracer.Debug("Listener is terminating session...");
_session.Close();
Tracer.Debug("Listener has terminated session");
}
if(_connection != null)
{
Tracer.Debug("Listener is terminating connection...");
_connection.Stop();
_connection.Close();
_connection.Dispose();
Tracer.Debug("Listener has terminated connection");
}
_channelQueue.Close();
}
}
///
/// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
///
///
/// The that references the asynchronous on close operation.
///
/// The that specifies how long the on close operation has to complete before timing out.
/// The delegate that receives notification of the completion of the asynchronous on close operation.
/// An object, specified by the application, that contains state information associated with the asynchronous on close operation.
/// is less than zero.
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
OnClose(timeout);
return new CompletedAsyncResult(callback, state);
}
///
/// Completes an asynchronous operation on the close of a communication object.
///
/// The that is returned by a call to the method.
protected override void OnEndClose(IAsyncResult result)
{
CompletedAsyncResult.End(result);
}
///
/// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
///
/// The that specifies how long the on open operation has to complete before timing out.
/// is less than zero.
/// The interval of time specified by that was allotted for the operation was exceeded before the operation was completed.
protected override void OnOpen(TimeSpan timeout)
{
if(Uri == null)
{
throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
}
NmsChannelHelper.ValidateTimeout(timeout);
}
///
/// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
///
///
/// The that references the asynchronous on open operation.
///
/// The that specifies how long the on open operation has to complete before timing out.
/// The delegate that receives notification of the completion of the asynchronous on open operation.
/// An object, specified by the application, that contains state information associated with the asynchronous on open operation.
/// is less than zero.
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
{
NmsChannelHelper.ValidateTimeout(timeout);
OnOpen(timeout);
return new CompletedAsyncResult(callback, state);
}
///
/// Completes an asynchronous operation on the open of a communication object.
///
/// The that is returned by a call to the method.
protected override void OnEndOpen(IAsyncResult result)
{
CompletedAsyncResult.End(result);
}
#endregion
#region Implementation of ChannelListenerBase
///
/// When implemented in derived class, gets the URI on which the channel listener listens for an incoming channel.
///
///
/// The on which the channel listener listens for incoming channels.
///
public override Uri Uri
{
get { return _uri; }
}
///
/// When overridden in a derived class, provides a point of extensibility when waiting for a channel to arrive.
///
///
/// true if the method completed before the interval of time specified by the expired; otherwise false.
///
/// The that specifies how long the on wait for a channel operation has to complete before timing out.
protected override bool OnWaitForChannel(TimeSpan timeout)
{
NmsChannelHelper.ValidateTimeout(timeout);
return _channelQueue.WaitForItem(timeout);
}
///
/// When implemented in a derived class, provides a point of extensibility when starting to wait for a channel to arrive.
///
///
/// The that references the asynchronous on begin wait operation.
///
/// The that specifies how long the on begin wait operation has to complete before timing out.
/// The delegate that receives the notification of the asynchronous operation on begin wait completion.
/// An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation.
protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
NmsChannelHelper.ValidateTimeout(timeout);
return _channelQueue.BeginWaitForItem(timeout, callback, state);
}
///
/// When implemented in a derived class, provides a point of extensibility when ending the waiting for a channel to arrive.
///
///
/// true if the method completed before the timeout expired; otherwise false.
///
/// The returned by a call to the method.
protected override bool OnEndWaitForChannel(IAsyncResult result)
{
return _channelQueue.EndWaitForItem(result);
}
///
/// When implemented in a derived class, provides an extensibility point when accepting a channel.
///
///
/// The accepted.
///
/// The that specifies how long the accept channel operation has to complete before timing out.
protected override IInputSessionChannel OnAcceptChannel(TimeSpan timeout)
{
Tracer.Debug("Accepting channel");
NmsChannelHelper.ValidateTimeout(timeout);
if(!IsDisposed)
{
EnsureChannelAvailable();
}
IInputSessionChannel channel;
if(_channelQueue.Dequeue(timeout, out channel))
{
return channel;
}
throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
}
///
/// When implemented in a derived class, provides an asynchronous extensibility point when beginning to accept a channel.
///
///
/// The that references the asynchronous accept channel operation.
///
/// The that specifies how long the accept channel operation has to complete before timing out.
/// The delegate that receives the notification of the asynchronous completion of the accept channel operation.
/// An object, specified by the application, that contains state information associated with the asynchronous accept channel operation.
protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
NmsChannelHelper.ValidateTimeout(timeout);
if(!IsDisposed)
{
EnsureChannelAvailable();
}
return _channelQueue.BeginDequeue(timeout, callback, state);
}
///
/// When implemented in a derived class, provides an asynchronous extensibility point when completing the acceptance a channel.
///
///
/// The accepted by the listener.
///
/// The returned by a call to the method.
protected override IInputSessionChannel OnEndAcceptChannel(IAsyncResult result)
{
IInputSessionChannel channel;
if(_channelQueue.EndDequeue(result, out channel))
{
return channel;
}
throw new TimeoutException();
}
#endregion
///
/// Dispatches the callback.
///
/// The state.
internal void DispatchCallback(object state)
{
Dispatch((Message) state);
}
///
/// Matches an incoming message to its waiting listener,
/// using the FilterTable to dispatch the message to the correct
/// listener. If no listener is waiting for the message, it is silently
/// discarded.
///
internal void Dispatch(Message message)
{
if(message == null)
{
return;
}
try
{
NmsInputSessionChannel newChannel;
bool channelCreated = CreateOrRetrieveChannel(out newChannel);
Tracer.Debug("Dispatching incoming message");
newChannel.Dispatch(message);
if(channelCreated)
{
//Hand the channel off to whomever is waiting for AcceptChannel() to complete
Tracer.Debug("Handing off channel");
_channelQueue.EnqueueAndDispatch(newChannel);
}
}
catch(Exception e)
{
Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString());
}
}
///
/// Creates or retrieves the channel.
///
/// The channel.
private bool CreateOrRetrieveChannel(out NmsInputSessionChannel newChannel)
{
bool channelCreated = false;
if((newChannel = _currentChannel) == null)
{
lock(_currentChannelLock)
{
if((newChannel = _currentChannel) == null)
{
newChannel = CreateNmsChannel(Uri);
newChannel.Closed += OnChannelClosed;
_currentChannel = newChannel;
channelCreated = true;
}
}
}
return channelCreated;
}
///
/// Called when the channel is closed.
///
/// The sender.
/// The instance containing the event data.
private void OnChannelClosed(object sender, EventArgs args)
{
NmsInputSessionChannel channel = (NmsInputSessionChannel) sender;
lock(_currentChannelLock)
{
if(channel == _currentChannel)
{
_currentChannel = null;
}
}
}
///
/// Creates the that will wait for inbound messages.
///
/// The URI for the message queue.
private NmsInputSessionChannel CreateNmsChannel(Uri uri)
{
_connection = OpenConnection(uri);
_session = OpenSession(_connection);
_destination = SessionUtil.GetDestination(_session, Destination, DestinationType);
_consumer = CreateConsumer(_session, _destination);
EndpointAddress address = new EndpointAddress(uri);
return new NmsInputSessionChannel(this, address);
}
///
/// Opens the connection to the message broker.
///
/// The URI.
/// An active connection to the ActiveMQ message broker specified by the URI;
/// exceptions will be caught by the attached ExceptionListener.
private IConnection OpenConnection(Uri uri)
{
IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
connection.ExceptionListener += OnExceptionThrown;
connection.Start();
Tracer.Debug("Connection open");
return connection;
}
///
/// Opens a session to communicate with a message queue.
///
/// The connection to the ActiveMQ message broker.
/// A session.
/// the has not yet
/// been started.
private ISession OpenSession(IConnection connection)
{
if(!connection.IsStarted)
{
throw new InvalidOperationException("The connection has not yet been opened");
}
Tracer.Debug("Opening session...");
ISession session = connection.CreateSession();
Tracer.Debug("Session open");
return session;
}
///
/// Creates the consumer of messages received on the .
///
/// The session.
/// The destination.
/// A consumer for any messages received during the session;
/// messages will be consumed by the attached Listener.
private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
{
Tracer.Debug("Creating message listener...");
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += OnReceiveMessage;
Tracer.Debug("Created message listener");
return consumer;
}
///
/// Event handler that processes a received message.
///
/// The message.
private void OnReceiveMessage(IMessage message)
{
Tracer.Debug("Decoding message");
string soapMsg = ((ITextMessage) message).Text;
byte[] buffer = Encoding.ASCII.GetBytes(soapMsg);
int dataLength = buffer.Length;
byte[] data1 = _bufferManager.TakeBuffer(dataLength);
Array.Copy(buffer, data1, dataLength);
ArraySegment data = new ArraySegment(data1, 0, dataLength);
byte[] msgContents = new byte[data.Count];
Array.Copy(data.Array, data.Offset, msgContents, 0, msgContents.Length);
Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
Tracer.Debug(msg);
Dispatch(msg);
}
///
/// Called when an exception is thrown by the ActiveMQ listener.
///
///
/// s will be caught and logged; all other exceptions will
/// be thrown back up to the WCF service.
///
/// The exception that was thrown.
private void OnExceptionThrown(Exception exception)
{
if(exception is NMSException)
{
Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
exception.GetType().Name,
exception.Message,
exception.StackTrace);
return;
}
// TODO: can we recover from the exception? Do we convert to WCF exceptions?
throw exception;
}
///
/// Guarantees that a channel is attached to this listener.
///
private void EnsureChannelAvailable()
{
NmsInputSessionChannel newChannel;
if(CreateOrRetrieveChannel(out newChannel))
{
_channelQueue.EnqueueAndDispatch(newChannel);
}
}
#region Private members
private readonly Uri _uri;
private IConnection _connection;
private ISession _session;
private IDestination _destination;
private IMessageConsumer _consumer;
private readonly InputQueue _channelQueue;
private NmsInputSessionChannel _currentChannel;
private readonly object _currentChannelLock;
private readonly MessageEncoderFactory _messageEncoderFactory;
private readonly BufferManager _bufferManager;
private string _destinationName;
private DestinationType _destinationType;
#endregion
}
}