// $Id$
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace Org.Apache.Etch.Bindings.Csharp.Util
{
    /// <summary>
    /// A circular queue of a fixed size. Elements are added to one
    /// end and removed from the other, with the queue size ranging
    /// from empty to full. Operations can optionally wait until
    /// finished or return status indicating success or failure.
    /// For instance, adding to a full queue can wait until an item
    /// is removed before adding the new item or it can wait only
    /// a specified amount of time before completing successfully
    /// or giving up.
    /// </summary>
    /// <remarks>
    /// <see cref="Get(int)"/>, <see cref="Put(T, int)"/> and <see cref="Close"/>
    /// are marked <c>MethodImplOptions.Synchronized</c> and wait / pulse on the
    /// queue instance itself. The simple accessors (<see cref="Size"/>,
    /// <see cref="Count"/>, <see cref="IsEmpty"/>, <see cref="IsFull"/>,
    /// <see cref="IsClosed"/>) read their fields without taking that lock.
    /// </remarks>
    /// <typeparam name="T">the type of items in the queue.</typeparam>
    public sealed class CircularQueue<T>
    {
        /// <summary>
        /// Constructs the Circular Queue.
        /// </summary>
        /// <param name="size">the maximum number of items allowed in the queue.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// if <paramref name="size"/> is less than 1.
        /// </exception>
        public CircularQueue( int size )
        {
            if (size < 1)
            {
                // The single-string constructor takes a parameter name, not a
                // message, so pass both explicitly.
                throw new ArgumentOutOfRangeException( "size", "size < 1" );
            }

            this.size = size;
            items = new Object[size];
        }

        /// <summary>
        /// Constructs the CircularQueue with the maximum number of items
        /// defaulted to 10.
        /// </summary>
        public CircularQueue()
            : this( 10 )
        {
        }

        // Capacity of the queue; fixed at construction.
        private readonly int size;

        // Backing ring buffer. Slots hold Object so that null can mark an
        // empty slot for any T (Get0 / Put0 assert on this).
        private readonly Object[] items;

        // Number of items currently stored.
        private int count;

        // Index of the next slot to read from.
        private int head;

        // Index of the next slot to write to.
        private int tail;

        // Set once by Close(); never reset.
        private bool closed;

        /// <summary>
        /// Gets the capacity of the queue.
        /// </summary>
        /// <returns>the maximum number of items that may be put in the queue.</returns>
        public int Size()
        {
            return size;
        }

        /// <summary>
        /// Gets the number of queued items.
        /// </summary>
        /// <returns>the current number of items in the queue.</returns>
        public int Count()
        {
            return count;
        }

        /// <summary>
        /// Tests whether the queue holds no items.
        /// </summary>
        /// <returns>true if the queue is empty.</returns>
        public bool IsEmpty()
        {
            return count == 0;
        }

        /// <summary>
        /// Tests whether the queue holds <see cref="Size"/> items.
        /// </summary>
        /// <returns>true if the queue is full.</returns>
        public bool IsFull()
        {
            return count == size;
        }

        /// <summary>
        /// Tests whether <see cref="Close"/> has been called.
        /// </summary>
        /// <returns>true if the queue is closed.</returns>
        public bool IsClosed()
        {
            return closed;
        }

        /// <summary>
        /// Removes and returns the item at the head of the ring buffer.
        /// The caller must ensure the queue is not empty. Does no signalling.
        /// </summary>
        /// <returns>the removed item.</returns>
        private Object Get0()
        {
            Debug.Assert( count > 0, "count > 0" );

            Object obj = items[head];
            Debug.Assert( obj != null, "obj != null" );

            // Clear the slot so it reads as empty and the item is not retained.
            items[head] = null;
            count--;

            head++;
            if (head == size)
            {
                head = 0;
            }

            return obj;
        }

        /// <summary>
        /// Stores an item at the tail of the ring buffer. The caller must
        /// ensure the item is non-null and the queue is not full. Does no
        /// signalling.
        /// </summary>
        /// <param name="obj">the item to store.</param>
        private void Put0( Object obj )
        {
            Debug.Assert( obj != null, "obj != null" );
            Debug.Assert( count < size, "count < size" );
            Debug.Assert( items[tail] == null, "items[tail] == null" );

            items[tail] = obj;
            count++;

            tail++;
            if (tail == size)
            {
                tail = 0;
            }
        }

        /// <summary>
        /// Gets the next available item from the queue, waiting
        /// until an item is available or the queue is closed.
        /// </summary>
        /// <returns>the item from the queue, or default(T) (null for reference
        /// types) if the queue is closed.</returns>
        /// <exception cref="System.Threading.ThreadInterruptedException">
        /// if the thread is interrupted while waiting.
        /// </exception>
        public T Get()
        {
            return Get( 0 );
        }

        /// <summary>
        /// Gets the next available item from the queue, waiting
        /// until an item is available or the queue is closed.
        /// </summary>
        /// <param name="maxDelay">the maximum time in ms to wait for
        /// something to be put in the queue; 0 means wait forever,
        /// less than 0 means don't wait.</param>
        /// <returns>the item from the queue, or default(T) (null for reference
        /// types) if maxDelay has been exceeded or the queue is closed.</returns>
        /// <exception cref="System.Threading.ThreadInterruptedException">
        /// if the thread is interrupted while waiting.
        /// </exception>
        [MethodImpl( MethodImplOptions.Synchronized )]
        public T Get( int maxDelay )
        {
            // Items already queued are still handed out after Close().
            if (!IsEmpty())
            {
                return GetAndNotify();
            }

            if (IsClosed() || maxDelay < 0)
            {
                return default( T );
            }

            // the queue is empty, not closed, and caller has requested a delay

            long now = HPTimer.Now();
            long end = EndTime( now, maxDelay );
            Debug.Assert( end > now, "end > now" );

            int d;
            while ((d = RemTime( end, now )) > 0)
            {
                // the queue is empty, not closed, and delay has not run out...
                Debug.Assert( IsEmpty(), "IsEmpty()" );
                Debug.Assert( !IsClosed(), "!IsClosed()" );

                // Releases the lock while waiting; state must be re-checked
                // after waking because the wake-up may be for another waiter.
                System.Threading.Monitor.Wait( this, d );

                if (!IsEmpty())
                {
                    return GetAndNotify();
                }

                if (IsClosed())
                {
                    return default( T );
                }

                now = HPTimer.Now();
            }

            return default( T );
        }

        /// <summary>
        /// Puts an item in the queue, waiting until space is available
        /// or the queue is closed.
        /// </summary>
        /// <param name="t">a non-null item to put in the queue.</param>
        /// <returns>true if the item was placed in the queue,
        /// or false if the queue is closed.</returns>
        /// <exception cref="ArgumentNullException">
        /// if <paramref name="t"/> is null.
        /// </exception>
        /// <exception cref="System.Threading.ThreadInterruptedException">
        /// if the thread is interrupted while waiting.
        /// </exception>
        public bool Put( T t )
        {
            return Put( t, 0 );
        }

        /// <summary>
        /// Puts an item in the queue, waiting until space is available
        /// or the queue is closed.
        /// </summary>
        /// <param name="obj">a non-null item to put in the queue.</param>
        /// <param name="maxDelay">the maximum time in ms to wait for
        /// available space in the queue; 0 means wait forever,
        /// less than 0 means don't wait.</param>
        /// <returns>true if the item was placed in the queue,
        /// or false if maxDelay has been exceeded or the queue is closed.</returns>
        /// <exception cref="ArgumentNullException">
        /// if <paramref name="obj"/> is null.
        /// </exception>
        /// <exception cref="System.Threading.ThreadInterruptedException">
        /// if the thread is interrupted while waiting.
        /// </exception>
        [MethodImpl( MethodImplOptions.Synchronized )]
        public bool Put( T obj, int maxDelay )
        {
            if (obj == null)
            {
                // Name the actual parameter; the single-string constructor
                // expects a parameter name rather than a message.
                throw new ArgumentNullException( "obj" );
            }

            if (IsClosed())
            {
                return false;
            }

            // the queue is not closed.

            if (!IsFull())
            {
                PutAndNotify( obj );
                return true;
            }

            // the queue is not closed, the queue is full.

            if (maxDelay < 0)
            {
                return false;
            }

            long now = HPTimer.Now();
            long end = EndTime( now, maxDelay );

            int d;
            while ((d = RemTime( end, now )) > 0)
            {
                // the queue is not closed, the queue is full, and delay has not run out...
                Debug.Assert( !IsClosed() );
                Debug.Assert( IsFull() );

                // Releases the lock while waiting; state must be re-checked
                // after waking because the wake-up may be for another waiter.
                System.Threading.Monitor.Wait( this, d );

                if (IsClosed())
                {
                    return false;
                }

                // the queue is not closed.

                if (!IsFull())
                {
                    PutAndNotify( obj );
                    return true;
                }

                now = HPTimer.Now();
            }

            return false;
        }

        /// <summary>
        /// Closes the queue so that no more items may be put into it.
        /// Get will return default(T) (null for reference types) when there
        /// are no more items to return. Calling Close more than once has no
        /// further effect.
        /// </summary>
        [MethodImpl( MethodImplOptions.Synchronized )]
        public void Close()
        {
            if (!closed)
            {
                closed = true;

                // Wake every waiter so each can observe the closed state.
                System.Threading.Monitor.PulseAll( this );
            }
        }

        /// <summary>
        /// Removes the head item and wakes waiting threads when the removal
        /// freed space in a full queue or left further items available.
        /// Must be called with the queue's lock held and the queue non-empty.
        /// </summary>
        /// <returns>the removed item.</returns>
        private T GetAndNotify()
        {
            bool notify = IsFull();
            Object obj = Get0();
            notify = notify || !IsEmpty();

            if (notify)
            {
                // Getters and putters wait on the same monitor, so a single
                // Pulse can wake a thread of the wrong kind and leave the
                // right one blocked. PulseAll lets every waiter re-check.
                System.Threading.Monitor.PulseAll( this );
            }

            return (T) obj;
        }

        /// <summary>
        /// Appends an item and wakes waiting threads when the queue was empty
        /// before the append or still has free space after it.
        /// Must be called with the queue's lock held and the queue not full.
        /// </summary>
        /// <param name="obj">the non-null item to append.</param>
        private void PutAndNotify( T obj )
        {
            bool notify = IsEmpty();
            Put0( obj );
            notify = notify || !IsFull();

            if (notify)
            {
                // See GetAndNotify for why this is PulseAll rather than Pulse.
                System.Threading.Monitor.PulseAll( this );
            }
        }

        /// <summary>
        /// Computes the deadline for a wait that starts at <paramref name="now"/>.
        /// </summary>
        /// <param name="now">the start time, as returned by HPTimer.Now().</param>
        /// <param name="maxDelay">the delay in ms; 0 or int.MaxValue means no deadline.</param>
        /// <returns>the deadline in the same units as <paramref name="now"/>,
        /// or long.MaxValue for no deadline.</returns>
        private long EndTime( long now, int maxDelay )
        {
            if (maxDelay == 0 || maxDelay == int.MaxValue)
            {
                return long.MaxValue;
            }

            // Widen before multiplying so the product cannot overflow 32 bits
            // should NS_PER_MILLISECOND be declared as an int (not visible here).
            return now + (long) maxDelay * HPTimer.NS_PER_MILLISECOND;
        }

        /// <summary>
        /// Computes how many whole milliseconds remain until <paramref name="end"/>.
        /// </summary>
        /// <param name="end">the deadline produced by EndTime.</param>
        /// <param name="now">the current time, as returned by HPTimer.Now().</param>
        /// <returns>the remaining ms, capped at int.MaxValue; int.MaxValue when
        /// there is no deadline; zero or negative once the deadline has passed.</returns>
        private int RemTime( long end, long now )
        {
            if (end == long.MaxValue)
            {
                return int.MaxValue;
            }

            long ms = (end - now) / HPTimer.NS_PER_MILLISECOND;
            if (ms > int.MaxValue)
            {
                return int.MaxValue;
            }

            return (int) ms;
        }
    }
}