package org.apache.lucene.search;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.ThreadInterruptedException;

/**
 * Implements parallel search over a set of <code>Searchables</code>.
 *
 * <p>Applications usually need only call the inherited {@link #search(Query,int)}
 * or {@link #search(Query,Filter,int)} methods.
 *
 * <p>Each instance owns a cached thread pool that runs the per-searchable
 * tasks; call {@link #close()} when the searcher is no longer needed so the
 * pool is shut down.
 */
public class ParallelMultiSearcher extends MultiSearcher {

  /** Pool on which the per-searchable tasks are executed. */
  private final ExecutorService executor;
  /** The sub-searchers, in the order given to the constructor. */
  private final Searchable[] searchables;
  /** Value of {@code getStarts()} captured at construction; handed to the search callables. */
  private final int[] starts;

  /**
   * Creates a {@link Searchable} which searches <i>searchables</i>.
   *
   * @param searchables the sub-searchers to query in parallel
   * @throws IOException if the superclass constructor fails
   */
  public ParallelMultiSearcher(Searchable... searchables) throws IOException {
    super(searchables);
    this.searchables = searchables;
    this.starts = getStarts();
    executor = Executors.newCachedThreadPool(new NamedThreadFactory(this.getClass().getSimpleName()));
  }

  /**
   * Executes each {@link Searchable}'s <code>docFreq()</code> in its own task, waits
   * for every task to complete and sums the results.
   *
   * @param term the term whose document frequency is requested
   * @return the sum of the document frequencies reported by all searchables
   * @throws IOException if any of the sub-searches fails with an IOException
   */
  @Override
  public int docFreq(final Term term) throws IOException {
    // Generic array creation is not allowed, hence the unchecked raw array.
    @SuppressWarnings("unchecked")
    final Future<Integer>[] searchThreads = new Future[searchables.length];
    for (int i = 0; i < searchables.length; i++) { // search each searchable
      final Searchable searchable = searchables[i];
      searchThreads[i] = executor.submit(new Callable<Integer>() {
        public Integer call() throws IOException {
          return Integer.valueOf(searchable.docFreq(term));
        }
      });
    }
    final CountDocFreq func = new CountDocFreq();
    foreach(func, Arrays.asList(searchThreads));
    return func.docFreq;
  }

