/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using IndexReader = Lucene.Net.Index.IndexReader; using Term = Lucene.Net.Index.Term; using PriorityQueue = Lucene.Net.Util.PriorityQueue; namespace Lucene.Net.Search { /// Implements parallel search over a set of Searchables. /// ///

/// <p/>Applications usually need only call the inherited {@link #Search(Query)}
/// or {@link #Search(Query,Filter)} methods.

/// <remarks>
/// The two top-N <c>Search</c> overloads start one <see cref="MultiSearcherThread"/>
/// per sub-searchable, join the threads in index order and then drain the
/// shared priority queue into the result array. The collector-based
/// <c>Search</c> overload, <c>DocFreq</c> and <c>Rewrite</c> still run
/// sequentially.
/// </remarks>
public class ParallelMultiSearcher : MultiSearcher
{
    /// <summary>
    /// Collector that forwards every call to a wrapped collector, shifting the
    /// doc base passed to <c>SetNextReader</c> by the start offset of the
    /// sub-searchable currently being searched.
    /// </summary>
    private class AnonymousClassCollector1 : Collector
    {
        public AnonymousClassCollector1(Lucene.Net.Search.Collector collector, int start, ParallelMultiSearcher enclosingInstance)
        {
            InitBlock(collector, start, enclosingInstance);
        }

        private void InitBlock(Lucene.Net.Search.Collector collector, int start, ParallelMultiSearcher enclosingInstance)
        {
            this.collector = collector;
            this.start = start;
            this.enclosingInstance = enclosingInstance;
        }

        private Lucene.Net.Search.Collector collector;
        private int start;
        private ParallelMultiSearcher enclosingInstance;

        public ParallelMultiSearcher Enclosing_Instance
        {
            get { return enclosingInstance; }
        }

        public override void SetScorer(Scorer scorer)
        {
            collector.SetScorer(scorer);
        }

        public override void Collect(int doc)
        {
            collector.Collect(doc);
        }

        public override void SetNextReader(IndexReader reader, int docBase)
        {
            // Rebase the reader-relative doc base onto this sub-searchable's offset.
            collector.SetNextReader(reader, start + docBase);
        }

        public override bool AcceptsDocsOutOfOrder()
        {
            return collector.AcceptsDocsOutOfOrder();
        }
    }

    // Both fields are assigned once in the constructor and never reassigned.
    private readonly Searchable[] searchables;
    private readonly int[] starts;

    /// <summary>Creates a searchable which searches <c>searchables</c>.</summary>
    public ParallelMultiSearcher(Searchable[] searchables) : base(searchables)
    {
        this.searchables = searchables;
        this.starts = GetStarts();
    }

    /// <summary>TODO: parallelize this one too.</summary>
    public override int DocFreq(Term term)
    {
        return base.DocFreq(term);
    }

    /// <summary>
    /// A search implementation which spawns a new thread for each
    /// Searchable, waits for each search to complete and merges
    /// the results back together.
    /// </summary>
    /// <param name="weight">weight handed unchanged to every sub-searchable</param>
    /// <param name="filter">filter handed unchanged to every sub-searchable</param>
    /// <param name="nDocs">capacity of the merged hit queue, also passed to every sub-search</param>
    /// <returns>the merged hits, the summed hit count and the score of the first merged hit</returns>
    /// <exception cref="System.IO.IOException">rethrown if a sub-search recorded one</exception>
    /// <exception cref="System.SystemException">if the calling thread is interrupted while joining</exception>
    public override TopDocs Search(Weight weight, Filter filter, int nDocs)
    {
        HitQueue hq = new HitQueue(nDocs, false);
        MultiSearcherThread[] msta = new MultiSearcherThread[searchables.Length];
        for (int i = 0; i < searchables.Length; i++)
        {
            // search each searchable
            // Assume not too many searchables and cost of creating a thread is by far inferior to a search
            msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, i, starts, "MultiSearcher thread #" + (i + 1));
            msta[i].Start();
        }

        int totalHits = 0;
        for (int i = 0; i < msta.Length; i++)
        {
            WaitFor(msta[i]);
            totalHits += msta[i].Hits();
        }

        ScoreDoc[] scoreDocs = DrainQueue(hq);

        // Guard on the array length as well as on totalHits: the queue can be
        // empty although hits were counted (e.g. a zero-capacity queue), and
        // indexing scoreDocs[0] would then throw IndexOutOfRangeException.
        float maxScore = (totalHits == 0 || scoreDocs.Length == 0)
            ? System.Single.NegativeInfinity
            : scoreDocs[0].score;
        return new TopDocs(totalHits, scoreDocs, maxScore);
    }

    /// <summary>
    /// A search implementation allowing sorting which spawns a new thread for each
    /// Searchable, waits for each search to complete and merges
    /// the results back together.
    /// </summary>
    /// <param name="weight">weight handed unchanged to every sub-searchable</param>
    /// <param name="filter">filter handed unchanged to every sub-searchable</param>
    /// <param name="nDocs">capacity of the merged hit queue, also passed to every sub-search</param>
    /// <param name="sort">sort handed unchanged to every sub-searchable</param>
    /// <returns>the merged hits, the summed hit count, the queue's sort fields and the largest per-searchable max score</returns>
    /// <exception cref="System.IO.IOException">rethrown if a sub-search recorded one</exception>
    /// <exception cref="System.SystemException">if the calling thread is interrupted while joining</exception>
    public override TopFieldDocs Search(Weight weight, Filter filter, int nDocs, Sort sort)
    {
        // don't specify the fields - we'll wait to do this until we get results
        FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(null, nDocs);
        MultiSearcherThread[] msta = new MultiSearcherThread[searchables.Length];
        for (int i = 0; i < searchables.Length; i++)
        {
            // search each searchable
            // Assume not too many searchables and cost of creating a thread is by far inferior to a search
            msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs, hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
            msta[i].Start();
        }

        int totalHits = 0;
        float maxScore = System.Single.NegativeInfinity;
        for (int i = 0; i < msta.Length; i++)
        {
            WaitFor(msta[i]);
            totalHits += msta[i].Hits();
            maxScore = System.Math.Max(maxScore, msta[i].GetMaxScore());
        }

        ScoreDoc[] scoreDocs = DrainQueue(hq);
        return new TopFieldDocs(totalHits, scoreDocs, hq.GetFields(), maxScore);
    }

    /// <summary>
    /// Lower-level search API.
    ///
    /// <p/>{@link Collector#Collect(int)} is called for every matching document.
    ///
    /// <p/>Applications should only use this if they need all of the
    /// matching documents. The high-level search API ({@link
    /// Searcher#Search(Query)}) is usually more efficient, as it skips
    /// non-high-scoring hits.
    /// </summary>
    /// <param name="weight">to match documents</param>
    /// <param name="filter">if non-null, a bitset used to eliminate some documents</param>
    /// <param name="collector">to receive hits
    ///
    /// TODO: parallelize this one too
    /// </param>
    public override void Search(Weight weight, Filter filter, Collector collector)
    {
        for (int i = 0; i < searchables.Length; i++)
        {
            // Wrap the caller's collector so doc bases are offset by starts[i].
            Collector hc = new AnonymousClassCollector1(collector, starts[i], this);
            searchables[i].Search(weight, filter, hc);
        }
    }

    /*
    * TODO: this one could be parallelized too
    * @see Lucene.Net.Search.Searchable#rewrite(Lucene.Net.Search.Query)
    */
    public override Query Rewrite(Query original)
    {
        return base.Rewrite(original);
    }

    /// <summary>
    /// Joins one search thread and rethrows the IOException it recorded, if any.
    /// </summary>
    private static void WaitFor(MultiSearcherThread thread)
    {
        try
        {
            thread.Join();
        }
        catch (System.Threading.ThreadInterruptedException ie)
        {
            // In 3.0 we will change this to throw
            // InterruptedException instead
            SupportClass.ThreadClass.Current().Interrupt();
            throw new System.SystemException(ie.Message, ie);
        }

        System.IO.IOException ioe = thread.GetIOException();
        if (ioe != null)
        {
            // if one search produced an IOException, rethrow it
            throw ioe;
        }
    }

    /// <summary>
    /// Pops every element off the queue into a new array, filling it from the
    /// last index down to 0, so index 0 holds the element popped last.
    /// </summary>
    private static ScoreDoc[] DrainQueue(PriorityQueue hq)
    {
        ScoreDoc[] scoreDocs = new ScoreDoc[hq.Size()];
        for (int i = scoreDocs.Length - 1; i >= 0; i--)
        {
            // put docs in array
            scoreDocs[i] = (ScoreDoc) hq.Pop();
        }
        return scoreDocs;
    }
}

