/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;
using System.Xml;
using Apache.NMS.Util;
namespace Apache.NMS.WCF
{
/// <summary>
/// Client-side implementation of the sessionless one-way channel.
/// </summary>
public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel
{
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="NmsOutputChannel"/> class and starts the
/// NMS connection that the channel uses to send messages.
/// </summary>
/// <param name="factory">The factory that created the channel.</param>
/// <param name="remoteAddress">The remote address of the channel.</param>
/// <param name="via">The URI that contains the transport address to which messages are sent on the output channel; it is also used to create the connection.</param>
/// <param name="bufferManager">The buffer manager.</param>
/// <param name="encoderFactory">The encoder factory.</param>
/// <param name="destination">The name of the ActiveMQ destination.</param>
/// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
public NmsOutputChannel(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
    : base(factory, remoteAddress, via, bufferManager, encoderFactory, destination, destinationType)
{
    IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(via);
    try
    {
        connection.Start();
    }
    catch
    {
        // The constructor is about to fail, so no caller will ever be able to close
        // this connection; release it here instead of leaking it.
        connection.Dispose();
        throw;
    }
    _connection = connection;
}
#endregion
#region Implementation of IOutputChannel
/// <summary>
/// Transmits a message to the destination of the output channel, using the channel's
/// default send timeout.
/// </summary>
/// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
public void Send(Message message)
{
    TimeSpan sendTimeout = DefaultSendTimeout;
    Send(message, sendTimeout);
}
/// <summary>
/// Sends a message on the current output channel. A new NMS session is created for
/// every call; a non-transacted session is disposed as soon as the message has been
/// sent, while a transacted session is disposed when its transaction ends.
/// </summary>
/// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the send operation has to complete before timing out. This implementation does not currently read the value.</param>
public void Send(Message message, TimeSpan timeout)
{
    ThrowIfDisposedOrNotOpen();
    RemoteAddress.ApplyTo(message);

    NMS.ISession nmsSession = _connection.CreateSession();
    if(nmsSession.Transacted)
    {
        // Inside a transaction the session must outlive this call, so its disposal
        // is deferred to the commit / rollback notification.
        nmsSession.TransactionCommittedListener += SessionOnTransactionEndHandler;
        nmsSession.TransactionRolledBackListener += SessionOnTransactionEndHandler;
        DoSendMessageForSession(nmsSession, message);
        return;
    }

    // No transaction: the session is only needed for the duration of this send.
    using(nmsSession)
    {
        DoSendMessageForSession(nmsSession, message);
    }
}
/// <summary>
/// Handles the end of a session's transaction (commit or rollback): detaches this
/// handler from both notifications and disposes the session.
/// </summary>
/// <param name="session">The session whose transaction has ended.</param>
private void SessionOnTransactionEndHandler(ISession session)
{
    // Detach from both events first so the handler cannot be invoked again for this session.
    session.TransactionRolledBackListener -= SessionOnTransactionEndHandler;
    session.TransactionCommittedListener -= SessionOnTransactionEndHandler;

    session.Dispose();
}
/// <summary>
/// Serializes the WCF message to text and sends it as a persistent NMS text message
/// to the channel's destination, using a producer created on the supplied session.
/// </summary>
/// <param name="session">The NMS session used to resolve the destination and to create the producer and the text message.</param>
/// <param name="message">The WCF message to send.</param>
private void DoSendMessageForSession(ISession session, Message message)
{
    IDestination target = SessionUtil.GetDestination(session, Destination, DestinationType);
    using(IMessageProducer sender = session.CreateProducer(target))
    {
        sender.DeliveryMode = MsgDeliveryMode.Persistent;

        string payload = TranslateMessage(message);
        ITextMessage outgoing = session.CreateTextMessage(payload);

        sender.Send(outgoing);
        sender.Close();

        // Note: the trace entries are written after the message has been handed to the producer.
        Tracer.Info("Sending message:");
        Tracer.Info(outgoing.Text);
    }
}
/// <summary>
/// Translates the message using the appropriate SOAP versioning scheme: the SOAP 1.1
/// path when the encoder's message version is <see cref="MessageVersion.Soap11"/>,
/// and the SOAP 1.2 path for every other version.
/// </summary>
/// <param name="message">The message to be translated.</param>
/// <returns>The textual form of the message.</returns>
private string TranslateMessage(Message message)
{
    if(this.Encoder.MessageVersion == MessageVersion.Soap11)
    {
        return TranslateMessageAsSoap11(message);
    }

    return TranslateMessageAsSoap12(message);
}
/// <summary>
/// Translates the message using the SOAP 1.1 schema. Only the envelope and the body
/// are written; the XML declaration emitted by the writer is removed from the result.
/// </summary>
/// <param name="message">The message to be translated.</param>
/// <returns>The envelope XML (start tag, body, end tag) without a leading XML declaration.</returns>
private static string TranslateMessageAsSoap11(Message message)
{
    StringBuilder sb = new StringBuilder();
    XmlDictionaryWriter writer = XmlDictionaryWriter.CreateDictionaryWriter(XmlWriter.Create(sb));
    message.WriteStartEnvelope(writer);
    message.WriteBody(writer);
    // Message offers no WriteEndEnvelope(writer); the envelope is the only element still
    // open at this point, so closing the current element terminates it and keeps the
    // output well-formed regardless of the namespace prefix in use.
    writer.WriteEndElement();
    writer.Flush();

    string raw = sb.ToString();

    // A writer over a StringBuilder starts the output with a UTF-16 XML declaration.
    // Strip only that leading declaration; searching for the *last* "?>" would cut
    // into the payload whenever the body itself contains that character sequence.
    if(raw.StartsWith("<?xml", StringComparison.Ordinal))
    {
        int declarationEnd = raw.IndexOf("?>", StringComparison.Ordinal);
        if(declarationEnd >= 0)
        {
            raw = raw.Substring(declarationEnd + 2);
        }
    }

    return raw.Trim();
}
/// <summary>
/// Translates the message using the SOAP 1.2 schema by taking the message's string
/// representation and removing a leading XML declaration when one is present.
/// </summary>
/// <param name="message">The message to be translated.</param>
/// <returns>The trimmed string form of the message without a leading XML declaration.</returns>
private static string TranslateMessageAsSoap12(Message message)
{
    string raw = message.ToString().Trim();

    // Only a declaration at the very start is removed, and both characters of its
    // "?>" terminator are skipped. (Skipping a single character left a stray '>' in
    // front of the envelope, and matching the last "?>" could truncate the payload.)
    if(raw.StartsWith("<?xml", StringComparison.Ordinal))
    {
        int declarationEnd = raw.IndexOf("?>", StringComparison.Ordinal);
        if(declarationEnd >= 0)
        {
            raw = raw.Substring(declarationEnd + 2).Trim();
        }
    }

    return raw;
}
/// <summary>
/// Begins an asynchronous operation to transmit a message to the destination of the
/// output channel, using the channel's default send timeout.
/// </summary>
/// <returns>
/// The <see cref="IAsyncResult"/> that references the asynchronous message transmission.
/// </returns>
/// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
/// <param name="callback">The <see cref="AsyncCallback"/> delegate.</param>
/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous send operation.</param>
public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
{
    TimeSpan sendTimeout = DefaultSendTimeout;
    return BeginSend(message, sendTimeout, callback, state);
}
/// <summary>
/// Begins an asynchronous operation to transmit a message to the destination of the
/// output channel. The channel must be open and not disposed.
/// </summary>
/// <returns>
/// The <see cref="IAsyncResult"/> that references the asynchronous send operation.
/// </returns>
/// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the send operation has to complete before timing out. This implementation does not currently read the value.</param>
/// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives the notification of the asynchronous operation send completion.</param>
/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous send operation.</param>
public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
{
    ThrowIfDisposedOrNotOpen();

    // NOTE(review): the send itself is presumably carried out by NmsAsyncResult - confirm.
    IAsyncResult pendingSend = new NmsAsyncResult(this, message, callback, state);
    return pendingSend;
}
/// <summary>
/// Completes an asynchronous operation to transmit a message to the destination of the output channel.
/// </summary>
/// <param name="result">The <see cref="IAsyncResult"/> returned by a call to the <c>BeginSend</c> method.</param>
public void EndSend(IAsyncResult result)
{
// Completion handling is delegated to the async-result type created by BeginSend.
NmsAsyncResult.End(result);
}
#endregion
#region Implementation of CommunicationObject
/// <summary>
/// Gets a typed object requested from the channel. The channel itself is returned
/// when <see cref="IOutputChannel"/> is requested; otherwise the message encoder is
/// asked first and the base implementation is used as the fallback.
/// </summary>
/// <typeparam name="T">The type of the object being requested.</typeparam>
/// <returns>The requested object, or whatever the base implementation returns when neither the channel nor the encoder provides it.</returns>
public override T GetProperty<T>()
{
    if(typeof(T) == typeof(IOutputChannel))
    {
        return (T) (object) this;
    }

    T messageEncoderProperty = Encoder.GetProperty<T>();
    if(messageEncoderProperty != null)
    {
        return messageEncoderProperty;
    }

    return base.GetProperty<T>();
}
/// <summary>
/// Inserts processing on a communication object after it transitions to the closing
/// state due to the invocation of a synchronous abort operation. Abort is implemented
/// as a close with a zero timeout.
/// </summary>
protected override void OnAbort()
{
    TimeSpan noWait = TimeSpan.Zero;
    OnClose(noWait);
}
/// <summary>
/// Inserts processing on a communication object after it transitions to the closing
/// state due to the invocation of a synchronous close operation. Closes and disposes
/// the NMS connection owned by the channel.
/// </summary>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the on close operation has to complete before timing out. This override does not read the value.</param>
protected override void OnClose(TimeSpan timeout)
{
    if(_connection == null)
    {
        return;
    }

    try
    {
        _connection.Close();
    }
    finally
    {
        // Release the connection even when Close() throws, so a failed close
        // does not leave the underlying resources allocated.
        _connection.Dispose();
    }
}
/// <summary>
/// Completes an asynchronous operation on the close of a communication object.
/// </summary>
/// <param name="result">The <see cref="IAsyncResult"/> that is returned by a call to the <c>OnBeginClose</c> method.</param>
protected override void OnEndClose(IAsyncResult result)
{
// OnBeginClose hands back a CompletedAsyncResult, so ending the operation is delegated to that type.
CompletedAsyncResult.End(result);
}
/// <summary>
/// Inserts processing after a communication object transitions to the closing state
/// due to the invocation of an asynchronous close operation. The close is performed
/// synchronously before the result object is created.
/// </summary>
/// <returns>
/// The <see cref="IAsyncResult"/> that references the asynchronous on close operation.
/// </returns>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
/// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
    // The actual work happens right here, on the calling thread.
    OnClose(timeout);

    IAsyncResult finished = new CompletedAsyncResult(callback, state);
    return finished;
}
/// <summary>
/// Inserts processing on a communication object after it transitions into the opening state.
/// This override performs no work.
/// </summary>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the on open operation has to complete before timing out. Not read by this override.</param>
protected override void OnOpen(TimeSpan timeout)
{
// Intentionally empty: the NMS connection is created and started in the constructor.
}
/// <summary>
/// Inserts processing on a communication object after it transitions to the opening
/// state due to the invocation of an asynchronous open operation. No work is performed;
/// a result object is created and returned immediately.
/// </summary>
/// <returns>
/// The <see cref="IAsyncResult"/> that references the asynchronous on open operation.
/// </returns>
/// <param name="timeout">The <see cref="TimeSpan"/> that specifies how long the on open operation has to complete before timing out. Not read by this override.</param>
/// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
{
    IAsyncResult finished = new CompletedAsyncResult(callback, state);
    return finished;
}
/// <summary>
/// Completes an asynchronous operation on the open of a communication object.
/// </summary>
/// <param name="result">The <see cref="IAsyncResult"/> that is returned by a call to the <c>OnBeginOpen</c> method.</param>
protected override void OnEndOpen(IAsyncResult result)
{
// OnBeginOpen hands back a CompletedAsyncResult, so ending the operation is delegated to that type.
CompletedAsyncResult.End(result);
}
#endregion
/// <summary>
/// Encodes the message with the channel's encoder into a buffer obtained from the
/// channel's buffer manager. The message is closed afterwards, whether or not the
/// encoding succeeds.
/// </summary>
/// <param name="message">The message.</param>
/// <returns>The buffer segment that contains the encoded message.</returns>
public ArraySegment<byte> EncodeMessage(Message message)
{
    try
    {
        // Int32.MaxValue: no practical upper bound is imposed on the encoded size here.
        return Encoder.WriteMessage(message, Int32.MaxValue, BufferManager);
    }
    finally
    {
        // The message is consumed by serialising it, so clean up here.
        message.Close();
    }
}
#region Private members
/// <summary>
/// The NMS connection used by this channel: assigned in the constructor, used to create
/// a session for every send, and closed and disposed when the channel closes.
/// </summary>
private readonly IConnection _connection;
#endregion
}
}