/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using Directory = Lucene.Net.Store.Directory; namespace Lucene.Net.Index { /// A {@link MergeScheduler} that runs each merge using a /// separate thread, up until a maximum number of threads /// ({@link #setMaxThreadCount}) at which points merges are /// run in the foreground, serially. This is a simple way /// to use concurrency in the indexing process without /// having to create and manage application level /// threads. /// public class ConcurrentMergeScheduler : MergeScheduler { private int mergeThreadPriority = - 1; private System.Collections.IList mergeThreads = new System.Collections.ArrayList(); private int maxThreadCount = 3; private System.Collections.IList exceptions = new System.Collections.ArrayList(); private Directory dir; private bool closed; private IndexWriter writer; public ConcurrentMergeScheduler() { if (allInstances != null) { // Only for testing AddMyself(); } } /// Sets the max # simultaneous threads that may be /// running. If a merge is necessary yet we already have /// this many threads running, the merge is returned back /// to IndexWriter so that it runs in the "foreground". /// public virtual void SetMaxThreadCount(int count) { if (count < 1) throw new System.ArgumentException("count should be at least 1"); maxThreadCount = count; } /// Get the max # simultaneous threads that may be /// /// public virtual int GetMaxThreadCount() { return maxThreadCount; } /// Return the priority that merge threads run at. By /// default the priority is 1 plus the priority of (ie, /// slightly higher priority than) the first thread that /// calls merge. /// public virtual int GetMergeThreadPriority() { lock (this) { InitMergeThreadPriority(); return mergeThreadPriority; } } /// Return the priority that merge threads run at. public virtual void SetMergeThreadPriority(int pri) { lock (this) { if (pri > (int) System.Threading.ThreadPriority.Highest || pri < (int) System.Threading.ThreadPriority.Lowest) throw new System.ArgumentException("priority must be in range " + (int) System.Threading.ThreadPriority.Lowest + " .. " + (int) System.Threading.ThreadPriority.Highest + " inclusive"); mergeThreadPriority = pri; int numThreads = MergeThreadCount(); for (int i = 0; i < numThreads; i++) { MergeThread merge = (MergeThread) mergeThreads[i]; merge.SetThreadPriority(pri); } } } private void Message(System.String message) { if (writer != null) writer.Message("CMS: " + message); } private void InitMergeThreadPriority() { lock (this) { if (mergeThreadPriority == - 1) { // Default to slightly higher priority than our // calling thread mergeThreadPriority = 1 + (System.Int32) SupportClass.ThreadClass.Current().Priority; if (mergeThreadPriority > (int) System.Threading.ThreadPriority.Highest) mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest; } } } public override void Close() { closed = true; } public virtual void Sync() { lock (this) { while (MergeThreadCount() > 0) { Message("now wait for threads; currently " + mergeThreads.Count + " still running"); int count = mergeThreads.Count; for (int i = 0; i < count; i++) Message(" " + i + ": " + ((MergeThread) mergeThreads[i])); try { System.Threading.Monitor.Wait(this); } catch (System.Threading.ThreadInterruptedException) { } } } } private int MergeThreadCount() { lock (this) { int count = 0; int numThreads = mergeThreads.Count; for (int i = 0; i < numThreads; i++) if (((MergeThread) mergeThreads[i]).IsAlive) count++; return count; } } public override void Merge(IndexWriter writer) { this.writer = writer; InitMergeThreadPriority(); dir = writer.GetDirectory(); // First, quickly run through the newly proposed merges // and add any orthogonal merges (ie a merge not // involving segments already pending to be merged) to // the queue. If we are way behind on merging, many of // these newly proposed merges will likely already be // registered. Message("now merge"); Message(" index: " + writer.SegString()); // Iterate, pulling from the IndexWriter's queue of // pending merges, until its empty: while (true) { // TODO: we could be careful about which merges to do in // the BG (eg maybe the "biggest" ones) vs FG, which // merges to do first (the easiest ones?), etc. MergePolicy.OneMerge merge = writer.GetNextMerge(); if (merge == null) { Message(" no more merges pending; now return"); return ; } // We do this w/ the primary thread to keep // deterministic assignment of segment names writer.MergeInit(merge); Message(" consider merge " + merge.SegString(dir)); if (merge.isExternal) { Message(" merge involves segments from an external directory; now run in foreground"); } else { lock (this) { if (MergeThreadCount() < maxThreadCount) { // OK to spawn a new merge thread to handle this // merge: MergeThread merger = new MergeThread(this, writer, merge); mergeThreads.Add(merger); Message(" launch new thread [" + merger.Name + "]"); merger.SetThreadPriority(mergeThreadPriority); merger.IsBackground = true; merger.Start(); continue; } else Message(" too many merge threads running; run merge in foreground"); } } // Too many merge threads already running, so we do // this in the foreground of the calling thread writer.Merge(merge); } } private class MergeThread:SupportClass.ThreadClass { private void InitBlock(ConcurrentMergeScheduler enclosingInstance) { this.enclosingInstance = enclosingInstance; } private ConcurrentMergeScheduler enclosingInstance; public ConcurrentMergeScheduler Enclosing_Instance { get { return enclosingInstance; } } internal IndexWriter writer; internal MergePolicy.OneMerge startMerge; internal MergePolicy.OneMerge runningMerge; public MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge) { InitBlock(enclosingInstance); this.writer = writer; this.startMerge = startMerge; } public virtual void SetRunningMerge(MergePolicy.OneMerge merge) { lock (this) { runningMerge = merge; } } public virtual MergePolicy.OneMerge GetRunningMerge() { lock (this) { return runningMerge; } } public virtual void SetThreadPriority(int pri) { try { Priority = (System.Threading.ThreadPriority) pri; } catch (System.NullReferenceException) { // Strangely, Sun's JDK 1.5 on Linux sometimes // throws NPE out of here... } catch (System.Security.SecurityException) { // Ignore this because we will still run fine with // normal thread priority } } override public void Run() { // First time through the while loop we do the merge // that we were started with: MergePolicy.OneMerge merge = this.startMerge; try { Enclosing_Instance.Message(" merge thread: start"); while (true) { SetRunningMerge(merge); writer.Merge(merge); // Subsequent times through the loop we do any new // merge that writer says is necessary: merge = writer.GetNextMerge(); if (merge != null) { writer.MergeInit(merge); Enclosing_Instance.Message(" merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir)); } else break; } Enclosing_Instance.Message(" merge thread: done"); } catch (System.Exception exc) { if (merge != null) { merge.SetException(exc); writer.AddMergeException(merge); } // Ignore the exception if it was due to abort: if (!(exc is MergePolicy.MergeAbortedException)) { lock (Enclosing_Instance) { Enclosing_Instance.exceptions.Add(exc); } if (!Enclosing_Instance.suppressExceptions) { // suppressExceptions is normally only set during // testing. Lucene.Net.Index.ConcurrentMergeScheduler.anyExceptions = true; throw new MergePolicy.MergeException(exc); } } } finally { lock (Enclosing_Instance) { Enclosing_Instance.mergeThreads.Remove(this); System.Threading.Monitor.PulseAll(Enclosing_Instance); } } } public override System.String ToString() { MergePolicy.OneMerge merge = GetRunningMerge(); if (merge == null) merge = startMerge; return "merge thread: " + merge.SegString(Enclosing_Instance.dir); } } internal static bool anyExceptions = false; /// Used for testing public static bool AnyUnhandledExceptions() { lock (allInstances.SyncRoot) { int count = allInstances.Count; // Make sure all outstanding threads are done so we see // any exceptions they may produce: for (int i = 0; i < count; i++) ((ConcurrentMergeScheduler) allInstances[i]).Sync(); return anyExceptions; } } /// Used for testing private void AddMyself() { lock (allInstances.SyncRoot) { int size = 0; int upto = 0; for (int i = 0; i < size; i++) { ConcurrentMergeScheduler other = (ConcurrentMergeScheduler) allInstances[i]; if (!(other.closed && 0 == other.MergeThreadCount())) // Keep this one for now: it still has threads or // may spawn new threads allInstances[upto++] = other; } ((System.Collections.IList) ((System.Collections.ArrayList) allInstances).GetRange(upto, allInstances.Count - upto)).Clear(); allInstances.Add(this); } } private bool suppressExceptions; /// Used for testing internal virtual void SetSuppressExceptions() { suppressExceptions = true; } /// Used for testing internal virtual void ClearSuppressExceptions() { suppressExceptions = false; } /// Used for testing private static System.Collections.IList allInstances; public static void SetTestMode() { allInstances = new System.Collections.ArrayList(); } public void SetSuppressExceptions_ForNUnitTest() { SetSuppressExceptions(); } public void ClearSuppressExceptions_ForNUnitTest() { ClearSuppressExceptions(); } } }