/// <summary>A thread subclass for searching a single searchable.</summary>
internal class MultiSearcherThread : SupportClass.ThreadClass
{
    // Inputs, fixed at construction time.
    private readonly Searchable searchable;
    private readonly Weight weight;
    private readonly Filter filter;
    private readonly int nDocs;
    private readonly int i;              // index of this searchable inside starts
    private readonly PriorityQueue hq;   // queue shared by all threads of one search call
    private readonly int[] starts;
    private readonly Sort sort;          // stays null when the unsorted constructor is used

    // Results, written by Run().
    private TopDocs docs;
    private System.IO.IOException ioe;

    /// <summary>Creates a thread that runs an unsorted top-N search.</summary>
    public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter, int nDocs, HitQueue hq, int i, int[] starts, System.String name) : base(name)
    {
        this.searchable = searchable;
        this.weight = weight;
        this.filter = filter;
        this.nDocs = nDocs;
        this.hq = hq;
        this.i = i;
        this.starts = starts;
    }

    /// <summary>Creates a thread that runs a field-sorted top-N search.</summary>
    public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts, System.String name) : base(name)
    {
        this.searchable = searchable;
        this.weight = weight;
        this.filter = filter;
        this.nDocs = nDocs;
        this.hq = hq;
        this.i = i;
        this.starts = starts;
        this.sort = sort;
    }

    /// <summary>
    /// Runs the search against the single searchable and, unless an
    /// IOException was recorded, merges the hits into the shared queue with
    /// their doc ids shifted by <c>starts[i]</c>.
    /// </summary>
    public override void Run()
    {
        try
        {
            docs = (sort == null)
                ? searchable.Search(weight, filter, nDocs)
                : searchable.Search(weight, filter, nDocs, sort);
        }
        // Store the IOException for later use by the caller of this thread
        catch (System.IO.IOException e)
        {
            this.ioe = e;
        }
        // NOTE(review): only IOException is captured here; any other exception
        // type escapes Run() and leaves docs null - confirm how ThreadClass and
        // the callers are expected to deal with that.

        if (this.ioe != null)
        {
            return;
        }

        int docOffset = starts[i];

        // if we are sorting by fields, we need to tell the field sorted hit queue
        // the actual type of fields, in case the original list contained AUTO.
        // if the searchable returns null for fields, we'll have problems.
        if (sort != null)
        {
            TopFieldDocs docsFields = (TopFieldDocs) docs;
            // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
            // it will break ties by doc Id properly. Otherwise, it will compare to
            // 'relative' doc Ids, that belong to two different searchables.
            for (int j = 0; j < docsFields.fields.Length; j++)
            {
                if (docsFields.fields[j].GetType() == SortField.DOC)
                {
                    // iterate over the score docs and change their fields value
                    for (int j2 = 0; j2 < docs.scoreDocs.Length; j2++)
                    {
                        FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
                        fd.fields[j] = (System.Int32) (((System.Int32) fd.fields[j]) + docOffset);
                    }
                    // Only the first DOC sort field is adjusted.
                    break;
                }
            }

            ((FieldDocSortedHitQueue) hq).SetFields(docsFields.fields);
        }

        ScoreDoc[] scoreDocs = docs.scoreDocs;
        for (int j = 0; j < scoreDocs.Length; j++)
        {
            // merge scoreDocs into hq
            ScoreDoc scoreDoc = scoreDocs[j];
            scoreDoc.doc += docOffset; // convert doc
            //it would be so nice if we had a thread-safe insert
            lock (hq)
            {
                if (!hq.Insert(scoreDoc))
                {
                    break; // no more scores > minScore
                }
            }
        }
    }

    /// <summary>Returns <c>totalHits</c> of the result produced by <c>Run</c>.</summary>
    public virtual int Hits()
    {
        return docs.totalHits;
    }

    /// <summary>Returns the max score of the result produced by <c>Run</c>.</summary>
    public virtual float GetMaxScore()
    {
        return docs.GetMaxScore();
    }

    /// <summary>Returns the IOException recorded by <c>Run</c>, or null if none was caught.</summary>
    public virtual System.IO.IOException GetIOException()
    {
        return ioe;
    }
}
} // closes namespace Lucene.Net.Search, opened before this block