l.s.d.d.DummyStorageDriver(StorageDriver) : class documentation

Part of libcloud.storage.drivers.dummy View Source View In Hierarchy

Dummy Storage driver.

>>> from libcloud.storage.drivers.dummy import DummyStorageDriver
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container')
>>> container
<Container: name=test container, provider=Dummy Storage Provider>
>>> container.name
'test container'
>>> container.extra['object_count']
0
Method __init__ Undocumented
Method get_meta_data No summary
Method list_containers No summary
Method list_container_objects Return a list of objects for the given container.
Method get_container No summary
Method get_container_cdn_url No summary
Method get_object No summary
Method get_object_cdn_url No summary
Method create_container No summary
Method delete_container No summary
Method download_object Download an object to the specified destination path.
Method download_object_as_stream No summary
Method upload_object No summary
Method upload_object_via_stream No summary
Method delete_object No summary
Method _add_object Undocumented

Inherited from StorageDriver:

Method list_containters Undocumented
Method enable_container_cdn Undocumented
Method enable_object_cdn Undocumented
Method _get_object Call passed callback and start transfer of the object'
Method _save_object Save object to the provided path.
Method _upload_object Helper function for setting common request headers and calling the passed in callback which uploads an object.
Method _stream_data Stream a data over an http connection.
Method _upload_file Upload a file to the server.
def __init__(self, api_key, api_secret): (source)
Undocumented
def get_meta_data(self): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_meta_data()
{'object_count': 0, 'container_count': 0, 'bytes_used': 0}
>>> container = driver.create_container(container_name='test container 1')
>>> container = driver.create_container(container_name='test container 2')
>>> obj = container.upload_object_via_stream(
...  object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> driver.get_meta_data()
{'object_count': 1, 'container_count': 2, 'bytes_used': 50}
def list_containers(self): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.list_containers()
[]
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> container = driver.create_container(container_name='test container 2')
>>> container
<Container: name=test container 2, provider=Dummy Storage Provider>
>>> container = driver.create_container(
...  container_name='test container 2') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerAlreadyExistsError:
>>> container_list=driver.list_containers()
>>> sorted([container.name for container in container_list])
['test container 1', 'test container 2']
def list_container_objects(self, container): (source)
Return a list of objects for the given container.

@type container: C{Container}
@param container: Container instance

@return A list of Object instances.
def get_container(self, container_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> driver.get_container('test container 1')
<Container: name=test container 1, provider=Dummy Storage Provider>
def get_container_cdn_url(self, container): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> container.get_cdn_url()
'http://www.test.com/container/test_container_1'
def get_object(self, container_name, object_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_object('unknown', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> driver.get_object(
...  'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ObjectDoesNotExistError:
>>> obj = container.upload_object_via_stream(object_name='test object',
...      iterator=DummyFileObject(5, 10), extra={})
>>> obj
<Object: name=test object, size=50, hash=None, provider=Dummy Storage Provider ...>
def get_object_cdn_url(self, obj): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> obj = container.upload_object_via_stream(object_name='test object 5',
...      iterator=DummyFileObject(5, 10), extra={})
>>> obj
<Object: name=test object 5, size=50, hash=None, provider=Dummy Storage Provider ...>
>>> obj.get_cdn_url()
'http://www.test.com/object/test_object_5'
def create_container(self, container_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerAlreadyExistsError:
def delete_container(self, container): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = Container(name = 'test container',
...    extra={'object_count': 0}, driver=driver)
>>> driver.delete_container(container=container)#doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(
...      container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> len(driver._containers)
1
>>> driver.delete_container(container=container)
True
>>> len(driver._containers)
0
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(
...   object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> driver.delete_container(container=container)#doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerIsNotEmptyError:
def download_object(self, obj, destination_path, overwrite_existing=False, delete_on_failure=True): (source)
Download an object to the specified destination path.

@type obj; C{Object}
@param obj: Object instance.

@type destination_path: C{str}
@type destination_path: Full path to a file or a directory where the
                        incoming file will be saved.

@type overwrite_existing: C{bool}
@type overwrite_existing: True to overwrite an existing file, defaults to False.

@type delete_on_failure: C{bool}
@param delete_on_failure: True to delete a partially downloaded file if
the download was not successful (hash mismatch / file size).

@return C{bool} True if an object has been successfully downloaded, False
otherwise.
def download_object_as_stream(self, obj, chunk_size=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...   container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(object_name='test object',
...    iterator=DummyFileObject(5, 10), extra={})
>>> stream = container.download_object_as_stream(obj)
>>> stream #doctest: +ELLIPSIS
<closed file ..., mode '<uninitialized file>' at 0x...>
def upload_object(self, file_path, container, object_name, extra=None, file_hash=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container.upload_object(file_path='/tmp/inexistent.file',
...     object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
LibcloudError:
>>> file_path = path = os.path.abspath(__file__)
>>> file_size = os.path.getsize(file_path)
>>> obj = container.upload_object(file_path=file_path, object_name='test')
>>> obj #doctest: +ELLIPSIS
<Object: name=test, size=...>
>>> obj.size == file_size
True
def upload_object_via_stream(self, iterator, container, object_name, extra=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(
...   object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> obj #doctest: +ELLIPSIS
<Object: name=test object, size=50, ...>
def delete_object(self, obj): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...   container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(object_name='test object',
...   iterator=DummyFileObject(5, 10), extra={})
>>> obj #doctest: +ELLIPSIS
<Object: name=test object, size=50, ...>
>>> container.delete_object(obj=obj)
True
>>> obj = Object(name='test object 2',
...    size=1000, hash=None, extra=None,
...    meta_data=None, container=container,driver=None)
>>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ObjectDoesNotExistError:
def _add_object(self, container, object_name, size, extra=None): (source)
Undocumented
API Documentation for libcloud, generated by pydoctor at 2011-07-02 22:19:34.