libcloud.storage.drivers.dummy.DummyStorageDriver(StorageDriver) : class documentation

Part of libcloud.storage.drivers.dummy

Dummy Storage driver.

>>> from libcloud.storage.drivers.dummy import DummyStorageDriver
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container')
>>> container
<Container: name=test container, provider=Dummy Storage Provider>
>>> container.name
'test container'
>>> container.extra['object_count']
0
Method __init__
Method get_meta_data No summary
Method list_containers No summary
Method list_container_objects Return a list of objects for the given container.
Method get_container No summary
Method get_container_cdn_url No summary
Method get_object No summary
Method get_object_cdn_url No summary
Method create_container No summary
Method delete_container No summary
Method download_object Download an object to the specified destination path.
Method download_object_as_stream No summary
Method upload_object No summary
Method upload_object_via_stream No summary
Method delete_object No summary
Method _add_object Undocumented

Inherited from StorageDriver:

Method list_containters Return a list of containers.
Method enable_container_cdn Undocumented
Method enable_object_cdn Undocumented
Method _get_object Call the passed callback and start the transfer of the object.
Method _save_object Save object to the provided path.
Method _upload_object Helper function for setting common request headers and calling the passed in callback which uploads an object.
Method _upload_data Upload data stored in a string.
Method _stream_data Stream data over an HTTP connection.
Method _upload_file Upload a file to the server.
Method _get_hash_function Return instantiated hash function for the hash type supported by the provider.

Inherited from BaseDriver (via StorageDriver):

Method _ex_connection_class_kwargs Return extra connection keyword arguments which are passed to the Connection class constructor.
def __init__(self, api_key, api_secret): (source)
Parameters:
  key: API key or username to be used. (type: str)
  secret: Secret password to be used. (type: str)
  secure: Whether to use HTTPS or HTTP. Note: some providers only support HTTPS, and it is on by default. (type: bool)
  host: Override the hostname used for connections. (type: str)
  port: Override the port used for connections. (type: int)
  api_version: Optional API version. Only used by drivers which support multiple API versions. (type: str)
Note: the signature above names the first two arguments api_key and api_secret and accepts nothing else; the remaining parameters appear to be documented from the base driver.
def get_meta_data(self): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_meta_data()
{'object_count': 0, 'container_count': 0, 'bytes_used': 0}
>>> container = driver.create_container(container_name='test container 1')
>>> container = driver.create_container(container_name='test container 2')
>>> obj = container.upload_object_via_stream(
...  object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> driver.get_meta_data()
{'object_count': 1, 'container_count': 2, 'bytes_used': 50}
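The figures in the doctest above can be reproduced with a small stand-alone sketch. It assumes that the two arguments to DummyFileObject are a chunk count and a chunk length, which is consistent with the reported 50 bytes; FakeChunkStream and summarize are hypothetical names used for illustration and are not part of libcloud.

```python
class FakeChunkStream:
    """Hypothetical stand-in for DummyFileObject (assumed: chunk count, chunk length)."""

    def __init__(self, yield_count=5, chunk_len=10):
        self.yield_count = yield_count
        self.chunk_len = chunk_len

    def __iter__(self):
        # Yield `yield_count` chunks, each `chunk_len` bytes long.
        for _ in range(self.yield_count):
            yield b"x" * self.chunk_len


def summarize(containers):
    """Aggregate counts from a mapping of container name -> {object name: size}."""
    return {
        "object_count": sum(len(objs) for objs in containers.values()),
        "container_count": len(containers),
        "bytes_used": sum(sum(objs.values()) for objs in containers.values()),
    }


# Two containers, one object uploaded from a 5 x 10 byte stream.
uploaded_size = sum(len(chunk) for chunk in FakeChunkStream(5, 10))
containers = {
    "test container 1": {},
    "test container 2": {"test object": uploaded_size},
}
meta = summarize(containers)
print(meta)
```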
def list_containers(self): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.list_containers()
[]
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> container = driver.create_container(container_name='test container 2')
>>> container
<Container: name=test container 2, provider=Dummy Storage Provider>
>>> container = driver.create_container(
...  container_name='test container 2') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerAlreadyExistsError:
>>> container_list=driver.list_containers()
>>> sorted([container.name for container in container_list])
['test container 1', 'test container 2']
def list_container_objects(self, container): (source)
Return a list of objects for the given container.
Parameters:
  container: Container instance. (type: Container)
Returns: A list of Object instances.
def get_container(self, container_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> driver.get_container('test container 1')
<Container: name=test container 1, provider=Dummy Storage Provider>
def get_container_cdn_url(self, container): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container.name
'test container 1'
>>> container.get_cdn_url()
'http://www.test.com/container/test_container_1'
def get_object(self, container_name, object_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> driver.get_object('unknown', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> driver.get_object(
...  'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ObjectDoesNotExistError:
>>> obj = container.upload_object_via_stream(object_name='test object',
...      iterator=DummyFileObject(5, 10), extra={})
>>> obj
<Object: name=test object, size=50, hash=None, provider=Dummy Storage Provider ...>
def get_object_cdn_url(self, obj): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> obj = container.upload_object_via_stream(object_name='test object 5',
...      iterator=DummyFileObject(5, 10), extra={})
>>> obj
<Object: name=test object 5, size=50, hash=None, provider=Dummy Storage Provider ...>
>>> obj.get_cdn_url()
'http://www.test.com/object/test_object_5'
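Both CDN URLs shown in the doctests (for the container and for the object) look like a fixed base URL followed by the name with spaces replaced by underscores. A minimal sketch of that pattern, inferred only from the two example outputs; sketch_cdn_url is a hypothetical helper, not a libcloud function:

```python
def sketch_cdn_url(kind, name, base="http://www.test.com"):
    """Build a dummy CDN URL; `kind` is 'container' or 'object' (assumed pattern)."""
    return "%s/%s/%s" % (base, kind, name.replace(" ", "_"))


container_url = sketch_cdn_url("container", "test container 1")
object_url = sketch_cdn_url("object", "test object 5")
print(container_url)
print(object_url)
```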
def create_container(self, container_name): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container
<Container: name=test container 1, provider=Dummy Storage Provider>
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerAlreadyExistsError:
def delete_container(self, container): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = Container(name = 'test container',
...    extra={'object_count': 0}, driver=driver)
>>> driver.delete_container(container=container)#doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerDoesNotExistError:
>>> container = driver.create_container(
...      container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> len(driver._containers)
1
>>> driver.delete_container(container=container)
True
>>> len(driver._containers)
0
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(
...   object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> driver.delete_container(container=container)#doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ContainerIsNotEmptyError:
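The three outcomes exercised above (unknown container, successful delete, refusal because the container still holds an object) can be modelled with a dictionary-backed store. This is a sketch under the assumption that the driver keeps its state in memory, as the use of driver._containers suggests; the class and exception names below are hypothetical and only mirror the libcloud error names.

```python
class ContainerAlreadyExists(Exception):
    pass


class ContainerDoesNotExist(Exception):
    pass


class ContainerIsNotEmpty(Exception):
    pass


class InMemoryStore:
    """Hypothetical in-memory model: container name -> {object name: size}."""

    def __init__(self):
        self._containers = {}

    def create_container(self, name):
        if name in self._containers:
            raise ContainerAlreadyExists(name)
        self._containers[name] = {}
        return name

    def add_object(self, container_name, object_name, size):
        self._containers[container_name][object_name] = size

    def delete_container(self, name):
        # Check existence first, then emptiness, matching the doctest order.
        if name not in self._containers:
            raise ContainerDoesNotExist(name)
        if self._containers[name]:
            raise ContainerIsNotEmpty(name)
        del self._containers[name]
        return True


store = InMemoryStore()
store.create_container("test container 1")
deleted = store.delete_container("test container 1")

store.create_container("test container 1")
store.add_object("test container 1", "test object", 50)
try:
    store.delete_container("test container 1")
    outcome = "deleted"
except ContainerIsNotEmpty:
    outcome = "refused: not empty"
print(deleted, outcome)
```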
def download_object(self, obj, destination_path, overwrite_existing=False, delete_on_failure=True): (source)
Download an object to the specified destination path.
Parameters:
  obj: Object instance. (type: Object)
  destination_path: Full path to a file or a directory where the incoming file will be saved. (type: str)
  overwrite_existing: True to overwrite an existing file, defaults to False. (type: bool)
  delete_on_failure: True to delete a partially downloaded file if the download was not successful (hash mismatch or file size mismatch). (type: bool)
Returns: True if the object has been successfully downloaded, False otherwise. (type: bool)
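To make the two flags concrete, here is a minimal sketch of how overwrite_existing and delete_on_failure could interact when saving a stream to disk. It is not libcloud's implementation: save_stream is a hypothetical helper, it only handles a file path (not a directory), it checks size only (no hash), and raising FileExistsError for an existing file is an assumption of this sketch.

```python
import os
import tempfile


def save_stream(chunks, destination_path, expected_size,
                overwrite_existing=False, delete_on_failure=True):
    """Write chunks to destination_path; return True only if the size matches."""
    if os.path.exists(destination_path) and not overwrite_existing:
        raise FileExistsError(destination_path)

    written = 0
    with open(destination_path, "wb") as fh:
        for chunk in chunks:
            fh.write(chunk)
            written += len(chunk)

    if written != expected_size:
        # Treat a size mismatch as a failed download.
        if delete_on_failure:
            os.remove(destination_path)
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "object.bin")
    # Complete download: 5 chunks of 10 bytes.
    ok = save_stream([b"x" * 10] * 5, target, expected_size=50)
    size_on_disk = os.path.getsize(target)
    # Truncated download over the existing file: partial file is removed.
    truncated_ok = save_stream([b"x" * 10], target, expected_size=50,
                               overwrite_existing=True)
    leftover = os.path.exists(target)
print(ok, size_on_disk, truncated_ok, leftover)
```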
def download_object_as_stream(self, obj, chunk_size=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...   container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(object_name='test object',
...    iterator=DummyFileObject(5, 10), extra={})
>>> stream = container.download_object_as_stream(obj)
>>> stream #doctest: +ELLIPSIS
<...closed...>
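The chunk_size parameter is not described above. As a general illustration of chunked streaming, and not a description of what the dummy driver does, the sketch below reads a file-like object in pieces of at most chunk_size bytes. iter_chunks and its fallback size of 8192 bytes are assumptions made for this example.

```python
import io


def iter_chunks(fileobj, chunk_size=None):
    """Yield successive reads of at most chunk_size bytes until the stream is empty."""
    chunk_size = chunk_size or 8192  # hypothetical fallback when no size is given
    while True:
        data = fileobj.read(chunk_size)
        if not data:
            break
        yield data


chunk_lengths = [len(c) for c in iter_chunks(io.BytesIO(b"x" * 50), chunk_size=20)]
print(chunk_lengths)
```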
def upload_object(self, file_path, container, object_name, extra=None, file_hash=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(container_name='test container 1')
>>> container.upload_object(file_path='/tmp/inexistent.file',
...     object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
LibcloudError:
>>> file_path = path = os.path.abspath(__file__)
>>> file_size = os.path.getsize(file_path)
>>> obj = container.upload_object(file_path=file_path, object_name='test')
>>> obj #doctest: +ELLIPSIS
<Object: name=test, size=...>
>>> obj.size == file_size
True
def upload_object_via_stream(self, iterator, container, object_name, extra=None): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...    container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(
...   object_name='test object', iterator=DummyFileObject(5, 10), extra={})
>>> obj #doctest: +ELLIPSIS
<Object: name=test object, size=50, ...>
def delete_object(self, obj): (source)
>>> driver = DummyStorageDriver('key', 'secret')
>>> container = driver.create_container(
...   container_name='test container 1') #doctest: +IGNORE_EXCEPTION_DETAIL
>>> obj = container.upload_object_via_stream(object_name='test object',
...   iterator=DummyFileObject(5, 10), extra={})
>>> obj #doctest: +ELLIPSIS
<Object: name=test object, size=50, ...>
>>> container.delete_object(obj=obj)
True
>>> obj = Object(name='test object 2',
...    size=1000, hash=None, extra=None,
...    meta_data=None, container=container,driver=None)
>>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ObjectDoesNotExistError:
def _add_object(self, container, object_name, size, extra=None): (source)
Undocumented
API Documentation for libcloud, generated by pydoctor at 2012-07-15 18:48:28.