  /**
   * A search implementation which executes each {@link Searchable} in its own
   * task, waits for each search to complete and merges the results back
   * together.
   *
   * @param weight to match documents
   * @param filter passed through unchanged to every sub-search
   * @param nDocs size of the shared hit queue
   * @return the merged top hits together with the summed hit count and the
   *         maximum score over all sub-searches
   * @throws IOException if any of the sub-searches fails with an IOException
   */
  @Override
  public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
    final HitQueue hq = new HitQueue(nDocs, false);
    // The queue and the lock are both handed to every callable.
    final Lock lock = new ReentrantLock();
    @SuppressWarnings("unchecked")
    final Future<TopDocs>[] searchThreads = new Future[searchables.length];
    for (int i = 0; i < searchables.length; i++) { // search each searchable
      searchThreads[i] = executor.submit(
          new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq, i, starts));
    }

    final CountTotalHits<TopDocs> func = new CountTotalHits<TopDocs>();
    foreach(func, Arrays.asList(searchThreads));

    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
    for (int i = hq.size() - 1; i >= 0; i--) { // put docs in array
      scoreDocs[i] = hq.pop();
    }

    return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
  }

  /**
   * A search implementation allowing sorting which spawns a new task for each
   * Searchable, waits for each search to complete and merges the results back
   * together.
   *
   * @param weight to match documents
   * @param filter passed through unchanged to every sub-search
   * @param nDocs size of the shared hit queue
   * @param sort the sort criteria; must not be null
   * @return the merged top hits together with the sort fields, the summed hit
   *         count and the maximum score over all sub-searches
   * @throws NullPointerException if <code>sort</code> is null
   * @throws IOException if any of the sub-searches fails with an IOException
   */
  @Override
  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException {
    if (sort == null) {
      throw new NullPointerException();
    }

    final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
    // The queue and the lock are both handed to every callable.
    final Lock lock = new ReentrantLock();
    @SuppressWarnings("unchecked")
    final Future<TopFieldDocs>[] searchThreads = new Future[searchables.length];
    for (int i = 0; i < searchables.length; i++) { // search each searchable
      searchThreads[i] = executor.submit(
          new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs, hq, sort, i, starts));
    }

    final CountTotalHits<TopFieldDocs> func = new CountTotalHits<TopFieldDocs>();
    foreach(func, Arrays.asList(searchThreads));

    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
    for (int i = hq.size() - 1; i >= 0; i--) { // put docs in array
      scoreDocs[i] = hq.pop();
    }

    return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
  }

  /**
   * Lower-level search API.
   *
   * <p>{@link Collector#collect(int)} is called for every matching document.
   *
   * <p>Applications should only use this if they need <i>all</i> of the
   * matching documents.  The high-level search API ({@link
   * Searcher#search(Query,int)}) is usually more efficient, as it skips
   * non-high-scoring hits.
   *
   * <p>This method cannot be parallelized, because {@link Collector}
   * supports no concurrent access.
   *
   * @param weight to match documents
   * @param filter if non-null, a bitset used to eliminate some documents
   * @param collector to receive hits
   * @throws IOException if a sub-search or the collector fails
   */
  @Override
  public void search(final Weight weight, final Filter filter, final Collector collector)
      throws IOException {
    for (int i = 0; i < searchables.length; i++) {

      final int start = starts[i];

      // Forwards every call to the caller's collector; only the doc base is
      // shifted by this searchable's entry in starts.
      final Collector hc = new Collector() {
        @Override
        public void setScorer(final Scorer scorer) throws IOException {
          collector.setScorer(scorer);
        }

        @Override
        public void collect(final int doc) throws IOException {
          collector.collect(doc);
        }

        @Override
        public void setNextReader(final IndexReader reader, final int docBase) throws IOException {
          collector.setNextReader(reader, start + docBase);
        }

        @Override
        public boolean acceptsDocsOutOfOrder() {
          return collector.acceptsDocsOutOfOrder();
        }
      };

      searchables[i].search(weight, filter, hc);
    }
  }

  /**
   * Shuts down the executor that runs the per-searchable tasks and then
   * delegates to the superclass <code>close()</code>.
   *
   * <p>{@link ExecutorService#shutdown()} lets already submitted tasks finish
   * but rejects new ones, so this searcher must not be used for parallel
   * searches after it has been closed.
   *
   * @throws IOException if the superclass <code>close()</code> fails
   */
  @Override
  public void close() throws IOException {
    executor.shutdown();
    super.close();
  }

  /*
   * Apply the function to each element of the list. This method encapsulates the
   * logic of how to wait for concurrently executed searchables.
   *
   * An IOException thrown by a task is rethrown as-is; any other task failure is
   * wrapped in a RuntimeException. Interruption while waiting is reported as a
   * ThreadInterruptedException.
   */
  private <T> void foreach(Function<T> func, List<Future<T>> list) throws IOException {
    for (Future<T> future : list) {
      try {
        func.apply(future.get());
      } catch (ExecutionException e) {
        final Throwable cause = e.getCause();
        if (cause instanceof IOException) {
          throw (IOException) cause;
        }
        throw new RuntimeException(cause);
      } catch (InterruptedException ie) {
        throw new ThreadInterruptedException(ie);
      }
    }
  }

  // Both functions could be reduced to Int as other values of TopDocs
  // are not needed. Using separate functions is more self-documenting.

  /**
   * A function with one argument.
   *
   * @param <T> the argument type
   */
  private static interface Function<T> {
    abstract void apply(T t);
  }

  /**
   * Counts the total number of hits for all {@link TopDocs} instances
   * provided and tracks the largest maximum score seen.
   *
   * @param <T> the concrete {@link TopDocs} type being accumulated
   */
  private static final class CountTotalHits<T extends TopDocs> implements Function<T> {
    int totalHits = 0;
    float maxScore = Float.NEGATIVE_INFINITY;

    public void apply(T t) {
      totalHits += t.totalHits;
      maxScore = Math.max(maxScore, t.getMaxScore());
    }
  }

  /**
   * Accumulates the document frequency for a term.
   */
  private static final class CountDocFreq implements Function<Integer> {
    int docFreq = 0;

    public void apply(Integer t) {
      docFreq += t.intValue();
    }
  }
}