/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections.Generic;
using Lucene.Net.Support;
using Directory = Lucene.Net.Store.Directory;
namespace Lucene.Net.Index
{
/// A that runs each merge using a
/// separate thread, up until a maximum number of threads
/// () at which when a merge is
/// needed, the thread(s) that are updating the index will
/// pause until one or more merges completes. This is a
/// simple way to use concurrency in the indexing process
/// without having to create and manage application level
/// threads.
///
public class ConcurrentMergeScheduler:MergeScheduler
{
private int mergeThreadPriority = - 1;
protected internal IList mergeThreads = new List();
// Max number of threads allowed to be merging at once
private int _maxThreadCount = 1;
protected internal Directory dir;
private bool closed;
protected internal IndexWriter writer;
protected internal int mergeThreadCount;
public ConcurrentMergeScheduler()
{
if (allInstances != null)
{
// Only for testing
AddMyself();
}
}
/// Gets or sets the max # simultaneous threads that may be
/// running. If a merge is necessary yet we already have
/// this many threads running, the incoming thread (that
/// is calling add/updateDocument) will block until
/// a merge thread has completed.
///
public virtual int MaxThreadCount
{
set
{
if (value < 1)
throw new System.ArgumentException("count should be at least 1");
_maxThreadCount = value;
}
get { return _maxThreadCount; }
}
/// Return the priority that merge threads run at. By
/// default the priority is 1 plus the priority of (ie,
/// slightly higher priority than) the first thread that
/// calls merge.
///
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate")]
public virtual int GetMergeThreadPriority()
{
lock (this)
{
InitMergeThreadPriority();
return mergeThreadPriority;
}
}
/// Set the priority that merge threads run at.
public virtual void SetMergeThreadPriority(int pri)
{
lock (this)
{
if (pri > (int) System.Threading.ThreadPriority.Highest || pri < (int) System.Threading.ThreadPriority.Lowest)
throw new System.ArgumentException("priority must be in range " + (int) System.Threading.ThreadPriority.Lowest + " .. " + (int) System.Threading.ThreadPriority.Highest + " inclusive");
mergeThreadPriority = pri;
int numThreads = MergeThreadCount();
for (int i = 0; i < numThreads; i++)
{
MergeThread merge = mergeThreads[i];
merge.SetThreadPriority(pri);
}
}
}
private bool Verbose()
{
return writer != null && writer.Verbose;
}
private void Message(System.String message)
{
if (Verbose())
writer.Message("CMS: " + message);
}
private void InitMergeThreadPriority()
{
lock (this)
{
if (mergeThreadPriority == - 1)
{
// Default to slightly higher priority than our
// calling thread
mergeThreadPriority = 1 + (System.Int32) ThreadClass.Current().Priority;
if (mergeThreadPriority > (int) System.Threading.ThreadPriority.Highest)
mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest;
}
}
}
protected override void Dispose(bool disposing)
{
//if (disposing)
//{
closed = true;
//}
}
public virtual void Sync()
{
lock (this)
{
while (MergeThreadCount() > 0)
{
if (Verbose())
Message("now wait for threads; currently " + mergeThreads.Count + " still running");
int count = mergeThreads.Count;
if (Verbose())
{
for (int i = 0; i < count; i++)
Message(" " + i + ": " + mergeThreads[i]);
}
System.Threading.Monitor.Wait(this);
}
}
}
private int MergeThreadCount()
{
lock (this)
{
int count = 0;
int numThreads = mergeThreads.Count;
for (int i = 0; i < numThreads; i++)
{
if (mergeThreads[i].IsAlive)
{
count++;
}
}
return count;
}
}
public override void Merge(IndexWriter writer)
{
// TODO: .NET doesn't support this
// assert !Thread.holdsLock(writer);
this.writer = writer;
InitMergeThreadPriority();
dir = writer.Directory;
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
if (Verbose())
{
Message("now merge");
Message(" index: " + writer.SegString());
}
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while (true)
{
// TODO: we could be careful about which merges to do in
// the BG (eg maybe the "biggest" ones) vs FG, which
// merges to do first (the easiest ones?), etc.
MergePolicy.OneMerge merge = writer.GetNextMerge();
if (merge == null)
{
if (Verbose())
Message(" no more merges pending; now return");
return ;
}
// We do this w/ the primary thread to keep
// deterministic assignment of segment names
writer.MergeInit(merge);
bool success = false;
try
{
lock (this)
{
while (MergeThreadCount() >= _maxThreadCount)
{
if (Verbose())
Message(" too many merge threads running; stalling...");
System.Threading.Monitor.Wait(this);
}
if (Verbose())
Message(" consider merge " + merge.SegString(dir));
System.Diagnostics.Debug.Assert(MergeThreadCount() < _maxThreadCount);
// OK to spawn a new merge thread to handle this
// merge:
MergeThread merger = GetMergeThread(writer, merge);
mergeThreads.Add(merger);
if (Verbose())
Message(" launch new thread [" + merger.Name + "]");
merger.Start();
success = true;
}
}
finally
{
if (!success)
{
writer.MergeFinish(merge);
}
}
}
}
/// Does the actual merge, by calling
protected internal virtual void DoMerge(MergePolicy.OneMerge merge)
{
writer.Merge(merge);
}
/// Create and return a new MergeThread
protected internal virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)
{
lock (this)
{
var thread = new MergeThread(this, writer, merge);
thread.SetThreadPriority(mergeThreadPriority);
thread.IsBackground = true;
thread.Name = "Lucene Merge Thread #" + mergeThreadCount++;
return thread;
}
}
public /*protected internal*/ class MergeThread:ThreadClass
{
private void InitBlock(ConcurrentMergeScheduler enclosingInstance)
{
this.enclosingInstance = enclosingInstance;
}
private ConcurrentMergeScheduler enclosingInstance;
public ConcurrentMergeScheduler Enclosing_Instance
{
get
{
return enclosingInstance;
}
}
internal IndexWriter writer;
internal MergePolicy.OneMerge startMerge;
internal MergePolicy.OneMerge runningMerge;
public MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)
{
InitBlock(enclosingInstance);
this.writer = writer;
this.startMerge = startMerge;
}
public virtual void SetRunningMerge(MergePolicy.OneMerge merge)
{
lock (this)
{
runningMerge = merge;
}
}
public virtual MergePolicy.OneMerge RunningMerge
{
get
{
lock (this)
{
return runningMerge;
}
}
}
public virtual void SetThreadPriority(int pri)
{
try
{
Priority = (System.Threading.ThreadPriority) pri;
}
catch (System.NullReferenceException)
{
// Strangely, Sun's JDK 1.5 on Linux sometimes
// throws NPE out of here...
}
catch (System.Security.SecurityException)
{
// Ignore this because we will still run fine with
// normal thread priority
}
}
override public void Run()
{
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.startMerge;
try
{
if (Enclosing_Instance.Verbose())
Enclosing_Instance.Message(" merge thread: start");
while (true)
{
SetRunningMerge(merge);
Enclosing_Instance.DoMerge(merge);
// Subsequent times through the loop we do any new
// merge that writer says is necessary:
merge = writer.GetNextMerge();
if (merge != null)
{
writer.MergeInit(merge);
if (Enclosing_Instance.Verbose())
Enclosing_Instance.Message(" merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir));
}
else
break;
}
if (Enclosing_Instance.Verbose())
Enclosing_Instance.Message(" merge thread: done");
}
catch (System.Exception exc)
{
// Ignore the exception if it was due to abort:
if (!(exc is MergePolicy.MergeAbortedException))
{
if (!Enclosing_Instance.suppressExceptions)
{
// suppressExceptions is normally only set during
// testing.
Lucene.Net.Index.ConcurrentMergeScheduler.anyExceptions = true;
Enclosing_Instance.HandleMergeException(exc);
}
}
}
finally
{
lock (Enclosing_Instance)
{
System.Threading.Monitor.PulseAll(Enclosing_Instance);
Enclosing_Instance.mergeThreads.Remove(this);
bool removed = !Enclosing_Instance.mergeThreads.Contains(this);
System.Diagnostics.Debug.Assert(removed);
}
}
}
public override System.String ToString()
{
MergePolicy.OneMerge merge = RunningMerge ?? startMerge;
return "merge thread: " + merge.SegString(Enclosing_Instance.dir);
}
}
/// Called when an exception is hit in a background merge
/// thread
///
protected internal virtual void HandleMergeException(System.Exception exc)
{
// When an exception is hit during merge, IndexWriter
// removes any partial files and then allows another
// merge to run. If whatever caused the error is not
// transient then the exception will keep happening,
// so, we sleep here to avoid saturating CPU in such
// cases:
System.Threading.Thread.Sleep(new System.TimeSpan((System.Int64) 10000 * 250));
throw new MergePolicy.MergeException(exc, dir);
}
internal static bool anyExceptions = false;
/// Used for testing
public static bool AnyUnhandledExceptions()
{
if (allInstances == null)
{
throw new System.SystemException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
}
lock (allInstances)
{
int count = allInstances.Count;
// Make sure all outstanding threads are done so we see
// any exceptions they may produce:
for (int i = 0; i < count; i++)
allInstances[i].Sync();
bool v = anyExceptions;
anyExceptions = false;
return v;
}
}
public static void ClearUnhandledExceptions()
{
lock (allInstances)
{
anyExceptions = false;
}
}
/// Used for testing
private void AddMyself()
{
lock (allInstances)
{
int size = allInstances.Count;
int upto = 0;
for (int i = 0; i < size; i++)
{
ConcurrentMergeScheduler other = allInstances[i];
if (!(other.closed && 0 == other.MergeThreadCount()))
// Keep this one for now: it still has threads or
// may spawn new threads
allInstances[upto++] = other;
}
allInstances.RemoveRange(upto, allInstances.Count - upto);
allInstances.Add(this);
}
}
private bool suppressExceptions;
/// Used for testing
public /*internal*/ virtual void SetSuppressExceptions()
{
suppressExceptions = true;
}
/// Used for testing
public /*internal*/ virtual void ClearSuppressExceptions()
{
suppressExceptions = false;
}
/// Used for testing
private static List allInstances;
public static void SetTestMode()
{
allInstances = new List();
}
}
}