Commit ff416784 authored by Yannick's avatar Yannick
Browse files

Merge branch 'dask_sas_token' into 'master'

Dask sas token

See merge request !24
parents d9cdd2d2 1055efe5
Pipeline #53062 passed with stage
in 1 minute and 58 seconds
__version__ = '1.1.2'
__version__ = '1.2.0'
from typing import Optional
from datetime import datetime, timedelta
import logging
import adlfs
import fsspec
from azure.storage.blob import generate_account_sas, ResourceTypes, AccountSasPermissions
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant
from .blob_storage_az import AzureAioBlobStorage
from ..partition.partition_service import PartitionService
from osdu_az.partition.partition_service import PartitionService, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY
SAS_TOKEN_DURATION_IN_SECONDS = 600 # 10 minutes
_LOGGER = logging.getLogger(__name__)
class AzureBlobFileSystemWithDefaultCredentials(adlfs.AzureBlobFileSystem):
......@@ -17,9 +24,13 @@ class AzureBlobFileSystemWithDefaultCredentials(adlfs.AzureBlobFileSystem):
def __init__(self, *args, **kwargs):
has_credential = (
"credential" in kwargs or "account_key" in kwargs or "connection_string" in kwargs
"credential" in kwargs
or "account_key" in kwargs
or "connection_string" in kwargs
or "sas_token" in kwargs
)
if not has_credential:
_LOGGER.debug('Dask set to use DefaultCredential')
kwargs["credential"] = AzureAioBlobStorage._get_credentials()
super().__init__(*args, **kwargs)
......@@ -30,9 +41,24 @@ def register_azure_fsspec():
async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
storage_account_name = await PartitionService.get_storage_account_name(tenant.data_partition_id)
partition_info = await PartitionService.get_partition(tenant.data_partition_id)
storage_account_name = partition_info.get_value(STORAGE_ACCOUNT_NAME)
storage_options = {'account_name': storage_account_name}
storage_account_key = partition_info.get_value(STORAGE_ACCOUNT_KEY)
if storage_account_key: # in some en case account_key may not be available
sas_token = generate_account_sas(
storage_account_name,
account_key=storage_account_key,
resource_types=ResourceTypes(object=True, container=True, service=False),
permission=AccountSasPermissions(read=True, write=True, delete=True,
list=True, add=True, create=True, update=True,
process=True, delete_previous_version=True),
expiry=datetime.utcnow() + timedelta(seconds=SAS_TOKEN_DURATION_IN_SECONDS)
)
storage_options['sas_token'] = sas_token
base_directory = f'{tenant.bucket_name}/{directory}' if directory else tenant.bucket_name
return DaskStorageParameters(protocol='az',
......
import base64
import uuid
import pytest
from mock import patch
import fsspec
import uuid
from osdu.core.api.storage.tenant import Tenant
from azure.identity.aio import DefaultAzureCredential
from osdu.core.api.storage.tenant import Tenant
from osdu_az.partition.partition_info import PartitionInfo
from osdu_az.partition.partition_service import PartitionService
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from osdu_az.storage.dask_storage_parameters import get_dask_storage_parameters
from osdu_az.storage.dask_storage_parameters import (get_dask_storage_parameters,
AzureBlobFileSystemWithDefaultCredentials)
from tests.conftest import Config
@pytest.fixture
def with_azurite_credentials() -> AzureAioBlobStorage:
partition_info = PartitionInfo(
{
"storage-account-key": {
"sensitive": False,
"value": base64.b64encode(b"storage-account-key")
},
"storage-account-name": {
"sensitive": False,
"value": "opendes-storage"
}
})
with patch.object(AzureAioBlobStorage, '_get_credentials', return_value=Config.credentials):
with patch.object(PartitionService, 'get_storage_account_name', return_value=Config.storage_account_name):
with patch.object(PartitionService, 'get_partition', return_value=partition_info):
yield
......@@ -43,3 +59,31 @@ async def test_read_write(with_azurite_credentials, test_tenant):
fs.rm(file_path)
assert content == 'test content'
@pytest.mark.asyncio
async def test_get_dask_parameter_no_account_key(test_tenant):
with patch.object(PartitionService, 'get_partition', return_value=PartitionInfo({
"storage-account-name": {
"sensitive": False, "value": "opendes-storage"
}
})):
# when
parameters = await get_dask_storage_parameters(test_tenant)
# then not 'sas_token' put in the storage option
assert 'sas_token' not in parameters.storage_options
# then use default credentials
azfs = AzureBlobFileSystemWithDefaultCredentials(**parameters.storage_options)
assert isinstance(azfs.credential, DefaultAzureCredential)
@pytest.mark.asyncio
async def test_get_dask_parameter_with_account_key(with_azurite_credentials, test_tenant):
# when
parameters = await get_dask_storage_parameters(test_tenant)
# then 'sas_token' put in the storage option
assert 'sas_token' in parameters.storage_options
azfs = AzureBlobFileSystemWithDefaultCredentials(**parameters.storage_options)
assert parameters.storage_options['sas_token'] in azfs.sas_token
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment