/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Diagnostics; using System.ServiceModel; using System.Threading; namespace Apache.NMS.WCF { // ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The // InputQueue lock is not held during the callback. However, the user code will // not be notified of the item being available until the callback returns. If you // are not sure if the callback will block for a long time, then first call // IOThreadScheduler.ScheduleCallback to get to a "safe" thread. delegate void ItemDequeuedCallback(); /// /// Handles asynchronous interactions between producers and consumers. /// Producers can dispatch available data to the input queue, /// where it will be dispatched to a waiting consumer or stored until a /// consumer becomes available. Consumers can synchronously or asynchronously /// request data from the queue, which will be returned when data becomes /// available. /// /// The concrete type of the consumer objects that are waiting for data. class InputQueue : IDisposable where T : class { //Stores items that are waiting to be consumed. ItemQueue itemQueue; //Each IQueueReader represents some consumer that is waiting for //items to appear in the queue. The readerQueue stores them //in an ordered list so consumers get serviced in a FIFO manner. Queue readerQueue; //Each IQueueWaiter represents some waiter that is waiting for //items to appear in the queue. When any item appears, all //waiters are signalled. List waiterList; static WaitCallback onInvokeDequeuedCallback; static WaitCallback onDispatchCallback; static WaitCallback completeOutstandingReadersCallback; static WaitCallback completeWaitersFalseCallback; static WaitCallback completeWaitersTrueCallback; //Represents the current state of the InputQueue //as it transitions through its lifecycle. QueueState queueState; enum QueueState { Open, Shutdown, Closed } public InputQueue() { this.itemQueue = new ItemQueue(); this.readerQueue = new Queue(); this.waiterList = new List(); this.queueState = QueueState.Open; } public int PendingCount { get { lock(ThisLock) { return itemQueue.ItemCount; } } } // added by Roman public int NumberOfReaders { get { lock(ThisLock) { return readerQueue.Count; } } } public void Open() { lock(ThisLock) { if(queueState == QueueState.Open) { return; } if(queueState == QueueState.Closed) { throw new ObjectDisposedException(this.GetType().ToString()); } } } object ThisLock { get { return itemQueue; } } public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state) { Item item = default(Item); lock(ThisLock) { if(queueState == QueueState.Open) { if(itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else { AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state); readerQueue.Enqueue(reader); return reader; } } else if(queueState == QueueState.Shutdown) { if(itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else if(itemQueue.HasAnyItem) { AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state); readerQueue.Enqueue(reader); return reader; } } } InvokeDequeuedCallback(item.DequeuedCallback); return new TypedCompletedAsyncResult(item.GetValue(), callback, state); } public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state) { lock(ThisLock) { if(queueState == QueueState.Open) { if(!itemQueue.HasAvailableItem) { AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); waiterList.Add(waiter); return waiter; } } else if(queueState == QueueState.Shutdown) { if(!itemQueue.HasAvailableItem && itemQueue.HasAnyItem) { AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); waiterList.Add(waiter); return waiter; } } } return new TypedCompletedAsyncResult(true, callback, state); } static void CompleteOutstandingReadersCallback(object state) { IQueueReader[] outstandingReaders = (IQueueReader[]) state; for(int i = 0; i < outstandingReaders.Length; i++) { outstandingReaders[i].Set(default(Item)); } } static void CompleteWaitersFalseCallback(object state) { CompleteWaiters(false, (IQueueWaiter[]) state); } static void CompleteWaitersTrueCallback(object state) { CompleteWaiters(true, (IQueueWaiter[]) state); } static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters) { for(int i = 0; i < waiters.Length; i++) { waiters[i].Set(itemAvailable); } } static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters) { if(itemAvailable) { if(completeWaitersTrueCallback == null) { completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback); } ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters); } else { if(completeWaitersFalseCallback == null) { completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback); } ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters); } } void GetWaiters(out IQueueWaiter[] waiters) { if(waiterList.Count > 0) { waiters = waiterList.ToArray(); waiterList.Clear(); } else { waiters = null; } } public void Close() { ((IDisposable) this).Dispose(); } public void Shutdown() { IQueueReader[] outstandingReaders = null; lock(ThisLock) { if(queueState == QueueState.Shutdown) { return; } if(queueState == QueueState.Closed) { return; } this.queueState = QueueState.Shutdown; if(readerQueue.Count > 0 && this.itemQueue.ItemCount == 0) { outstandingReaders = new IQueueReader[readerQueue.Count]; readerQueue.CopyTo(outstandingReaders, 0); readerQueue.Clear(); } } if(outstandingReaders != null) { for(int i = 0; i < outstandingReaders.Length; i++) { outstandingReaders[i].Set(new Item((Exception) null, null)); } } } public T Dequeue(TimeSpan timeout) { T value; if(!this.Dequeue(timeout, out value)) { throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout)); } return value; } public bool Dequeue(TimeSpan timeout, out T value) { WaitQueueReader reader = null; Item item = new Item(); lock(ThisLock) { if(queueState == QueueState.Open) { if(itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else { reader = new WaitQueueReader(this); readerQueue.Enqueue(reader); } } else if(queueState == QueueState.Shutdown) { if(itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else if(itemQueue.HasAnyItem) { reader = new WaitQueueReader(this); readerQueue.Enqueue(reader); } else { value = default(T); return true; } } else // queueState == QueueState.Closed { value = default(T); return true; } } if(reader != null) { return reader.Wait(timeout, out value); } else { InvokeDequeuedCallback(item.DequeuedCallback); value = item.GetValue(); return true; } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if(disposing) { bool dispose = false; lock(ThisLock) { if(queueState != QueueState.Closed) { queueState = QueueState.Closed; dispose = true; } } if(dispose) { while(readerQueue.Count > 0) { IQueueReader reader = readerQueue.Dequeue(); reader.Set(default(Item)); } while(itemQueue.HasAnyItem) { Item item = itemQueue.DequeueAnyItem(); item.Dispose(); InvokeDequeuedCallback(item.DequeuedCallback); } } } } public void Dispatch() { IQueueReader reader = null; Item item = new Item(); IQueueReader[] outstandingReaders = null; IQueueWaiter[] waiters = null; bool itemAvailable = true; lock(ThisLock) { itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown)); this.GetWaiters(out waiters); if(queueState != QueueState.Closed) { itemQueue.MakePendingItemAvailable(); if(readerQueue.Count > 0) { item = itemQueue.DequeueAvailableItem(); reader = readerQueue.Dequeue(); if(queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0) { outstandingReaders = new IQueueReader[readerQueue.Count]; readerQueue.CopyTo(outstandingReaders, 0); readerQueue.Clear(); itemAvailable = false; } } } } if(outstandingReaders != null) { if(completeOutstandingReadersCallback == null) { completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback); } ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders); } if(waiters != null) { CompleteWaitersLater(itemAvailable, waiters); } if(reader != null) { InvokeDequeuedCallback(item.DequeuedCallback); reader.Set(item); } } //Ends an asynchronous Dequeue operation. public T EndDequeue(IAsyncResult result) { T value; if(!this.EndDequeue(result, out value)) { throw new TimeoutException("Asynchronous Dequeue operation timed out."); } return value; } public bool EndDequeue(IAsyncResult result, out T value) { TypedCompletedAsyncResult typedResult = result as TypedCompletedAsyncResult; if(typedResult != null) { value = TypedCompletedAsyncResult.End(result); return true; } return AsyncQueueReader.End(result, out value); } public bool EndWaitForItem(IAsyncResult result) { TypedCompletedAsyncResult typedResult = result as TypedCompletedAsyncResult; if(typedResult != null) { return TypedCompletedAsyncResult.End(result); } return AsyncQueueWaiter.End(result); } public void EnqueueAndDispatch(T item) { EnqueueAndDispatch(item, null); } public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback) { EnqueueAndDispatch(item, dequeuedCallback, true); //EnqueueAndDispatch(item, dequeuedCallback, false); } public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread) { Debug.Assert(exception != null, "exception parameter should not be null"); EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread); } public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread) { Debug.Assert(item != null, "item parameter should not be null"); EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread); } void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread) { bool disposeItem = false; IQueueReader reader = null; bool dispatchLater = false; IQueueWaiter[] waiters = null; bool itemAvailable = true; lock(ThisLock) { itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown)); this.GetWaiters(out waiters); if(queueState == QueueState.Open) { if(canDispatchOnThisThread) { if(readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); } else { reader = readerQueue.Dequeue(); } } else { if(readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); } else { itemQueue.EnqueuePendingItem(item); dispatchLater = true; } } } else // queueState == QueueState.Closed || queueState == QueueState.Shutdown { disposeItem = true; } } if(waiters != null) { if(canDispatchOnThisThread) { CompleteWaiters(itemAvailable, waiters); } else { CompleteWaitersLater(itemAvailable, waiters); } } if(reader != null) { InvokeDequeuedCallback(item.DequeuedCallback); reader.Set(item); } if(dispatchLater) { if(onDispatchCallback == null) { onDispatchCallback = new WaitCallback(OnDispatchCallback); } ThreadPool.QueueUserWorkItem(onDispatchCallback, this); } else if(disposeItem) { InvokeDequeuedCallback(item.DequeuedCallback); item.Dispose(); } } public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback) { Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null"); return EnqueueWithoutDispatch(new Item(item, dequeuedCallback)); } public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback) { Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null"); return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback)); } // This will not block, however, Dispatch() must be called later if this function // returns true. bool EnqueueWithoutDispatch(Item item) { lock(ThisLock) { // Open if(queueState != QueueState.Closed && queueState != QueueState.Shutdown) { if(readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); return false; } else { itemQueue.EnqueuePendingItem(item); return true; } } } item.Dispose(); InvokeDequeuedCallbackLater(item.DequeuedCallback); return false; } static void OnDispatchCallback(object state) { ((InputQueue) state).Dispatch(); } static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback) { if(dequeuedCallback != null) { if(onInvokeDequeuedCallback == null) { onInvokeDequeuedCallback = OnInvokeDequeuedCallback; } ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback); } } static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback) { if(dequeuedCallback != null) { dequeuedCallback(); } } static void OnInvokeDequeuedCallback(object state) { ItemDequeuedCallback dequeuedCallback = (ItemDequeuedCallback) state; dequeuedCallback(); } bool RemoveReader(IQueueReader reader) { lock(ThisLock) { if(queueState == QueueState.Open || queueState == QueueState.Shutdown) { bool removed = false; for(int i = readerQueue.Count; i > 0; i--) { IQueueReader temp = readerQueue.Dequeue(); if(Object.ReferenceEquals(temp, reader)) { removed = true; } else { readerQueue.Enqueue(temp); } } return removed; } } return false; } public bool WaitForItem(TimeSpan timeout) { WaitQueueWaiter waiter = null; bool itemAvailable = false; lock(ThisLock) { if(queueState == QueueState.Open) { if(itemQueue.HasAvailableItem) { itemAvailable = true; } else { waiter = new WaitQueueWaiter(); waiterList.Add(waiter); } } else if(queueState == QueueState.Shutdown) { if(itemQueue.HasAvailableItem) { itemAvailable = true; } else if(itemQueue.HasAnyItem) { waiter = new WaitQueueWaiter(); waiterList.Add(waiter); } else { return false; } } else // queueState == QueueState.Closed { return true; } } if(waiter != null) { return waiter.Wait(timeout); } else { return itemAvailable; } } interface IQueueReader { void Set(Item item); } interface IQueueWaiter { void Set(bool itemAvailable); } class WaitQueueReader : IQueueReader { Exception exception; InputQueue inputQueue; T item; ManualResetEvent waitEvent; object thisLock = new object(); public WaitQueueReader(InputQueue inputQueue) { this.inputQueue = inputQueue; waitEvent = new ManualResetEvent(false); } object ThisLock { get { return this.thisLock; } } public void Set(Item item) { lock(ThisLock) { Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)"); Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)"); this.exception = item.Exception; this.item = item.Value; waitEvent.Set(); } } public bool Wait(TimeSpan timeout, out T value) { bool isSafeToClose = false; try { if(timeout == TimeSpan.MaxValue) { waitEvent.WaitOne(); } else if(!waitEvent.WaitOne(timeout, false)) { if(this.inputQueue.RemoveReader(this)) { value = default(T); isSafeToClose = true; return false; } else { waitEvent.WaitOne(); } } isSafeToClose = true; } finally { if(isSafeToClose) { waitEvent.Close(); } } value = item; return true; } } public class AsyncQueueReader : AsyncResult, IQueueReader { static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback); bool expired; InputQueue inputQueue; T item; Timer timer; public AsyncQueueReader(InputQueue inputQueue, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.inputQueue = inputQueue; if(timeout != TimeSpan.MaxValue) { this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1)); } } public static bool End(IAsyncResult result, out T value) { AsyncQueueReader readerResult = AsyncResult.End(result); if(readerResult.expired) { value = default(T); return false; } else { value = readerResult.item; return true; } } static void TimerCallback(object state) { AsyncQueueReader thisPtr = (AsyncQueueReader) state; if(thisPtr.inputQueue.RemoveReader(thisPtr)) { thisPtr.expired = true; thisPtr.Complete(false); } } public void Set(Item item) { this.item = item.Value; if(this.timer != null) { this.timer.Change(-1, -1); } Complete(false, item.Exception); } } public struct Item { T value; Exception exception; ItemDequeuedCallback dequeuedCallback; public Item(T value, ItemDequeuedCallback dequeuedCallback) : this(value, null, dequeuedCallback) { } public Item(Exception exception, ItemDequeuedCallback dequeuedCallback) : this(null, exception, dequeuedCallback) { } Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback) { this.value = value; this.exception = exception; this.dequeuedCallback = dequeuedCallback; } public Exception Exception { get { return this.exception; } } public T Value { get { return value; } } public ItemDequeuedCallback DequeuedCallback { get { return dequeuedCallback; } } public void Dispose() { if(value != null) { if(value is IDisposable) { ((IDisposable) value).Dispose(); } else if(value is ICommunicationObject) { ((ICommunicationObject) value).Abort(); } } } public T GetValue() { if(this.exception != null) { throw this.exception; } return this.value; } } class WaitQueueWaiter : IQueueWaiter { bool itemAvailable; ManualResetEvent waitEvent; object thisLock = new object(); public WaitQueueWaiter() { waitEvent = new ManualResetEvent(false); } object ThisLock { get { return this.thisLock; } } public void Set(bool itemAvailable) { lock(ThisLock) { this.itemAvailable = itemAvailable; waitEvent.Set(); } } public bool Wait(TimeSpan timeout) { if(timeout == TimeSpan.MaxValue) { waitEvent.WaitOne(); } else if(!waitEvent.WaitOne(timeout, false)) { return false; } return this.itemAvailable; } } class AsyncQueueWaiter : AsyncResult, IQueueWaiter { static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback); Timer timer; bool itemAvailable; object thisLock = new object(); public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { if(timeout != TimeSpan.MaxValue) { this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1)); } } object ThisLock { get { return this.thisLock; } } public static bool End(IAsyncResult result) { AsyncQueueWaiter waiterResult = AsyncResult.End(result); return waiterResult.itemAvailable; } static void TimerCallback(object state) { AsyncQueueWaiter thisPtr = (AsyncQueueWaiter) state; thisPtr.Complete(false); } public void Set(bool itemAvailable) { bool timely; lock(ThisLock) { timely = (this.timer == null) || this.timer.Change(-1, -1); this.itemAvailable = itemAvailable; } if(timely) { Complete(false); } } } class ItemQueue { Item[] items; int head; int pendingCount; int totalCount; public ItemQueue() { items = new Item[1]; } public Item DequeueAvailableItem() { if(totalCount == pendingCount) { Debug.Assert(false, "ItemQueue does not contain any available items"); throw new Exception("Internal Error"); } return DequeueItemCore(); } public Item DequeueAnyItem() { if(pendingCount == totalCount) { pendingCount--; } return DequeueItemCore(); } void EnqueueItemCore(Item item) { if(totalCount == items.Length) { Item[] newItems = new Item[items.Length * 2]; for(int i = 0; i < totalCount; i++) { newItems[i] = items[(head + i) % items.Length]; } head = 0; items = newItems; } int tail = (head + totalCount) % items.Length; items[tail] = item; totalCount++; } Item DequeueItemCore() { if(totalCount == 0) { Debug.Assert(false, "ItemQueue does not contain any items"); throw new Exception("Internal Error"); } Item item = items[head]; items[head] = new Item(); totalCount--; head = (head + 1) % items.Length; return item; } public void EnqueuePendingItem(Item item) { EnqueueItemCore(item); pendingCount++; } public void EnqueueAvailableItem(Item item) { EnqueueItemCore(item); } public void MakePendingItemAvailable() { if(pendingCount == 0) { Debug.Assert(false, "ItemQueue does not contain any pending items"); throw new Exception("Internal Error"); } pendingCount--; } public bool HasAvailableItem { get { return totalCount > pendingCount; } } public bool HasAnyItem { get { return totalCount > 0; } } public int ItemCount { get { return totalCount; } } } } }