Commit 3629f7f6 authored by Yannick's avatar Yannick
Browse files

impl all interfaces

parent 3633e75a
......@@ -2,6 +2,7 @@ from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from typing import Optional, List, Any
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import ContentSettings
from azure.identity.aio import DefaultAzureCredential
from osdu.core.api.storage.tenant import Tenant
......@@ -33,6 +34,12 @@ class AzureAioBlobStorage(BlobStorageBase):
exclude_environment_credential=True)
return AzureAioBlobStorage.Credentials
async def _get_blob_service_client(self, tenant):
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
cred = self._get_credentials()
account_url = self._build_url(storage_account)
return BlobServiceClient(account_url=account_url, credential=cred)
@classmethod
async def close_credentials(cls):
""" This cause to gracefully dispose credentials if any. Next calls will then initialize a new one """
......@@ -57,8 +64,14 @@ class AzureAioBlobStorage(BlobStorageBase):
:param kwargs:
:return: list of blob names
"""
raise NotImplementedError('Azure blob storage implementation, "list_objects" not implemented')
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
result = []
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
async for prop in container_client.list_blobs(name_starts_with=prefix):
result.append(prop.name)
return result
async def delete(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs):
......@@ -71,7 +84,11 @@ class AzureAioBlobStorage(BlobStorageBase):
:param kwargs:
:return:
"""
raise NotImplementedError('Azure blob storage implementation, "delete" not implemented')
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
await container_client.delete_blob(object_name)
async def download(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
......@@ -84,13 +101,8 @@ class AzureAioBlobStorage(BlobStorageBase):
:param kwargs:
:return:
"""
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
container = tenant.bucket_name
cred = self._get_credentials()
account_url = self._build_url(storage_account)
blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
......@@ -109,7 +121,28 @@ class AzureAioBlobStorage(BlobStorageBase):
:param kwargs:
:return: blob
"""
raise NotImplementedError('Azure blob storage implementation, "download_metadata" not implemented')
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
properties = await blob_client.get_blob_properties()
if properties.has_key('content_settings'):
content_type = properties['content_settings'].get('content_type')
else:
content_type = None
return Blob(identifier=object_name,
bucket=container,
name=properties.get('name', object_name),
metadata=properties.get('metadata', {}),
acl=properties.get('acl', None),
content_type=content_type,
time_created=str(properties.get('creation_time', '')),
time_updated=str(properties.get('last_modified', '')),
size=properties.get('size', -1)
)
async def upload(self, tenant: Tenant, object_name: str, file_data: Any,
*args, auth: Optional = None, content_type: str = None, metadata: dict = None,
......@@ -125,14 +158,15 @@ class AzureAioBlobStorage(BlobStorageBase):
:param timeout: int = 30, **kwargs
:param return: blob id
"""
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
container = tenant.bucket_name
cred = self._get_credentials()
account_url = self._build_url(storage_account)
blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
data = await blob_client.upload_blob(file_data, overwrite=True, metadata=metadata)
content_settings = ContentSettings(content_type=content_type) if content_type else None
data = await blob_client.upload_blob(file_data,
overwrite=True,
metadata=metadata,
content_settings=content_settings)
return data
......@@ -30,9 +30,27 @@ async def test_tenant():
async def test_downloading_successfully_uploaded_blob(az_client: AzureAioBlobStorage, test_tenant, input_data, expected):
blob_name = 'testing_data/' + str(uuid.uuid4())
await az_client.upload(test_tenant, blob_name, input_data)
assert await az_client.download(test_tenant, blob_name) == expected
@pytest.mark.asyncio
async def test_download_metadata(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
input_data = b'expected content 123456789'
await az_client.upload(test_tenant,
blob_name,
input_data,
metadata={'customMetaKey': 'customMetaValue'},
content_type='application/x-parquet')
blob_prop = await az_client.download_metadata(test_tenant, blob_name)
assert blob_prop.identifier == blob_name
assert blob_prop.name == blob_name
assert blob_prop.content_type == 'application/x-parquet'
assert blob_prop.metadata['customMetaKey'] == 'customMetaValue'
@pytest.mark.asyncio
async def test_download_not_existing_blob_should_throw(az_client: AzureAioBlobStorage, test_tenant):
# here we just ensure it does not silently fail and throw something for now (to be updated when proper exceptions
......@@ -70,21 +88,29 @@ async def assert_not_implemented(coro):
@pytest.mark.asyncio
async def test_list_objects_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.list_objects(test_tenant)
)
async def test_list_objects(az_client: AzureAioBlobStorage, test_tenant):
r = await az_client.list_objects(test_tenant, prefix='testi')
print(r)
# await assert_not_implemented(
# az_client.list_objects(test_tenant, prefix='testi')
# )
@pytest.mark.asyncio
async def test_delete_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.delete(test_tenant, 'id')
)
async def test_delete_should(az_client: AzureAioBlobStorage, test_tenant):
blob_name = 'testing_data/' + str(uuid.uuid4())
input_data = b'expected content 123456789'
await az_client.upload(test_tenant, blob_name, input_data)
await az_client.download(test_tenant, blob_name)
await az_client.delete(test_tenant, blob_name)
with pytest.raises(exceptions.ResourceNotFoundError):
await az_client.download(test_tenant, blob_name)
@pytest.mark.asyncio
async def test_download_metadata_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
await assert_not_implemented(
az_client.download_metadata(test_tenant, 'id')
)
# @pytest.mark.asyncio
# async def test_download_metadata_should_throw_not_implemented(az_client: AzureAioBlobStorage, test_tenant):
# await assert_not_implemented(
# az_client.download_metadata(test_tenant, 'id')
# )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment