/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;
using System.Xml;
using Apache.NMS.Util;

namespace Apache.NMS.WCF
{
    /// <summary>
    /// Client-side implementation of the sessionless one-way channel.
    /// </summary>
    /// <remarks>
    /// The channel owns a single NMS connection that is created and started in the
    /// constructor and closed/disposed in <see cref="OnClose"/>. Every send creates its
    /// own NMS session and producer on that connection.
    /// </remarks>
    public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel
    {
        #region Constructors

        /// <summary>
        /// Initializes a new instance of the <see cref="NmsOutputChannel"/> class and
        /// starts the underlying NMS connection.
        /// </summary>
        /// <param name="factory">The factory that created the channel.</param>
        /// <param name="remoteAddress">The remote address of the channel.</param>
        /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
        /// <param name="bufferManager">The buffer manager.</param>
        /// <param name="encoderFactory">The encoder factory.</param>
        /// <param name="destination">The name of the ActiveMQ destination.</param>
        /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
        public NmsOutputChannel(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
            : base(factory, remoteAddress, via, bufferManager, encoderFactory, destination, destinationType)
        {
            IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(via);
            try
            {
                connection.Start();
            }
            catch
            {
                // The channel is never handed to the caller when construction fails,
                // so nobody else could release the connection.
                connection.Dispose();
                throw;
            }

            _connection = connection;
        }

        #endregion

        #region Implementation of IOutputChannel

        /// <summary>
        /// Transmits a message to the destination of the output channel.
        /// </summary>
        /// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
        public void Send(Message message)
        {
            Send(message, DefaultSendTimeout);
        }

        /// <summary>
        /// Sends a message on the current output channel.
        /// </summary>
        /// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
        /// <param name="timeout">
        /// The <see cref="TimeSpan"/> required by the <see cref="IOutputChannel"/> contract.
        /// This implementation does not consult it.
        /// </param>
        public void Send(Message message, TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen();
            RemoteAddress.ApplyTo(message);

            ISession session = _connection.CreateSession();

            if(!session.Transacted)
            {
                using(session)
                {
                    DoSendMessageForSession(session, message);
                }
            }
            else
            {
                // we are inside a transaction, so we should defer session disposing until transaction ends
                session.TransactionCommittedListener += SessionOnTransactionEndHandler;
                session.TransactionRolledBackListener += SessionOnTransactionEndHandler;

                DoSendMessageForSession(session, message);
            }
        }

        /// <summary>
        /// Detaches this handler from the session's commit/rollback notifications and
        /// disposes the session whose disposal was deferred by <see cref="Send(Message, TimeSpan)"/>.
        /// </summary>
        /// <param name="session">The session whose transaction has ended.</param>
        private void SessionOnTransactionEndHandler(ISession session)
        {
            session.TransactionCommittedListener -= SessionOnTransactionEndHandler;
            session.TransactionRolledBackListener -= SessionOnTransactionEndHandler;

            session.Dispose();
        }

        /// <summary>
        /// Translates the WCF message to text and sends it as a persistent NMS text
        /// message to the configured destination, using a producer created on
        /// <paramref name="session"/>.
        /// </summary>
        /// <param name="session">The NMS session used to resolve the destination and create the producer.</param>
        /// <param name="message">The WCF message to send.</param>
        private void DoSendMessageForSession(ISession session, Message message)
        {
            IDestination destination = SessionUtil.GetDestination(session, Destination, DestinationType);
            using(IMessageProducer producer = session.CreateProducer(destination))
            {
                producer.DeliveryMode = MsgDeliveryMode.Persistent;

                ITextMessage request = session.CreateTextMessage(TranslateMessage(message));
                producer.Send(request);
                producer.Close();

                Tracer.Info("Sending message:");
                Tracer.Info(request.Text);
            }
        }

        /// <summary>
        /// Translates the message using the appropriate SOAP versioning scheme.
        /// </summary>
        /// <param name="message">The message to be translated.</param>
        /// <returns>The textual form of the message, without a leading XML declaration.</returns>
        private string TranslateMessage(Message message)
        {
            return (this.Encoder.MessageVersion == MessageVersion.Soap11)
                ? TranslateMessageAsSoap11(message)
                : TranslateMessageAsSoap12(message);
        }

        /// <summary>
        /// Translates the message using the SOAP 1.1 schema.
        /// </summary>
        /// <param name="message">The message to be translated.</param>
        /// <returns>The envelope and body of the message as text, without a leading XML declaration.</returns>
        private static string TranslateMessageAsSoap11(Message message)
        {
            StringBuilder sb = new StringBuilder();
            using(XmlDictionaryWriter writer = XmlDictionaryWriter.CreateDictionaryWriter(XmlWriter.Create(sb)))
            {
                message.WriteStartEnvelope(writer);
                message.WriteBody(writer);

                // Message offers no WriteEndEnvelope, so the envelope element opened by
                // WriteStartEnvelope is closed through the writer itself.
                writer.WriteEndElement();
                writer.Flush();
            }

            // Writing into a StringBuilder produces a utf-16 declaration that must not be sent.
            return StripXmlDeclaration(sb.ToString());
        }

        /// <summary>
        /// Translates the message using the SOAP 1.2 schema.
        /// </summary>
        /// <param name="message">The message to be translated.</param>
        /// <returns>The string form of the message, without a leading XML declaration.</returns>
        private static string TranslateMessageAsSoap12(Message message)
        {
            return StripXmlDeclaration(message.ToString());
        }

        /// <summary>
        /// Removes a leading XML declaration (<c>&lt;?xml ... ?&gt;</c>), if there is one,
        /// and trims surrounding whitespace.
        /// </summary>
        /// <param name="xml">The serialized message text.</param>
        /// <returns>
        /// <paramref name="xml"/> without its leading declaration; text that does not start
        /// with a declaration is only trimmed.
        /// </returns>
        private static string StripXmlDeclaration(string xml)
        {
            const string declarationStart = "<?xml";
            const string declarationEnd = "?>";

            string content = xml.TrimStart();
            if(content.StartsWith(declarationStart, StringComparison.Ordinal))
            {
                int end = content.IndexOf(declarationEnd, StringComparison.Ordinal);
                if(end >= 0)
                {
                    // Skip both characters of the terminator so no stray '>' is left behind.
                    content = content.Substring(end + declarationEnd.Length);
                }
            }

            return content.Trim();
        }

        /// <summary>
        /// Begins an asynchronous operation to transmit a message to the destination of the output channel.
        /// </summary>
        /// <returns>
        /// The <see cref="IAsyncResult"/> that references the asynchronous message transmission.
        /// </returns>
        /// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
        /// <param name="callback">The <see cref="AsyncCallback"/> delegate.</param>
        /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous send operation.</param>
        public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
        {
            return BeginSend(message, DefaultSendTimeout, callback, state);
        }

        /// <summary>
        /// Begins an asynchronous operation to transmit a message to the destination of the output channel.
        /// </summary>
        /// <returns>
        /// The <see cref="IAsyncResult"/> that references the asynchronous send operation.
        /// </returns>
        /// <param name="message">The <see cref="Message"/> being sent on the output channel.</param>
        /// <param name="timeout">
        /// The <see cref="TimeSpan"/> required by the <see cref="IOutputChannel"/> contract.
        /// It is not passed on to the asynchronous result created here.
        /// </param>
        /// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives the notification of the asynchronous operation send completion.</param>
        /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous send operation.</param>
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfDisposedOrNotOpen();
            return new NmsAsyncResult(this, message, callback, state);
        }

        /// <summary>
        /// Completes an asynchronous operation to transmit a message to the destination of the output channel.
        /// </summary>
        /// <param name="result">The <see cref="IAsyncResult"/> returned by a call to the BeginSend method.</param>
        public void EndSend(IAsyncResult result)
        {
            NmsAsyncResult.End(result);
        }

        #endregion

        #region Implementation of CommunicationObject

        /// <summary>
        /// Gets the property of the requested type.
        /// </summary>
        /// <typeparam name="T">The type of the property to look up.</typeparam>
        /// <returns>
        /// This channel when <typeparamref name="T"/> is <see cref="IOutputChannel"/>; otherwise
        /// the message encoder's property when it is not null; otherwise the base class result.
        /// </returns>
        public override T GetProperty<T>()
        {
            if(typeof(T) == typeof(IOutputChannel))
            {
                return (T) (object) this;
            }

            T messageEncoderProperty = Encoder.GetProperty<T>();
            if(messageEncoderProperty != null)
            {
                return messageEncoderProperty;
            }

            return base.GetProperty<T>();
        }

        /// <summary>
        /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
        /// Delegates to <see cref="OnClose"/> with a zero timeout.
        /// </summary>
        protected override void OnAbort()
        {
            OnClose(TimeSpan.Zero);
        }

        /// <summary>
        /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
        /// Closes and disposes the NMS connection.
        /// </summary>
        /// <param name="timeout">The <see cref="TimeSpan"/> supplied by the caller; it is not consulted here.</param>
        protected override void OnClose(TimeSpan timeout)
        {
            if(_connection != null)
            {
                try
                {
                    _connection.Close();
                }
                finally
                {
                    // Release the connection even when closing it fails.
                    _connection.Dispose();
                }
            }
        }

        /// <summary>
        /// Completes an asynchronous operation on the close of a communication object.
        /// </summary>
        /// <param name="result">The <see cref="IAsyncResult"/> that is returned by a call to the <see cref="OnBeginClose"/> method.</param>
        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        /// <summary>
        /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
        /// The close itself is performed synchronously by <see cref="OnClose"/> before this method returns.
        /// </summary>
        /// <returns>
        /// The <see cref="IAsyncResult"/> that references the asynchronous on close operation.
        /// </returns>
        /// <param name="timeout">The <see cref="TimeSpan"/> forwarded to <see cref="OnClose"/>.</param>
        /// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
        /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnClose(timeout);
            return new CompletedAsyncResult(callback, state);
        }

