# ==================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ==================================================================== import sys, lucene, unittest import os, shutil import test_PyLucene from threading import RLock from lucene import JavaError, JArray from java.lang import String from java.io import IOException from org.apache.pylucene.store import \ PythonLock, PythonLockFactory, \ PythonIndexInput, PythonIndexOutput, PythonDirectory """ The Directory Implementation here is for testing purposes only, not meant as an example of writing one, the implementation here suffers from a lack of safety when dealing with concurrent modifications as it does away with the file locking in the default lucene fsdirectory implementation. """ DEBUG = False class DebugWrapper(object): def __init__(self, obj): self.obj = obj def __getattr__(self, name): print self.obj.__class__.__name__, self.obj.name, name sys.stdout.flush() return getattr(self.obj, name) class DebugFactory(object): def __init__(self, klass): self.klass = klass def __call__(self, *args, **kw): instance = self.klass(*args, **kw) return DebugWrapper(instance) class PythonDirLock(PythonLock): # only safe for a single process def __init__(self, name, path, lock): super(PythonDirLock, self).__init__() self.name = name self.lock_file = path self.lock = lock def isLocked(self): return self.lock.locked() def obtain(self): return self.lock.acquire() def release(self): return self.lock.release() class PythonDirLockFactory(PythonLockFactory): def __init__(self, path): super(PythonDirLockFactory, self).__init__() self.path = path self._locks = {} def makeLock(self, name): lock = self._locks.get(name) if lock is None: lock = PythonDirLock(name, os.path.join(self.path, name), RLock()) self._locks[name] = lock return lock def clearLock(self, name): lock = self._locks.pop(name, None) if lock is not None: lock.release() class PythonFileStreamInput(PythonIndexInput): def __init__(self, name, fh, size, clone=False): if not clone: super(PythonFileStreamInput, self).__init__(name, size) self.name = name self.fh = fh self._length = size self.isOpen = True self.isClone = clone def length(self): return long(self._length) def clone(self): clone = PythonFileStreamInput(self.name, self.fh, self._length, True) return super(PythonFileStreamInput, self).clone(clone) def close(self): if self.isOpen: self.isOpen = False if not self.isClone: self.fh.close() def readInternal(self, length, pos): self.fh.seek(pos) return JArray('byte')(self.fh.read(length)) def seekInternal(self, pos): self.fh.seek(pos) class PythonFileStreamOutput(PythonIndexOutput): def __init__(self, name, fh): super(PythonFileStreamOutput, self).__init__() self.name = name self.fh = fh self.isOpen = True self._length = 0 def close(self): if self.isOpen: super(PythonFileStreamOutput, self).close() self.isOpen = False self.fh.close() def length(self): return long(self._length) def seekInternal(self, pos): self.fh.seek(pos) def flushBuffer(self, bytes): self.fh.write(bytes.string_) self.fh.flush() self._length += len(bytes) class PythonFileDirectory(PythonDirectory): def __init__(self, path): super(PythonFileDirectory, self).__init__(PythonDirLockFactory(path)) self.name = path assert os.path.isdir(path) self.path = path self._streams = [] def close(self): for stream in self._streams: stream.close() del self._streams[:] def createOutput(self, name, context): file_path = os.path.join(self.path, name) fh = open(file_path, "wb") stream = PythonFileStreamOutput(name, fh) self._streams.append(stream) return stream def deleteFile(self, name): if self.fileExists(name): os.unlink(os.path.join(self.path, name)) def fileExists(self, name): return os.path.exists(os.path.join(self.path, name)) def fileLength(self, name): file_path = os.path.join(self.path, name) return long(os.path.getsize(file_path)) def fileModified(self, name): file_path = os.path.join(self.path, name) return os.path.getmtime(file_path) def listAll(self): return os.listdir(self.path) def sync(self, name): pass def openInput(self, name, bufferSize=0): file_path = os.path.join(self.path, name) try: fh = open(file_path, "rb") except IOError: raise JavaError, IOException(name) stream = PythonFileStreamInput(name, fh, os.path.getsize(file_path)) self._streams.append(stream) return stream def touchFile(self, name): file_path = os.path.join(self.path, name) os.utime(file_path, None) if DEBUG: _globals = globals() _globals['PythonFileDirectory'] = DebugFactory(PythonFileDirectory) _globals['PythonFileStreamInput'] = DebugFactory(PythonFileStreamInput) _globals['PythonFileStreamOutput'] = DebugFactory(PythonFileStreamOutput) _globals['PythonDirLock'] = DebugFactory(PythonDirLock) del _globals class PythonDirectoryTests(unittest.TestCase, test_PyLucene.Test_PyLuceneBase): STORE_DIR = "testpyrepo" def setUp(self): if not os.path.exists(self.STORE_DIR): os.mkdir(self.STORE_DIR) def tearDown(self): if os.path.exists(self.STORE_DIR): shutil.rmtree(self.STORE_DIR) def openStore(self): return PythonFileDirectory(self.STORE_DIR) def closeStore(self, store, *args): for arg in args: if arg is not None: try: arg.close() except Exception, e: import pdb; pdb.set_trace() store.close() def test_IncrementalLoop(self): print "Testing Indexing Incremental Looping" for i in range(100): print "indexing ", i sys.stdout.flush() self.test_indexDocument() if __name__ == "__main__": env = lucene.initVM() if '-loop' in sys.argv: sys.argv.remove('-loop') while True: try: unittest.main() except: pass print 'inputs', env._dumpRefs(True).get('class org.osafoundation.lucene.store.PythonIndexOutput', 0) print 'outputs', env._dumpRefs(True).get('class org.osafoundation.lucene.store.PythonIndexInput', 0) print 'locks', env._dumpRefs(True).get('class org.osafoundation.lucene.store.PythonLock', 0) print 'dirs', env._dumpRefs(True).get('class org.osafoundation.lucene.store.PythonLock', 0) else: unittest.main()