Commit 7b13585d authored by Yannick's avatar Yannick
Browse files

implement update and changes from core

parent 4ea8914d
__version__ = '0.3.0' __version__ = '1.0.0'
...@@ -5,7 +5,7 @@ from azure.storage.blob.aio import BlobServiceClient ...@@ -5,7 +5,7 @@ from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import ContentSettings from azure.storage.blob import ContentSettings
from azure.identity.aio import DefaultAzureCredential from azure.identity.aio import DefaultAzureCredential
from osdu.core.api.storage.tenant import Tenant from osdu.core.api.storage.tenant import Tenant
from azure.core import MatchConditions
from osdu_az.partition.partition_service import PartitionService from osdu_az.partition.partition_service import PartitionService
...@@ -58,10 +58,10 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -58,10 +58,10 @@ class AzureAioBlobStorage(BlobStorageBase):
:param auth: auth obj to perform the operation :param auth: auth obj to perform the operation
:param tenant: tenant info :param tenant: tenant info
:param prefix: Filter results to objects whose names begin with this prefix :param prefix: Filter results to objects whose names begin with this prefix
:param page_token: A previously-returned page token representing part of the larger set of results to view. :param page_token: UNUSED
:param max_result: Maximum number of items to return. :param max_result: Maximum number of items to return.
:param timeout: timeout :param timeout: timeout
:param kwargs: :param kwargs: UNUSED
:return: list of blob names :return: list of blob names
""" """
container = tenant.bucket_name container = tenant.bucket_name
...@@ -69,19 +69,21 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -69,19 +69,21 @@ class AzureAioBlobStorage(BlobStorageBase):
result = [] result = []
async with blob_service_client: async with blob_service_client:
container_client = blob_service_client.get_container_client(container) container_client = blob_service_client.get_container_client(container)
async for prop in container_client.list_blobs(name_starts_with=prefix): async for prop in container_client.list_blobs(name_starts_with=prefix, timeout=timeout):
result.append(prop.name) result.append(prop.name)
if max_result and len(result) >= max_result:
break
return result return result
async def delete(self, tenant: Tenant, object_name: str, async def delete(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs): *args, auth: Optional = None, timeout: int = 10, **kwargs):
""" """
delete an object delete an object
:param auth: auth obj to perform the operation :param auth: UNUSED
:param tenant: tenant info :param tenant: tenant info
:param object_name: :param object_name:
:param timeout: :param timeout: UNUSED
:param kwargs: :param kwargs: UNUSED
:return: :return:
""" """
container = tenant.bucket_name container = tenant.bucket_name
...@@ -94,11 +96,11 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -94,11 +96,11 @@ class AzureAioBlobStorage(BlobStorageBase):
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes: *args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
""" """
download blob data download blob data
:param auth: auth obj to perform the operation :param auth: UNUSED
:param tenant: tenant info :param tenant: tenant info
:param object_name: :param object_name:
:param timeout: :param timeout: UNUSED
:param kwargs: :param kwargs: UNUSED
:return: :return:
""" """
container = tenant.bucket_name container = tenant.bucket_name
...@@ -114,11 +116,11 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -114,11 +116,11 @@ class AzureAioBlobStorage(BlobStorageBase):
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob: *args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
""" """
download blob data download blob data
:param auth: auth obj to perform the operation :param auth: UNUSED
:param tenant: tenant info :param tenant: tenant info
:param object_name: :param object_name:
:param timeout: :param timeout: UNUSED
:param kwargs: :param kwargs: UNUSED
:return: blob :return: blob
""" """
container = tenant.bucket_name container = tenant.bucket_name
...@@ -127,7 +129,6 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -127,7 +129,6 @@ class AzureAioBlobStorage(BlobStorageBase):
container_client = blob_service_client.get_container_client(container) container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name) blob_client = container_client.get_blob_client(object_name)
properties = await blob_client.get_blob_properties() properties = await blob_client.get_blob_properties()
if properties.has_key('content_settings'): if properties.has_key('content_settings'):
content_type = properties['content_settings'].get('content_type') content_type = properties['content_settings'].get('content_type')
else: else:
...@@ -141,32 +142,86 @@ class AzureAioBlobStorage(BlobStorageBase): ...@@ -141,32 +142,86 @@ class AzureAioBlobStorage(BlobStorageBase):
content_type=content_type, content_type=content_type,
time_created=str(properties.get('creation_time', '')), time_created=str(properties.get('creation_time', '')),
time_updated=str(properties.get('last_modified', '')), time_updated=str(properties.get('last_modified', '')),
size=properties.get('size', -1) size=properties.get('size', -1),
etag=str(properties.etag),
provider_specific=properties
)
async def _upload(self, tenant: Tenant,
object_name: str,
file_data: Any,
overwrite: bool,
content_type: str = None,
metadata: dict = None,
**kwargs) -> Blob:
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
content_settings = ContentSettings(content_type=content_type) if content_type else None
upload_response = await blob_client.upload_blob(file_data,
overwrite=overwrite,
metadata=metadata,
content_settings=content_settings,
**kwargs)
return Blob(identifier=object_name,
bucket=container,
name=upload_response.get('name', object_name),
metadata=upload_response.get('metadata', {}),
acl=upload_response.get('acl', None),
content_type=content_type,
time_created=str(upload_response.get('date', '')),
time_updated=str(upload_response.get('last_modified', '')),
size=upload_response.get('size', -1),
etag=upload_response.get('etag', None),
provider_specific=upload_response
) )
async def update(self, tenant: Tenant, object_name: str, file_data: Any, *,
if_match=None,
if_not_match=None,
auth: Optional = None,
content_type: str = None,
metadata: dict = None,
timeout: int = 30,
**kwargs) -> Blob:
"""
update blob data, create if not exist, overwrite if exist. Use `if_match` and `if_not_match` for conditional
update. `if_match` and `if_not_match` cannot be set simultaneously and are expected to work with the ETag
that can be get using `download_metadata`.
:param tenant: tenant info
:param object_name: maps to file name
:param file_data: Any, *,
:param if_match: (ETag value) update will fail if the blob to overwrite doesn't match the ETag provided.
:param if_not_match: (ETag value) update will fail if the blob to overwrite matches the ETag provided.
:param auth: UNUSED,
:param content_type: str = None,
:param metadata: dict = None,
:param timeout: UNUSED
:return: blob
"""
assert not (if_match and if_not_match), "if_match and if_not_match cannot be set simultaneous"
conditions = {}
if if_match or if_not_match:
conditions['etag'] = if_match or if_not_match
conditions['match_condition'] = MatchConditions.IfNotModified if if_match else MatchConditions.IfModified
return await self._upload(tenant, object_name, file_data, True, content_type, metadata, **conditions)
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, async def upload(self, tenant: Tenant, object_name: str, file_data: Any,
*args, auth: Optional = None, content_type: str = None, metadata: dict = None, *args, auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs): timeout: int = 30, **kwargs) -> Blob:
""" """
upload blob data upload blob data, fail if already exist
:param tenant: tenant info :param tenant: tenant info
:param object_name: maps to file name :param object_name: maps to file name
:param file_data: Any, *, :param file_data: Any, *,
:param auth: Optional = None, :param auth: Optional = None,
:param content_type: str = None, :param content_type: str = None,
:param metadata: dict = None, :param metadata: dict = None,
:param timeout: int = 30, **kwargs :param timeout: UNUSED
:param return: blob id :return: blob
""" """
return await self._upload(tenant, object_name, file_data, False, content_type, metadata)
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
content_settings = ContentSettings(content_type=content_type) if content_type else None
data = await blob_client.upload_blob(file_data,
overwrite=True,
metadata=metadata,
content_settings=content_settings)
return data
# osdu core lib main python
# osdu-core-lib-python>=0.4.0, <0.5
azure-storage-blob==12.6.0 # fix version, 12.7.0 & 12.7.1 cause hangs in tests azure-storage-blob==12.6.0 # fix version, 12.7.0 & 12.7.1 cause hangs in tests
azure-identity azure-identity
azure-keyvault azure-keyvault
......
# osdu core lib main python # osdu core lib main python
--extra-index-url \ --extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/ https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python>=0.4.0, <0.5 osdu-core-lib-python==1.0.0.dev270686
from azure.core.exceptions import AzureError
from osdu.core.api.storage.tenant import Tenant from osdu.core.api.storage.tenant import Tenant
from tests.conftest import Config from tests.conftest import Config
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
...@@ -6,6 +7,7 @@ from io import BytesIO ...@@ -6,6 +7,7 @@ from io import BytesIO
import uuid import uuid
from mock import patch from mock import patch
from azure.core import exceptions from azure.core import exceptions
import asyncio
# Patch '_get_credentials', '_build_url' and '_get_storage_account_name' for tests # Patch '_get_credentials', '_build_url' and '_get_storage_account_name' for tests
...@@ -51,6 +53,58 @@ async def test_download_metadata(az_client: AzureAioBlobStorage, test_tenant): ...@@ -51,6 +53,58 @@ async def test_download_metadata(az_client: AzureAioBlobStorage, test_tenant):
assert blob_prop.metadata['customMetaKey'] == 'customMetaValue' assert blob_prop.metadata['customMetaKey'] == 'customMetaValue'
@pytest.mark.asyncio
async def test_update_with_condition(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, b'1111')
# update no condition
await az_client.update(test_tenant, blob_name, b'1112')
assert await az_client.download(test_tenant, blob_name) == b'1112'
# successful update if_match
etag_1112 = (await az_client.download_metadata(test_tenant, blob_name)).etag
await az_client.update(test_tenant, blob_name, b'1113', if_match=etag_1112)
assert await az_client.download(test_tenant, blob_name) == b'1113'
# should fail update if_match not satisfied
with pytest.raises(AzureError): # StorageErrorException is internal
await az_client.update(test_tenant, blob_name, b'1114', if_match=etag_1112)
# success update if_not_match
await az_client.update(test_tenant, blob_name, b'1115', if_not_match=etag_1112)
# should fail update if_not_match not satisfied
etag_1115 = (await az_client.download_metadata(test_tenant, blob_name)).etag
with pytest.raises(AzureError): # StorageErrorException is internal
await az_client.update(test_tenant, blob_name, b'1116', if_not_match=etag_1115)
@pytest.mark.asyncio
async def test_concurrent_update_only_one_should_succeed(az_client: AzureAioBlobStorage, test_tenant):
# not really sure this can really prove it
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, b'1111')
nb_repetition = 10
concurrency = 10
for _ in range(nb_repetition):
etag = (await az_client.download_metadata(test_tenant, blob_name)).etag
calls = [az_client.update(test_tenant, blob_name, str(c), if_match=etag) for c in range(concurrency)]
# perform several concurrent call
result = await asyncio.gather(*calls, return_exceptions=True)
# ensure only one succeed and updated it
success = [i for (i, r) in enumerate(result) if not isinstance(r, Exception)]
assert len(success) == 1 # only should succeed
# check the content, should match the one which succeed
content = await az_client.download(test_tenant, blob_name)
assert content.decode('utf-8') == str(success[0])
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobStorage, test_tenant): async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobStorage, test_tenant):
# here we just ensure it does not silently fail and throw something for now (to be updated when proper exceptions # here we just ensure it does not silently fail and throw something for now (to be updated when proper exceptions
...@@ -62,7 +116,7 @@ async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobSt ...@@ -62,7 +116,7 @@ async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobSt
# when try to download it should fail # when try to download it should fail
await az_client.download(test_tenant, blob_name) await az_client.download(test_tenant, blob_name)
print(ex_info.value) # print(ex_info.value)
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -71,33 +125,30 @@ async def test_invalid_storage_container(az_client: AzureAioBlobStorage): ...@@ -71,33 +125,30 @@ async def test_invalid_storage_container(az_client: AzureAioBlobStorage):
tenant = Tenant(project_id=Config.storage_account, bucket_name='not_existing_container', data_partition_id='local') tenant = Tenant(project_id=Config.storage_account, bucket_name='not_existing_container', data_partition_id='local')
await az_client.upload(tenant, 'blob_name', 'input_data') await az_client.upload(tenant, 'blob_name', 'input_data')
print(ex_info.value) # print(ex_info.value)
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------- NOT IMPLEMENTED METHODS ----------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_objects(az_client: AzureAioBlobStorage, test_tenant):
# given
nb_blob = 3
prefix = 'testing_data/list' + str(uuid.uuid4())
all_names = set()
async def assert_not_implemented(coro): for _ in range(nb_blob):
with pytest.raises(NotImplementedError) as ex_info: blob_name = prefix + str(uuid.uuid4())
await coro all_names.add(blob_name)
ex = ex_info.value await az_client.upload(test_tenant, blob_name, b'na')
assert 'azure' in str(ex).lower()
blob_names = await az_client.list_objects(test_tenant, prefix=prefix)
assert all_names.issubset(set(blob_names))
@pytest.mark.asyncio blob_names = await az_client.list_objects(test_tenant, prefix=prefix, max_result=2)
async def test_list_objects(az_client: AzureAioBlobStorage, test_tenant): assert len(set(blob_names)) == 2
r = await az_client.list_objects(test_tenant, prefix='testi')
print(r)
# await assert_not_implemented(
# az_client.list_objects(test_tenant, prefix='testi')
# )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_delete_should(az_client: AzureAioBlobStorage, test_tenant): async def test_delete(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4()) blob_name = 'testing_data/' + str(uuid.uuid4())
input_data = b'expected content 123456789' input_data = b'expected content 123456789'
await az_client.upload(test_tenant, blob_name, input_data) await az_client.upload(test_tenant, blob_name, input_data)
...@@ -107,10 +158,3 @@ async def test_delete_should(az_client: AzureAioBlobStorage, test_tenant): ...@@ -107,10 +158,3 @@ async def test_delete_should(az_client: AzureAioBlobStorage, test_tenant):
with pytest.raises(exceptions.ResourceNotFoundError): with pytest.raises(exceptions.ResourceNotFoundError):
await az_client.download(test_tenant, blob_name) await az_client.download(test_tenant, blob_name)
# @pytest.mark.asyncio
# async def test_download_metadata_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
# await assert_not_implemented(
# az_client.download_metadata(test_tenant, 'id')
# )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment