Commit 7b89d07c authored by Yannick

try sas token auth

parent feb18cbb
Pipeline #45033 passed with stage in 42 seconds
@@ -229,3 +229,233 @@ class AzureAioBlobStorage(BlobStorageBase):
size=upload_response.get('size', -1),
etag=upload_response.get('etag', None),
provider_specific=upload_response)
### with SAS token instead
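# Experimental variant of AzureAioBlobStorage: it authenticates requests with an
# account-level SAS token generated from the partition's storage account key,
# and falls back to DefaultAzureCredential when the partition exposes no key.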
from cachetools.func import ttl_cache
from azure.storage.blob import generate_account_sas, ResourceTypes, AccountSasPermissions
from osdu_az.partition.partition_service import STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY
from datetime import datetime, timedelta
class AzureAioBlobStorageAuthWithSAS(BlobStorageBase):
ExceptionMapping = {
AzureExceptions.ClientAuthenticationError: AuthenticationException,
AzureExceptions.ResourceNotFoundError: ResourceNotFoundException,
AzureExceptions.ResourceExistsError: ResourceExistsException,
AzureExceptions.ResourceModifiedError: PreconditionFailedException,
AzureExceptions.ResourceNotModifiedError: PreconditionFailedException,
}
    # cache for 1 hour: the ttl must stay below the 2-hour token expiry set
    # below, so an already-expired token is never served from the cache
    @ttl_cache(maxsize=256, ttl=3600)
    def _generate_sas_token_cached(self, account_name, account_key):
_LOGGER.info("generating sas token")
return generate_account_sas(
account_name,
account_key=account_key,
resource_types=ResourceTypes(object=True, container=True, service=True),
permission=AccountSasPermissions(read=True, write=True, delete=True,
list=True, add=True, create=True, update=True,
process=True, delete_previous_version=True),
expiry=datetime.utcnow() + timedelta(hours=2)
)
async def _generate_sas_token(self, data_partition_id: str) -> Optional[str]:
partition_info = await PartitionService.get_partition(data_partition_id)
account_name = await partition_info.get_value(STORAGE_ACCOUNT_NAME)
account_key = await partition_info.get_value(STORAGE_ACCOUNT_KEY)
if not account_key:
return None
return self._generate_sas_token_cached(account_name, account_key)
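    # Note: cachetools keys the cache on all arguments including `self`, so the
    # token is only reused while the same storage instance is reused; distinct
    # instances pointing at the same account each generate their own token.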
def _build_url(self, storage_account: str):
return f'https://{storage_account}.blob.core.windows.net'
    async def _get_credentials(self, data_partition_id: Optional[str]):
        # try a SAS token first
if data_partition_id:
sas_token = await self._generate_sas_token(data_partition_id)
if sas_token:
return sas_token
        # fall back to the AAD credential cached on AzureAioBlobStorage (shared with that class)
        if AzureAioBlobStorage.Credentials is None:
            _LOGGER.info("Acquire new Credentials using DefaultAzureCredential")
            AzureAioBlobStorage.Credentials = DefaultAzureCredential(
                exclude_shared_token_cache_credential=True,
                exclude_visual_studio_code_credential=True,
                exclude_environment_credential=True)
        else:
            _LOGGER.info("Using cached Credentials")
return AzureAioBlobStorage.Credentials
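    # BlobServiceClient accepts either a SAS token string or an azure-identity
    # TokenCredential object as `credential`, so both branches above plug into
    # _get_blob_service_client unchanged.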
async def _get_blob_service_client(self, tenant):
cred = await self._get_credentials(tenant.data_partition_id)
storage_account = await self._get_storage_account_name(tenant.data_partition_id)
account_url = self._build_url(storage_account)
return BlobServiceClient(account_url=account_url, credential=cred)
@classmethod
async def close_credentials(cls):
pass
async def _get_storage_account_name(self, data_partition_id: str):
return await PartitionService.get_storage_account_name(data_partition_id)
@with_blobstorage_exception(ExceptionMapping)
async def list_objects(self, tenant: Tenant,
*args, auth: Optional = None, prefix: str = '', page_token: Optional[str] = None,
max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
"""
        list all objects within a container
:param auth: auth obj to perform the operation
:param tenant: tenant info
:param prefix: Filter results to objects whose names begin with this prefix
:param page_token: UNUSED
:param max_result: Maximum number of items to return.
        :param timeout: timeout in seconds passed to each service request
:param kwargs: UNUSED
:return: list of blob names
"""
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
result = []
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
async for prop in container_client.list_blobs(name_starts_with=prefix, timeout=timeout):
result.append(prop.name)
if max_result and len(result) >= max_result:
break
return result
@with_blobstorage_exception(ExceptionMapping)
async def delete(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs):
"""
delete an object
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout: UNUSED
:param kwargs: UNUSED
:return:
"""
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
await container_client.delete_blob(object_name)
@with_blobstorage_exception(ExceptionMapping)
async def download(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
"""
download blob data
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout: UNUSED
:param kwargs: UNUSED
:return:
"""
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
data = await blob_client.download_blob()
return await data.readall()
# not for now, parquet only
@with_blobstorage_exception(ExceptionMapping)
async def download_metadata(self, tenant: Tenant, object_name: str,
*args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
"""
        download blob metadata (properties)
:param auth: UNUSED
:param tenant: tenant info
:param object_name:
:param timeout: UNUSED
:param kwargs: UNUSED
:return: blob
"""
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
properties = await blob_client.get_blob_properties()
            content_settings = properties.get('content_settings')
            content_type = content_settings.get('content_type') if content_settings else None
return Blob(identifier=object_name,
bucket=container,
name=properties.get('name', object_name),
metadata=properties.get('metadata', {}),
acl=properties.get('acl', None),
content_type=content_type,
time_created=str(properties.get('creation_time', '')),
time_updated=str(properties.get('last_modified', '')),
size=properties.get('size', -1),
etag=str(properties.etag),
provider_specific=properties
)
@with_blobstorage_exception(ExceptionMapping)
async def upload(self, tenant: Tenant, object_name: str, file_data: Any, *,
overwrite: bool = True,
if_match=None,
if_not_match=None,
auth: Optional = None, content_type: str = None, metadata: dict = None,
timeout: int = 30, **kwargs) -> Blob:
"""
        upload blob data; by default an existing blob is overwritten
        :param tenant: tenant info
        :param object_name: maps to file name
        :param file_data: Any
        :param overwrite: if False, fail if the object already exists; if True, replace it (default=True)
        :param if_match: (ETag value) the update fails if the blob to overwrite doesn't match the ETag
            provided. Cannot be used with `if_not_match`. The ETag can be obtained via `download_metadata`
            or from the response of an upload.
        :param if_not_match: (ETag value) the update fails if the blob to overwrite matches the ETag
            provided. Cannot be used with `if_match`. The ETag can be obtained via `download_metadata`
            or from the response of an upload.
:param auth: Optional = None,
:param content_type: str = None,
:param metadata: dict = None,
:param timeout: UNUSED
:return: blob
"""
        assert not (if_match and if_not_match), "if_match and if_not_match cannot be set simultaneously"
        conditions = {}
        if if_match or if_not_match:
            # if_match -> succeed only if the blob is unmodified (ETag still matches);
            # if_not_match -> succeed only if the blob has been modified (ETag differs)
            conditions['etag'] = if_match or if_not_match
            conditions['match_condition'] = MatchConditions.IfNotModified if if_match else MatchConditions.IfModified
container = tenant.bucket_name
blob_service_client = await self._get_blob_service_client(tenant)
async with blob_service_client:
container_client = blob_service_client.get_container_client(container)
blob_client = container_client.get_blob_client(object_name)
content_settings = ContentSettings(content_type=content_type) if content_type else None
upload_response = await blob_client.upload_blob(file_data,
overwrite=overwrite,
metadata=metadata,
content_settings=content_settings,
**conditions)
return Blob(identifier=object_name,
bucket=container,
name=upload_response.get('name', object_name),
metadata=upload_response.get('metadata', {}),
acl=upload_response.get('acl', None),
content_type=content_type,
time_created=str(upload_response.get('date', '')),
time_updated=str(upload_response.get('last_modified', '')),
size=upload_response.get('size', -1),
etag=upload_response.get('etag', None),
provider_specific=upload_response)
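# Usage sketch (illustrative only): a conditional overwrite with the SAS-backed
# client. How `Tenant` is constructed is assumed/hypothetical here; the methods
# above only read `tenant.bucket_name` and `tenant.data_partition_id`.
#
# async def _demo(tenant: Tenant):
#     storage = AzureAioBlobStorageAuthWithSAS()
#     blob = await storage.upload(tenant, 'r1.json', b'{"a": 1}',
#                                 content_type='application/json')
#     # overwrite only if unchanged since our upload (ETag precondition)
#     await storage.upload(tenant, 'r1.json', b'{"a": 2}', if_match=blob.etag)
#     assert await storage.download(tenant, 'r1.json') == b'{"a": 2}'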