# ==================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ==================================================================== import time, threading from unittest import TestCase, main from lucene import * class PyLuceneThreadTestCase(TestCase): """ Test using threads in PyLucene with python threads """ def setUp(self): self.classLoader = Thread.currentThread().getContextClassLoader() self.directory = RAMDirectory() writer = IndexWriter(self.directory, StandardAnalyzer(Version.LUCENE_CURRENT), True, IndexWriter.MaxFieldLength.LIMITED) doc1 = Document() doc2 = Document() doc3 = Document() doc4 = Document() doc1.add(Field("field", "one", Field.Store.YES, Field.Index.ANALYZED)) doc2.add(Field("field", "two", Field.Store.YES, Field.Index.ANALYZED)) doc3.add(Field("field", "three", Field.Store.YES, Field.Index.ANALYZED)) doc4.add(Field("field", "one", Field.Store.YES, Field.Index.ANALYZED)) writer.addDocument(doc1) writer.addDocument(doc2) writer.addDocument(doc3) writer.addDocument(doc4) writer.optimize() writer.close() self.testData = [('one',2), ('two',1), ('three', 1), ('five', 0)] * 500 self.lock = threading.Lock() self.totalQueries = 0 def tearDown(self): self.directory.close() def testWithMainThread(self): """ warm up test for runSearch in main thread """ self.runSearch(2000, True) def testWithPyLuceneThread(self): """ Run 5 threads with 2000 queries each """ threads = [] for i in xrange(5): threads.append(threading.Thread(target=self.runSearch, args=(2000,))) for thread in threads: thread.start() for thread in threads: thread.join() # we survived! # and all queries have ran successfully self.assertEqual(10000, self.totalQueries) def runSearch(self, runCount, mainThread=False): """ search for runCount number of times """ # problem: if there are any assertion errors in the child # thread, the calling thread is not notified and may still # consider the test case pass. We are using self.totalQueries # to double check that work has actually been done. if not mainThread: getVMEnv().attachCurrentThread() time.sleep(0.5) searcher = IndexSearcher(self.directory, True) try: self.query = PhraseQuery() for word, count in self.testData[0:runCount]: query = TermQuery(Term("field", word)) topDocs = searcher.search(query, 50) self.assertEqual(topDocs.totalHits, count) self.lock.acquire() self.totalQueries += 1 self.lock.release() finally: searcher.close() if __name__ == "__main__": import sys, lucene lucene.initVM() if '-loop' in sys.argv: sys.argv.remove('-loop') while True: try: main() except: pass else: main()