/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.ServiceModel; using System.ServiceModel.Channels; namespace Apache.NMS.WCF { /// /// Base class for NMS input channels. /// /// public abstract class NmsInputQueueChannelBase : ChannelBase where T : class { #region Constructors /// /// Initializes a new instance of the class. /// /// The factory that was used to create the channel. /// The local address of the channel. public NmsInputQueueChannelBase(ChannelListenerBase factory, EndpointAddress localAddress) : base(factory) { _localAddress = localAddress; _messageQueue = new InputQueue(); } #endregion #region Public properties /// /// Gets the local address. /// /// The local address. public EndpointAddress LocalAddress { get { return _localAddress; } } #endregion #region Messaging /// /// Gets the pending message count. /// /// The pending message count. public int PendingMessageCount { get { return _messageQueue.PendingCount; } } /// /// Dispatches the specified request. /// /// The request. public void Dispatch(T request) { ThrowIfDisposedOrNotOpen(); _messageQueue.EnqueueAndDispatch(request); } /// /// Begins the dequeue operation. /// /// The timeout. /// The callback. /// The state. public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state) { return (State == CommunicationState.Opened) ? _messageQueue.BeginDequeue(timeout, callback, state) : new CompletedAsyncResult(callback, state); } /// /// Ends the dequeue operation. /// /// The result. /// public T EndDequeue(IAsyncResult result) { ThrowIfDisposedOrNotOpen(); return _messageQueue.EndDequeue(result); } /// /// Dequeues the next message. /// /// The timeout. public T Dequeue(TimeSpan timeout) { ThrowIfDisposedOrNotOpen(); return _messageQueue.Dequeue(timeout); } /// /// Tries to dequeue the next message. /// /// The result. /// The message. /// public bool TryDequeue(IAsyncResult result, out T message) { message = null; TypedAsyncResult completedResult = result as TypedAsyncResult; if(completedResult != null) { message = TypedAsyncResult.End(result); } else if(result.CompletedSynchronously == false) { InputQueue.AsyncQueueReader completedResult2 = result as InputQueue.AsyncQueueReader; InputQueue.AsyncQueueReader.End(result, out message); } return result.IsCompleted; } #endregion #region Abort /// /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation. /// protected override void OnAbort() { _messageQueue.Close(); } #endregion #region Open /// /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation. /// /// The that specifies how long the on open operation has to complete before timing out. /// The delegate that receives notification of the completion of the asynchronous on open operation. /// An object, specified by the application, that contains state information associated with the asynchronous on open operation. /// /// The that references the asynchronous on open operation. /// /// /// is less than zero. protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { OnOpen(timeout); return new CompletedAsyncResult(callback, state); } /// /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time. /// /// The that specifies how long the on open operation has to complete before timing out. /// /// is less than zero. /// The interval of time specified by that was allotted for the operation was exceeded before the operation was completed. protected override void OnOpen(TimeSpan timeout) { _messageQueue.Open(); } /// /// Completes an asynchronous operation on the open of a communication object. /// /// The that is returned by a call to the method. /// The interval of time specified by that was allotted for the operation was exceeded before the operation was completed. protected override void OnEndOpen(IAsyncResult result) { CompletedAsyncResult.End(result); } #endregion #region Close /// /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation. /// /// The that specifies how long the on close operation has to complete before timing out. /// The delegate that receives notification of the completion of the asynchronous on close operation. /// An object, specified by the application, that contains state information associated with the asynchronous on close operation. /// /// The that references the asynchronous on close operation. /// /// /// is less than zero. protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { OnClose(timeout); return new CompletedAsyncResult(callback, state); } /// /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation. /// /// The that specifies how long the on close operation has to complete before timing out. /// /// is less than zero. protected override void OnClose(TimeSpan timeout) { _messageQueue.Close(); } /// /// Completes an asynchronous operation on the close of a communication object. /// /// The that is returned by a call to the method. /// The interval of time specified by that was allotted for the operation was exceeded before the operation was completed. protected override void OnEndClose(IAsyncResult result) { CompletedAsyncResult.End(result); } #endregion #region GetProperty /// /// Gets the property. /// /// public override P GetProperty

() { if(typeof(P) == typeof(FaultConverter)) { return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as P; } return base.GetProperty

(); } #endregion #region Private members private EndpointAddress _localAddress; private InputQueue _messageQueue; #endregion } }