// $Id$ // // Copyright 2007-2008 Cisco Systems Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy // of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations // under the License. using System; using System.Runtime.CompilerServices; using System.Threading; namespace Etch.Util { /// /// A standalone version of a processor for todo items /// public class TodoManager : AbstractStartable { /// /// Constructs the TodoManager /// /// the maximum number of entries in the queue /// milliseconds to delay a caller who tries to /// add a entry over the limit. /// the minimum number of workers to keep waiting /// the maximum number of workers to allow. /// milliseconds a worker will wait for a Todo /// before considering quitting. /// the per worker threshold for queue length. if /// queue length exceeds this amount, a new worker is added if allowed. /// public TodoManager( int maxEntries, int entryDelay, int minWorkers, int maxWorkers, int workerLinger, int threshold ) { if ( maxEntries < 1 ) throw new ArgumentException( "maxEntries < 1" ); if ( minWorkers < 0 ) throw new ArgumentException( "minWorkers < 0" ); if ( maxWorkers < minWorkers ) throw new ArgumentException( "maxWorkers < minWorkers" ); if ( maxWorkers < 1 ) throw new ArgumentException( "maxWorkers < 1" ); if ( workerLinger < 1 ) throw new ArgumentException( "workerLinger < 1" ); this.maxEntries = maxEntries; this.entryDelay = entryDelay; this.minWorkers = minWorkers; this.maxWorkers = maxWorkers; this.workerLinger = workerLinger; this.threshold = threshold; } private int maxEntries; private int entryDelay; private int minWorkers; private int maxWorkers; private int workerLinger; private int threshold; protected override void Start0() { // nothing to do } protected override void Stop0() { System.Threading.Monitor.PulseAll( this ); } /// /// /// /// /// Exception: /// throws ThreadInterruptedException [ MethodImpl ( MethodImplOptions.Synchronized ) ] public void Add( Todo todo ) { CheckIsStarted(); int n = AddEntry( todo ); System.Threading.Monitor.Pulse( this ); ConsiderStartingAWorker( n ) ; if ( n > maxEntries ) Thread.Sleep( entryDelay ); } public void Run() { bool needAdjust = true; try { Todo todo; while ( ( todo = GetNextTodo() ) != null ) { try { todo.Doit( this ); } catch ( Exception e ) { todo.Exception( this, e ); } } needAdjust = false; } finally { if ( needAdjust ) workers.Adjust( -1 ); } } # region Workers /// /// /// /// number of workers public int NumWorkers() { return workers.Get(); } [MethodImpl( MethodImplOptions.Synchronized )] private void ConsiderStartingAWorker( int qlen ) { int n = NumWorkers(); if ( n >= maxWorkers ) return; // Start a new worker if there are none or if the // queue length per worker has exceeded the threshold if ( n == 0 || ( (qlen + n-1) / n ) > threshold ) StartAWorker(); } private void StartAWorker() { workers.Adjust( 1 ); Thread t = new Thread( new ThreadStart( Run ) ); t.Start(); } private IntCounter workers = new IntCounter(); # endregion Workers # region Queue private Entry head; private Entry tail; private IntCounter entries = new IntCounter(); /// /// Adds the todo to the tail of the queue. /// /// the todo to add. /// the current queue length. /// [MethodImpl( MethodImplOptions.Synchronized )] private int AddEntry( Todo todo ) { Entry e = new Entry(); e.todo = todo; if ( tail != null ) tail.next = e; else head = e; // first instance tail = e; return entries.Adjust( 1 ); } /// /// Remove an entry from the queue. /// /// a todo from the head of the queue, or /// null if empty /// [MethodImpl( MethodImplOptions.Synchronized )] private Todo RemoveEntry() { if ( head == null ) return null; Entry e = head; head = e.next; if ( head == null ) tail = null; entries.Adjust( -1 ); return e.todo; } /// /// /// /// number of TODOs public int NumEntries() { return entries.Get(); } /// /// An entry in the todo queue /// public class Entry { /// /// The todo to be performed /// public Todo todo; /// /// The next todo in the queue /// public Entry next; } # endregion Queue # region BLAH [MethodImpl( MethodImplOptions.Synchronized )] private Todo GetNextTodo() { Todo todo = null; bool lingered = false; while ( IsStarted() && ( todo = RemoveEntry() ) == null ) { try { if ( lingered && workers.Get() > minWorkers ) { workers.Adjust( -1 ); return null; } System.Threading.Monitor.Wait( this, workerLinger ); // we lingered. we might have been woken because // we're stopping, or a todo might have been // queued. lingered = true; } catch ( ThreadInterruptedException ) { workers.Adjust( -1 ); return null; } } return todo; } # endregion BLAH # region Static Stuff /// /// /// /// /// Exception: /// throws Exception /// public static void AddTodo( Todo todo ) { try { GetTodoManager().Add(todo); } catch(Exception e) { todo.Exception(null,e); } } /// /// /// /// the configured TodoManager. If there isn't one, it makes /// one with one worker thread. /// Exception: /// if there is a problem creating the TodoManager public static TodoManager GetTodoManager() { if ( todomanager == null ) { lock ( lockObject ) { if ( todomanager == null ) { todomanager = new TodoManager( 50, 10, 0, 5, 5000, 0 ); todomanager.Start(); } } } return todomanager; } /// /// /// /// /// the old todo manager /// [MethodImpl( MethodImplOptions.Synchronized )] public static TodoManager SetTodoManager( TodoManager newTodoManager ) { TodoManager oldTodoManager = todomanager; todomanager = newTodoManager; return oldTodoManager; } /// /// Shuts down the currently configured static todo manager if any. /// /// Exception: /// throws Exception public static void ShutDown() { TodoManager oldTodoManager = SetTodoManager( null ); if ( oldTodoManager != null ) oldTodoManager.Stop(); } private static TodoManager todomanager; /// /// Since C# doesn't allow locking on an entire class, the substitute /// here is locking a static variable of the class. /// private static object lockObject = new Object() ; # endregion Static Stuff } }