/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;

namespace Apache.NMS.ActiveMQ.Threads
{
    /// <summary>
    /// This class provides a wrapper around the ThreadPool mechanism in .NET
    /// to allow for serial execution of jobs in the ThreadPool and provide
    /// a means of shutting down the execution of jobs in a deterministic
    /// way.
    /// </summary>
    public class ThreadPoolExecutor
    {
        // Pending tasks, executed one at a time in FIFO order.
        private readonly Queue<Future> workQueue = new Queue<Future>();

        // Single monitor guarding every piece of mutable state below.  A plain
        // object is used (rather than a Mutex) so that every method takes the
        // same lock; mixing lock(mutex) with mutex.WaitOne() acquires two
        // unrelated primitives and provides no mutual exclusion between them.
        private readonly object syncRoot = new object();

        // True while a QueueProcessor invocation is scheduled or executing.
        private bool running = false;

        // closing/closed are also read outside the lock (fast-path checks and
        // the public properties), hence volatile.
        private volatile bool closing = false;
        private volatile bool closed = false;

        // Signaled whenever the executor is idle (no task scheduled or running).
        private readonly ManualResetEvent executionComplete = new ManualResetEvent(true);

        // Thread currently executing a task, or null when no task is executing.
        private Thread workThread = null;

        /// <summary>
        /// Represents an asynchronous task that is executed on the ThreadPool
        /// at some point in the future.
        /// </summary>
        internal class Future
        {
            private readonly WaitCallback callback;
            private readonly object callbackArg;

            public Future(WaitCallback callback, object arg)
            {
                this.callback = callback;
                this.callbackArg = arg;
            }

            /// <summary>
            /// Invokes the wrapped callback with its argument.  Exceptions thrown
            /// by the callback are deliberately swallowed so that one failing task
            /// cannot stop the executor from processing the tasks queued behind it.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// Thrown when this Future was created with a null callback.
            /// </exception>
            public void Run()
            {
                if(this.callback == null)
                {
                    throw new InvalidOperationException("Future executed with null WaitCallback");
                }

                try
                {
                    this.callback(callbackArg);
                }
                catch
                {
                    // Best effort: a failing task must not break serial processing.
                }
            }
        }

        /// <summary>
        /// Queues a task, with a null state argument, for serial execution.
        /// </summary>
        /// <param name="worker">The callback to execute; must not be null.</param>
        /// <exception cref="ArgumentNullException">Thrown when worker is null.</exception>
        public void QueueUserWorkItem(WaitCallback worker)
        {
            this.QueueUserWorkItem(worker, null);
        }

        /// <summary>
        /// Queues a task for serial execution on the ThreadPool.  The task is
        /// silently dropped if this executor has been shut down or terminated.
        /// </summary>
        /// <param name="worker">The callback to execute; must not be null.</param>
        /// <param name="arg">State object handed to the callback when it runs.</param>
        /// <exception cref="ArgumentNullException">Thrown when worker is null.</exception>
        public void QueueUserWorkItem(WaitCallback worker, object arg)
        {
            if(worker == null)
            {
                throw new ArgumentNullException("worker", "Invalid WaitCallback passed");
            }

            // Cheap unlocked pre-check; the authoritative check is under the lock.
            if(this.closed)
            {
                return;
            }

            lock(syncRoot)
            {
                // Both flags must be clear: once Shutdown() has been called no
                // new work is accepted, even while earlier tasks are still running.
                if(this.closed || this.closing)
                {
                    return;
                }

                this.workQueue.Enqueue(new Future(worker, arg));

                if(!this.running)
                {
                    this.executionComplete.Reset();
                    this.running = true;
                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
                }
            }
        }

        /// <summary>
        /// Returns true if this ThreadPoolExecutor has been shut down but has not
        /// finished running all the tasks that have been Queue.  When a ThreadPoolExecutor
        /// is shut down it will not accept any new tasks but it will complete all tasks
        /// that have been previously queued.
        /// </summary>
        public bool IsShutdown
        {
            get { return this.closing; }
        }

        /// <summary>
        /// Returns true if this ThreadPoolExecutor has been shut down and has also
        /// completed processing of all outstanding tasks in its task Queue.
        /// </summary>
        public bool IsTerminated
        {
            get { return this.closed; }
        }

        /// <summary>
        /// Stops this executor from accepting new tasks.  Tasks that are already
        /// queued still run; once the last of them finishes the executor becomes
        /// terminated.  Calling this more than once has no further effect.
        /// </summary>
        public void Shutdown()
        {
            if(this.closed)
            {
                return;
            }

            lock(this.syncRoot)
            {
                if(this.closed)
                {
                    return;
                }

                this.closing = true;

                // Must be no tasks in Queue and none can be accepted
                // now that we've flipped the closing toggle so safe to
                // mark this ThreadPoolExecutor as closed.  When work is
                // still in flight the processor marks us closed once the
                // queue has drained.
                if(!this.running)
                {
                    this.closed = true;
                    this.executionComplete.Set();
                }
            }
        }

        /// <summary>
        /// Waits up to the given timeout for all queued tasks to finish.  If the
        /// wait completes the executor is marked terminated (this happens even if
        /// Shutdown() was not called first).  When invoked from the thread that is
        /// currently running a task no waiting is done, since that would deadlock;
        /// the current terminated state is returned instead.
        /// </summary>
        /// <param name="timeout">Maximum time to wait for outstanding tasks.</param>
        /// <returns>True if the executor is terminated, false otherwise.</returns>
        public bool AwaitTermination(TimeSpan timeout)
        {
            if(this.closed)
            {
                return true;
            }

            bool mustWait;

            lock(syncRoot)
            {
                if(this.closed)
                {
                    return true;
                }

                // If called from the worker thread we can't check this as it
                // will deadlock us, just return whatever the closed state is.
                mustWait = this.running && Thread.CurrentThread != this.workThread;
            }

            // Wait outside the lock so the processor can make progress.
            if(mustWait && this.executionComplete.WaitOne(timeout, false))
            {
                lock(syncRoot)
                {
                    // Only ever move to closed; a timeout must not overwrite a
                    // concurrent transition to closed with false.
                    this.closed = true;
                }
            }

            return this.closed;
        }

        /// <summary>
        /// ThreadPool entry point.  Runs exactly one queued task and then either
        /// re-schedules itself (more work pending) or marks the executor idle.
        /// </summary>
        /// <param name="unused">Ignored; required by the WaitCallback signature.</param>
        private void QueueProcessor(object unused)
        {
            Future theTask;

            lock(syncRoot)
            {
                if(this.workQueue.Count == 0)
                {
                    this.MarkIdle();
                    return;
                }

                this.workThread = Thread.CurrentThread;
                theTask = this.workQueue.Dequeue();
            }

            try
            {
                theTask.Run();
            }
            finally
            {
                bool moreWork;

                // The pending-work decision and the running flag must change
                // atomically with respect to producers; otherwise a task queued
                // between the check and the flag update would never be scheduled.
                lock(syncRoot)
                {
                    this.workThread = null;
                    moreWork = this.workQueue.Count > 0;

                    if(!moreWork)
                    {
                        this.MarkIdle();
                    }
                }

                if(moreWork)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
                }
            }
        }

        /// <summary>
        /// Records that no task is scheduled or running and releases any waiter.
        /// If a shutdown was requested the executor becomes terminated here.
        /// Callers must hold syncRoot.
        /// </summary>
        private void MarkIdle()
        {
            this.running = false;

            if(this.closing)
            {
                this.closed = true;
            }

            this.executionComplete.Set();
        }
    }
}