        /// <summary>
        /// Inserts processing on a communication object after it transitions into the opening state.
        /// Nothing is done here: the NMS connection is already created and started by the constructor.
        /// </summary>
        /// <param name="timeout">The <see cref="TimeSpan"/> supplied by the caller; it is not consulted here.</param>
        protected override void OnOpen(TimeSpan timeout)
        {
        }

        /// <summary>
        /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
        /// No work is performed; a completed result is returned.
        /// </summary>
        /// <returns>
        /// The <see cref="IAsyncResult"/> that references the asynchronous on open operation.
        /// </returns>
        /// <param name="timeout">The <see cref="TimeSpan"/> supplied by the caller; it is not consulted here.</param>
        /// <param name="callback">The <see cref="AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
        /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }

        /// <summary>
        /// Completes an asynchronous operation on the open of a communication object.
        /// </summary>
        /// <param name="result">The <see cref="IAsyncResult"/> that is returned by a call to the <see cref="OnBeginOpen"/> method.</param>
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        #endregion

        /// <summary>
        /// Encodes the message with the channel's encoder and closes the message afterwards.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns>The buffer segment returned by the encoder.</returns>
        public ArraySegment<byte> EncodeMessage(Message message)
        {
            try
            {
                return Encoder.WriteMessage(message, Int32.MaxValue, BufferManager);
            }
            finally
            {
                // The message is consumed by serialising it, so clean up here.
                message.Close();
            }
        }

        #region Private members

        private readonly IConnection _connection;

        #endregion
    }
}