Commit 2029e90c authored by Yannick's avatar Yannick
Browse files

add dask parameters for azure

parent a95216c4
Pipeline #42713 passed with stage
in 42 seconds
__version__ = '1.0.1'
__version__ = '1.1.0'
......@@ -44,7 +44,8 @@ class AzureAioBlobStorage(BlobStorageBase):
def _build_url(self, storage_account: str):
    """Return the primary Blob service endpoint URL for *storage_account*."""
    endpoint = f'https://{storage_account}.blob.core.windows.net'
    return endpoint
def _get_credentials(self):
@staticmethod
def _get_credentials():
if AzureAioBlobStorage.Credentials is None:
AzureAioBlobStorage.Credentials = DefaultAzureCredential(
exclude_shared_token_cache_credential=True,
......
from typing import Optional
import adlfs
import fsspec
from dask.distributed import WorkerPlugin
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant
from .blob_storage_az import AzureAioBlobStorage
from ..partition.partition_service import PartitionService
class AzureBlobFileSystemWithDefaultCredentials(adlfs.AzureBlobFileSystem):
    """Wrap the azure file system to add credentials if not present.

    The Azure credential object cannot be serialized (pickle) to the
    workers so, it needs to be instantiated from the worker.
    """

    # kwargs keys that already carry authentication material. When any of
    # them is present we must NOT inject DefaultAzureCredential on top of
    # it (e.g. the Azurite tests authenticate with a connection string).
    _AUTH_KWARGS = ("credential", "account_key", "connection_string", "sas_token")

    def __init__(self, *args, **kwargs):
        # Fall back to the ambient Azure identity only when the caller
        # supplied no explicit authentication at all.
        has_credential = any(key in kwargs for key in self._AUTH_KWARGS)
        if not has_credential:
            kwargs["credential"] = AzureAioBlobStorage._get_credentials()
        super().__init__(*args, **kwargs)
class AzureWorkerPlugin(WorkerPlugin):
    """Worker plugin enabling custom code at stages of the workers' lifecycle.

    At startup, the azure blob storage implementation is wrapped so that
    storage access from the worker carries authentication.
    """

    def setup(self, worker):
        # Register the credential-aware filesystem for both Azure aliases.
        for protocol in ("abfs", "az"):
            fsspec.register_implementation(protocol, AzureBlobFileSystemWithDefaultCredentials)
        return super().setup(worker)
# Also register the credential-aware filesystem in the current (client)
# process — AzureWorkerPlugin only covers the dask worker processes.
fsspec.register_implementation("abfs", AzureBlobFileSystemWithDefaultCredentials)
fsspec.register_implementation("az", AzureBlobFileSystemWithDefaultCredentials)
async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
    """Build the dask storage parameters for an Azure-backed tenant.

    The storage account name is resolved through the partition service;
    the base directory is the tenant bucket, optionally extended with
    *directory*.
    """
    account_name = await PartitionService.get_storage_account_name(tenant.data_partition_id)
    if directory:
        base_directory = f'{tenant.bucket_name}/{directory}'
    else:
        base_directory = tenant.bucket_name
    return DaskStorageParameters(
        protocol='az',
        base_directory=base_directory,
        storage_options={'account_name': account_name},
        worker_plugin=(AzureWorkerPlugin, "AzureWorkerPlugin"),
    )
......@@ -2,3 +2,6 @@ azure-storage-blob==12.6.0 # fix version, 12.7.0 & 12.7.1 cause hangs in tests
azure-identity
azure-keyvault
cachetools
fsspec
adlfs
dask[distributed]==2021.4.1
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
osdu-core-lib-python~=1.0.0
osdu-core-lib-python==1.1.0.dev339911
......@@ -5,11 +5,15 @@ from os import environ
class Config:
    # Test configuration; every value can be overridden via the
    # environment, and the defaults target a local Azurite emulator.
    # Blob service endpoint URL (Azurite default account).
    storage_account = environ.get('TESTING_AZ_STORAGE_ACCOUNT', 'http://127.0.0.1:10000/devstoreaccount1')
    # Bare account name used when resolving storage via the partition service.
    storage_account_name = environ.get('STORAGE_ACCOUNT_NAME', 'devstoreaccount1')
    # Full connection string for fsspec/adlfs access to Azurite; the
    # account key below is Azurite's well-known development key, not a secret.
    storage_connection_string = environ.get(
        'STORAGE_CONNECTION_STRING',
        'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;')
    # Account key credential (same well-known Azurite dev key).
    credentials = environ.get('TESTING_AZ_CREDENTIALS', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
    # Container the tests read from and write to.
    container = environ.get('TESTING_AZ_CONTAINER', 'wdms-osdu')
# Point external-service settings at deliberately invalid names so the
# tests never reach a real Key Vault or partition service by accident.
environ.setdefault('KEYVAULT_URL', 'invalid-keyvault-name')
environ.setdefault('SERVICE_HOST_PARTITION', 'invalid-partition-service-name')
def pytest_configure(config):
blob_service_client = BlobServiceClient(account_url=Config.storage_account, credential=Config.credentials)
try:
......
import pytest
from mock import patch
import fsspec
import uuid
from osdu.core.api.storage.tenant import Tenant
from osdu_az.partition.partition_service import PartitionService
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from osdu_az.storage.dask_storage_parameters import get_dask_storage_parameters
from tests.conftest import Config
@pytest.fixture
def with_azurite_credentials() -> AzureAioBlobStorage:
    """Patch Azure auth and partition lookup to target the local Azurite."""
    creds_patch = patch.object(AzureAioBlobStorage, '_get_credentials', return_value=Config.credentials)
    account_patch = patch.object(PartitionService, 'get_storage_account_name', return_value=Config.storage_account_name)
    with creds_patch, account_patch:
        yield
@pytest.fixture
def test_tenant():
    """A tenant pointing at the Azurite test container."""
    return Tenant(
        project_id=Config.storage_account,
        bucket_name=Config.container,
        data_partition_id='local',
    )
@pytest.mark.asyncio
async def test_read_write(with_azurite_credentials, test_tenant):
    """Round-trip a small blob through the dask storage parameters.

    Writes a unique blob via fsspec, reads it back, and always removes
    it afterwards so repeated runs do not leak objects in the container.
    """
    parameters = await get_dask_storage_parameters(test_tenant)
    # need to use connection for fsspec with Azurite
    parameters.storage_options['connection_string'] = Config.storage_connection_string
    fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
    file_path = f'{Config.container}/{uuid.uuid4()}'
    try:
        with fs.open(file_path, 'w') as wfile:
            wfile.write('test content')
        with fs.open(file_path, 'r') as rfile:
            content = rfile.read()
    finally:
        # Clean up even when the write/read raises; guard with exists()
        # so a failed write does not mask the original error.
        if fs.exists(file_path):
            fs.rm(file_path)
    assert content == 'test content'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment