Commit a5ced00e authored by Yannick

Merge branch 'update_concurrent' into 'master'

implement conditional update for concurrency control

See merge request !11
parents 3633e75a e7533316
Pipeline #37328 passed with stages in 2 minutes and 19 seconds
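The diff below adds conditional (ETag-based) updates to `AzureAioBlobStorage.upload` via the new `overwrite`, `if_match` and `if_not_match` parameters. A minimal usage sketch, assuming an `AzureAioBlobStorage` instance `storage` and a `Tenant` `tenant` configured as in the tests further down (names are illustrative, not part of this change):
from osdu.core.api.storage.exceptions import PreconditionFailedException
async def conditional_upload_example(storage, tenant, blob_name: str):
    # Create the blob, failing instead of overwriting if it already exists.
    created = await storage.upload(tenant, blob_name, b'v1', overwrite=False)
    try:
        # Overwrite only if the blob still carries the ETag we got back from the first upload.
        await storage.upload(tenant, blob_name, b'v2', if_match=created.etag)
    except PreconditionFailedException:
        # A concurrent writer changed the blob in the meantime; handle the conflict here.
        pass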
__version__ = '0.2.1'
__version__ = '1.0.0'
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from typing import Optional, List, Any
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import ContentSettings
from azure.identity.aio import DefaultAzureCredential
from azure.core import MatchConditions
from azure.core import exceptions as AzureExceptions
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from osdu.core.api.storage.tenant import Tenant
from osdu.core.api.storage.exceptions import (
with_blobstorage_exception,
AuthenticationException,
ResourceNotFoundException,
ResourceExistsException,
PreconditionFailedException)
from osdu_az.partition.partition_service import PartitionService
@@ -22,6 +33,14 @@ class AzureAioBlobStorage(BlobStorageBase):
Credentials = None
ExceptionMapping = {
AzureExceptions.ClientAuthenticationError: AuthenticationException,
AzureExceptions.ResourceNotFoundError: ResourceNotFoundException,
AzureExceptions.ResourceExistsError: ResourceExistsException,
AzureExceptions.ResourceModifiedError: PreconditionFailedException,
AzureExceptions.ResourceNotModifiedError: PreconditionFailedException,
}
def _build_url(self, storage_account: str):
return f'https://{storage_account}.blob.core.windows.net'
@@ -33,6 +52,12 @@ class AzureAioBlobStorage(BlobStorageBase):
exclude_environment_credential=True)
return AzureAioBlobStorage.Credentials
async def _get_blob_service_client(self, tenant):
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
cred = self._get_credentials()
account_url = self._build_url(storage_account)
return BlobServiceClient(account_url=account_url, credential=cred)
@classmethod
async def close_credentials(cls):
""" This cause to gracefully dispose credentials if any. Next calls will then initialize a new one """
@@ -43,6 +68,7 @@ class AzureAioBlobStorage(BlobStorageBase):
async def _get_storage_account_name(self, data_partition_id: str):
return await PartitionService.get_storage_account_name(data_partition_id)
@with_blobstorage_exception(ExceptionMapping)
async def list_objects(self, tenant: Tenant,
*args, auth: Optional = None, prefix: str = '', page_token: Optional[str] = None,
max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
@@ -51,46 +77,55 @@ class AzureAioBlobStorage(BlobStorageBase):
:param auth: auth obj to perform the operation
:param tenant: tenant info
:param prefix: Filter results to objects whose names begin with this prefix
:param page_token: A previously-returned page token representing part of the larger set of results to view.
:param page_token: UNUSED
:param max_result: Maximum number of items to return.
:param timeout: timeout
:param kwargs:
:param kwargs: UNUSED
:return: list of blob names
"""
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
result = []
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
async for prop in container_client.list_blobs(name_starts_with=prefix, timeout=timeout):
result.append(prop.name)
if max_result and len(result) >= max_result:
break
return result
raise NotImplementedError('Azure blob storage implementation, "list_objects" not implemented')
@with_blobstorage_exception(ExceptionMapping)
async def delete(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs):
"""
delete an object
:param auth: auth obj to perform the operation
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout:
:param kwargs:
:param timeout: UNUSED
:param kwargs: UNUSED
:return:
"""
raise NotImplementedError('Azure blob storage implementation, "delete" not implemented')
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
await container_client.delete_blob(object_name)
@with_blobstorage_exception(ExceptionMapping)
async def download(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
"""
download blob data
:param auth: auth obj to perform the operation
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout:
:param kwargs:
:param timeout: UNUSED
:param kwargs: UNUSED
:return:
"""
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
container = tenant.bucket_name
cred = self._get_credentials()
account_url = self._build_url(storage_account)
blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
@@ -98,41 +133,92 @@ class AzureAioBlobStorage(BlobStorageBase):
return await data.readall()
# not for now, parquet only
@with_blobstorage_exception(ExceptionMapping)
async def download_metadata(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
"""
download blob metadata (properties)
:param auth: auth obj to perform the operation
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout:
:param kwargs:
:param timeout: UNUSED
:param kwargs: UNUSED
:return: blob
"""
raise NotImplementedError('Azure blob storage implementation, "download_metadata" not implemented')
async def upload(self, tenant: Tenant, object_name: str, file_data: Any,
*args, auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs):
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
properties = await blob_client.get_blob_properties()
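# BlobProperties is dict-like; pull the content type from its ContentSettings entry when present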
if properties.has_key('content_settings'):
content_type = properties['content_settings'].get('content_type')
else:
content_type = None
return Blob(identifier=object_name,
bucket=container,
name=properties.get('name', object_name),
metadata=properties.get('metadata', {}),
acl=properties.get('acl', None),
content_type=content_type,
time_created=str(properties.get('creation_time', '')),
time_updated=str(properties.get('last_modified', '')),
size=properties.get('size', -1),
etag=str(properties.etag),
provider_specific=properties
)
@with_blobstorage_exception(ExceptionMapping)
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
overwrite: bool = True,
if_match=None,
if_not_match=None,
auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs) -> Blob:
"""
upload blob data
upload blob data; behavior when the blob already exists is controlled by `overwrite`, `if_match` and `if_not_match`
:param tenant: tenant info
:param object_name: maps to file name
:param file_data: Any, *,
:param file_data: Any
:param overwrite: if False, the upload fails if the object already exists; if True, an existing object is replaced (default: True)
:param if_match: (ETag value) the update fails if the blob to overwrite does not match the provided ETag.
Cannot be used with `if_not_match`. The ETag can be obtained using `download_metadata`
or from the response of a previous upload.
:param if_not_match: (ETag value) the update fails if the blob to overwrite matches the provided ETag.
Cannot be used with `if_match`. The ETag can be obtained using `download_metadata`
or from the response of a previous upload.
:param auth: UNUSED
:param content_type: content type to set on the uploaded blob
:param metadata: custom metadata to attach to the blob
:param timeout: int = 30, **kwargs
:param return: blob id
:param timeout: UNUSED
:return: blob
"""
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
container = tenant.bucket_name
assert not (if_match and if_not_match), "if_match and if_not_match cannot be set simultaneously"
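# map the ETag to the corresponding azure-core match condition:
# if_match -> IfNotModified (write only if the ETag still matches), if_not_match -> IfModified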
conditions = {}
if if_match or if_not_match:
conditions['etag'] = if_match or if_not_match
conditions['match_condition'] = MatchConditions.IfNotModified if if_match else MatchConditions.IfModified
cred = self._get_credentials()
account_url = self._build_url(storage_account)
blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
data = await blob_client.upload_blob(file_data, overwrite=True, metadata=metadata)
return data
content_settings = ContentSettings(content_type=content_type) if content_type else None
upload_response = await blob_client.upload_blob(file_data,
overwrite=overwrite,
metadata=metadata,
content_settings=content_settings,
**conditions)
return Blob(identifier=object_name,
bucket=container,
name=upload_response.get('name', object_name),
metadata=upload_response.get('metadata', {}),
acl=upload_response.get('acl', None),
content_type=content_type,
time_created=str(upload_response.get('date', '')),
time_updated=str(upload_response.get('last_modified', '')),
size=upload_response.get('size', -1),
etag=upload_response.get('etag', None),
provider_specific=upload_response)
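For the concurrency-control scenario in the MR title, callers would typically wrap the conditional upload in a read-modify-write retry loop. A minimal sketch under the same assumptions as above (the `read_modify_write` helper and `transform` callable are hypothetical, not part of this change):
import asyncio
from osdu.core.api.storage.exceptions import PreconditionFailedException
async def read_modify_write(storage, tenant, blob_name: str, transform, max_attempts: int = 5):
    # Re-read and retry until the conditional upload succeeds or attempts are exhausted.
    last_error = None
    for _ in range(max_attempts):
        current = await storage.download_metadata(tenant, blob_name)  # Blob carrying the current ETag
        data = await storage.download(tenant, blob_name)              # current content as bytes
        try:
            # Only succeeds if the blob still carries the ETag we just read.
            return await storage.upload(tenant, blob_name, transform(data), if_match=current.etag)
        except PreconditionFailedException as ex:
            last_error = ex
            await asyncio.sleep(0.1)  # brief backoff before retrying
    raise last_error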
# osdu core lib main python
# osdu-core-lib-python>=0.4.0, <0.5
azure-storage-blob==12.6.0  # pinned version: 12.7.0 & 12.7.1 cause hangs in tests
azure-identity
azure-keyvault
......
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python>=0.4.0, <0.5
osdu-core-lib-python==1.0.0.dev286249
from osdu.core.api.storage.tenant import Tenant
from tests.conftest import Config
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
import asyncio
import pytest
from io import BytesIO
import uuid
from mock import patch
from azure.core import exceptions
from osdu.core.api.storage.tenant import Tenant
from osdu.core.api.storage.exceptions import *
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from tests.conftest import Config
# Patch '_get_credentials', '_build_url' and '_get_storage_account_name' for tests
@pytest.fixture
@@ -30,61 +33,129 @@ async def test_tenant():
async def test_downloading_successfully_uploaded_blob(az_client: AzureAioBlobStorage, test_tenant, input_data, expected):
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, input_data)
assert await az_client.download(test_tenant, blob_name) == expected
@pytest.mark.asyncio
async def test_download_metadata(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
input_data = b'expected content 123456789'
await az_client.upload(test_tenant,
blob_name,
input_data,
metadata={'customMetaKey': 'customMetaValue'},
content_type='application/x-parquet')
blob_prop = await az_client.download_metadata(test_tenant, blob_name)
assert blob_prop.identifier == blob_name
assert blob_prop.name == blob_name
assert blob_prop.content_type == 'application/x-parquet'
assert blob_prop.metadata['customMetaKey'] == 'customMetaValue'
@pytest.mark.asyncio
async def test_overwrite_with_condition(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, b'1111')
with pytest.raises(ResourceExistsException):
await az_client.upload(test_tenant, blob_name, b'1111', overwrite=False)
# unconditional update
await az_client.upload(test_tenant, blob_name, b'1112')
assert await az_client.download(test_tenant, blob_name) == b'1112'
# successful update with if_match
etag_1112 = (await az_client.download_metadata(test_tenant, blob_name)).etag
await az_client.upload(test_tenant, blob_name, b'1113', if_match=etag_1112)
assert await az_client.download(test_tenant, blob_name) == b'1113'
# update should fail when if_match is not satisfied
with pytest.raises(PreconditionFailedException):
await az_client.upload(test_tenant, blob_name, b'1114', if_match=etag_1112)
# successful update with if_not_match
await az_client.upload(test_tenant, blob_name, b'1115', if_not_match=etag_1112)
# update should fail when if_not_match is not satisfied
etag_1115 = (await az_client.download_metadata(test_tenant, blob_name)).etag
with pytest.raises(PreconditionFailedException):
await az_client.upload(test_tenant, blob_name, b'1116', if_not_match=etag_1115)
@pytest.mark.asyncio
async def test_concurrent_update_only_one_should_succeed(az_client: AzureAioBlobStorage, test_tenant):
# note: this test is probabilistic and cannot strictly prove mutual exclusion
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, b'1111')
nb_repetition = 10
concurrency = 10
for _ in range(nb_repetition):
etag = (await az_client.download_metadata(test_tenant, blob_name)).etag
calls = [az_client.upload(test_tenant, blob_name, str(c), if_match=etag) for c in range(concurrency)]
# perform several concurrent calls
result = await asyncio.gather(*calls, return_exceptions=True)
# ensure only one succeeded and performed the update
success = [i for (i, r) in enumerate(result) if not isinstance(r, Exception)]
assert len(success) == 1  # only one should succeed
# check the content: it should match the call that succeeded
content = await az_client.download(test_tenant, blob_name)
assert content.decode('utf-8') == str(success[0])
@pytest.mark.asyncio
async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobStorage, test_tenant):
# ensure a missing blob does not fail silently and raises the mapped core-lib exception
with pytest.raises(exceptions.ResourceNotFoundError) as ex_info:
with pytest.raises(ResourceNotFoundException):
# given a not existing blob
blob_name = 'testing_data/' + str(uuid.uuid4())
# when try to download it should fail
await az_client.download(test_tenant, blob_name)
print(ex_info.value)
@pytest.mark.asyncio
async def test_invalid_storage_container(az_client: AzureAioBlobStorage):
with pytest.raises(exceptions.ResourceNotFoundError) as ex_info:
with pytest.raises(ResourceNotFoundException):
tenant = Tenant(project_id=Config.storage_account, bucket_name='not_existing_container', data_partition_id='local')
await az_client.upload(tenant, 'blob_name', 'input_data')
print(ex_info.value)
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------- NOT IMPLEMENTED METHODS ----------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_objects(az_client: AzureAioBlobStorage, test_tenant):
# given
nb_blob = 3
prefix = 'testing_data/list' + str(uuid.uuid4())
all_names = set()
async def assert_not_implemented(coro):
with pytest.raises(NotImplementedError) as ex_info:
await coro
ex = ex_info.value
assert 'azure' in str(ex).lower()
for _ in range(nb_blob):
blob_name = prefix + str(uuid.uuid4())
all_names.add(blob_name)
await az_client.upload(test_tenant, blob_name, b'na')
blob_names = await az_client.list_objects(test_tenant, prefix=prefix)
assert all_names.issubset(set(blob_names))
@pytest.mark.asyncio
async def test_list_objects_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.list_objects(test_tenant)
)
blob_names = await az_client.list_objects(test_tenant, prefix=prefix, max_result=2)
assert len(set(blob_names)) == 2
@pytest.mark.asyncio
async def test_delete_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.delete(test_tenant, 'id')
)
async def test_delete(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
input_data = b'expected content 123456789'
await az_client.upload(test_tenant, blob_name, input_data)
await az_client.download(test_tenant, blob_name)
await az_client.delete(test_tenant, blob_name)
with pytest.raises(ResourceNotFoundException):
await az_client.download(test_tenant, blob_name)
@pytest.mark.asyncio
async def test_download_metadata_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.download_metadata(test_tenant, 'id')
)