/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; namespace Apache.NMS.ActiveMQ.Threads { class PooledTaskRunner : TaskRunner { private readonly int maxIterationsPerRun; private readonly Task task; private readonly Object runable = new Object(); private bool queued; private bool _shutdown; private bool iterating; private volatile System.Threading.Thread runningThread; public void Run(Object o) { PooledTaskRunner p = o as PooledTaskRunner; p.runningThread = System.Threading.Thread.CurrentThread; try { p.RunTask(); } finally { p.runningThread = null; } } public PooledTaskRunner(Task task, int maxIterationsPerRun) { this.maxIterationsPerRun = maxIterationsPerRun; this.task = task; this._shutdown = false; this.iterating = false; this.queued = true; ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this); } /// /// We Expect MANY wakeup calls on the same TaskRunner. /// public void Wakeup() { lock(runable) { // When we get in here, we make some assumptions of state: // queued=false, iterating=false: wakeup() has not be called and // therefore task is not executing. // queued=true, iterating=false: wakeup() was called but, task // execution has not started yet // queued=false, iterating=true : wakeup() was called, which caused // task execution to start. // queued=true, iterating=true : wakeup() called after task // execution was started. if(queued || _shutdown) { return; } queued = true; // The runTask() method will do this for me once we are done // iterating. if(!iterating) { ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this); } } } /// /// shut down the task /// /// public void Shutdown(TimeSpan timeout) { lock(runable) { _shutdown = true; // the check on the thread is done // because a call to iterate can result in // shutDown() being called, which would wait forever // waiting for iterating to finish if(runningThread != System.Threading.Thread.CurrentThread) { if(iterating) { System.Threading.Thread.Sleep(timeout); } } } } public void ShutdownWithAbort(TimeSpan timeout) { lock(runable) { _shutdown = true; if(runningThread != System.Threading.Thread.CurrentThread) { if(iterating) { System.Threading.Thread.Sleep(timeout); } if(iterating) { runningThread.Abort(); } } } } public void Shutdown() { Shutdown(new TimeSpan(Timeout.Infinite)); } internal void RunTask() { lock(runable) { queued = false; if(_shutdown) { iterating = false; return; } iterating = true; } // Don't synchronize while we are iterating so that // multiple wakeup() calls can be executed concurrently. bool done = false; try { for(int i = 0; i < maxIterationsPerRun; i++) { if(!task.Iterate()) { done = true; break; } } } finally { lock(runable) { iterating = false; if(_shutdown) { queued = false; } else { // If we could not iterate all the items // then we need to re-queue. if(!done) { queued = true; } if(queued) { ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this); } } } } } } }