// $Id$
//
// Copyright 2007-2008 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
using System;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Etch.Util
{
///
/// A standalone version of a processor for todo items
///
public class TodoManager : AbstractStartable
{
///
/// Constructs the TodoManager
///
/// the maximum number of entries in the queue
/// milliseconds to delay a caller who tries to
/// add a entry over the limit.
/// the minimum number of workers to keep waiting
/// the maximum number of workers to allow.
/// milliseconds a worker will wait for a Todo
/// before considering quitting.
/// the per worker threshold for queue length. if
/// queue length exceeds this amount, a new worker is added if allowed.
///
public TodoManager( int maxEntries, int entryDelay, int minWorkers,
int maxWorkers, int workerLinger, int threshold )
{
if ( maxEntries < 1 )
throw new ArgumentException( "maxEntries < 1" );
if ( minWorkers < 0 )
throw new ArgumentException( "minWorkers < 0" );
if ( maxWorkers < minWorkers )
throw new ArgumentException( "maxWorkers < minWorkers" );
if ( maxWorkers < 1 )
throw new ArgumentException( "maxWorkers < 1" );
if ( workerLinger < 1 )
throw new ArgumentException( "workerLinger < 1" );
this.maxEntries = maxEntries;
this.entryDelay = entryDelay;
this.minWorkers = minWorkers;
this.maxWorkers = maxWorkers;
this.workerLinger = workerLinger;
this.threshold = threshold;
}
private int maxEntries;
private int entryDelay;
private int minWorkers;
private int maxWorkers;
private int workerLinger;
private int threshold;
protected override void Start0()
{
// nothing to do
}
protected override void Stop0()
{
System.Threading.Monitor.PulseAll( this );
}
///
///
///
///
/// Exception:
/// throws ThreadInterruptedException
[ MethodImpl ( MethodImplOptions.Synchronized ) ]
public void Add( Todo todo )
{
CheckIsStarted();
int n = AddEntry( todo );
System.Threading.Monitor.Pulse( this );
ConsiderStartingAWorker( n ) ;
if ( n > maxEntries )
Thread.Sleep( entryDelay );
}
public void Run()
{
bool needAdjust = true;
try
{
Todo todo;
while ( ( todo = GetNextTodo() ) != null )
{
try
{
todo.Doit( this );
}
catch ( Exception e )
{
todo.Exception( this, e );
}
}
needAdjust = false;
}
finally
{
if ( needAdjust )
workers.Adjust( -1 );
}
}
# region Workers
///
///
///
/// number of workers
public int NumWorkers()
{
return workers.Get();
}
[MethodImpl( MethodImplOptions.Synchronized )]
private void ConsiderStartingAWorker( int qlen )
{
int n = NumWorkers();
if ( n >= maxWorkers )
return;
// Start a new worker if there are none or if the
// queue length per worker has exceeded the threshold
if ( n == 0 || ( (qlen + n-1) / n ) > threshold )
StartAWorker();
}
private void StartAWorker()
{
workers.Adjust( 1 );
Thread t = new Thread( new ThreadStart( Run ) );
t.Start();
}
private IntCounter workers = new IntCounter();
# endregion Workers
# region Queue
private Entry head;
private Entry tail;
private IntCounter entries = new IntCounter();
///
/// Adds the todo to the tail of the queue.
///
/// the todo to add.
/// the current queue length.
///
[MethodImpl( MethodImplOptions.Synchronized )]
private int AddEntry( Todo todo )
{
Entry e = new Entry();
e.todo = todo;
if ( tail != null )
tail.next = e;
else
head = e; // first instance
tail = e;
return entries.Adjust( 1 );
}
///
/// Remove an entry from the queue.
///
/// a todo from the head of the queue, or
/// null if empty
///
[MethodImpl( MethodImplOptions.Synchronized )]
private Todo RemoveEntry()
{
if ( head == null )
return null;
Entry e = head;
head = e.next;
if ( head == null )
tail = null;
entries.Adjust( -1 );
return e.todo;
}
///
///
///
/// number of TODOs
public int NumEntries()
{
return entries.Get();
}
///
/// An entry in the todo queue
///
public class Entry
{
///
/// The todo to be performed
///
public Todo todo;
///
/// The next todo in the queue
///
public Entry next;
}
# endregion Queue
# region BLAH
[MethodImpl( MethodImplOptions.Synchronized )]
private Todo GetNextTodo()
{
Todo todo = null;
bool lingered = false;
while ( IsStarted() && ( todo = RemoveEntry() ) == null )
{
try
{
if ( lingered && workers.Get() > minWorkers )
{
workers.Adjust( -1 );
return null;
}
System.Threading.Monitor.Wait( this, workerLinger );
// we lingered. we might have been woken because
// we're stopping, or a todo might have been
// queued.
lingered = true;
}
catch ( ThreadInterruptedException )
{
workers.Adjust( -1 );
return null;
}
}
return todo;
}
# endregion BLAH
# region Static Stuff
///
///
///
///
/// Exception:
/// throws Exception
///
public static void AddTodo( Todo todo )
{
try
{
GetTodoManager().Add(todo);
}
catch(Exception e)
{
todo.Exception(null,e);
}
}
///
///
///
/// the configured TodoManager. If there isn't one, it makes
/// one with one worker thread.
/// Exception:
/// if there is a problem creating the TodoManager
public static TodoManager GetTodoManager()
{
if ( todomanager == null )
{
lock ( lockObject )
{
if ( todomanager == null )
{
todomanager = new TodoManager( 50, 10, 0, 5, 5000, 0 );
todomanager.Start();
}
}
}
return todomanager;
}
///
///
///
///
/// the old todo manager
///
[MethodImpl( MethodImplOptions.Synchronized )]
public static TodoManager SetTodoManager( TodoManager newTodoManager )
{
TodoManager oldTodoManager = todomanager;
todomanager = newTodoManager;
return oldTodoManager;
}
///
/// Shuts down the currently configured static todo manager if any.
///
/// Exception:
/// throws Exception
public static void ShutDown()
{
TodoManager oldTodoManager = SetTodoManager( null );
if ( oldTodoManager != null )
oldTodoManager.Stop();
}
private static TodoManager todomanager;
///
/// Since C# doesn't allow locking on an entire class, the substitute
/// here is locking a static variable of the class.
///
private static object lockObject = new Object() ;
# endregion Static Stuff
}
}