Commit a3d76059 authored by Jeremie Hallal

Merge branch 'dask_parameters' into 'master'

add dask parameters for azure

See merge request !13
parents f4172ec7 6054beb5
Pipeline #47181 failed in 1 minute and 51 seconds
-__version__ = '1.0.2'
+__version__ = '1.1.0'
...
@@ -47,7 +47,8 @@ class AzureAioBlobStorage(BlobStorageBase):
     def _build_url(self, storage_account: str):
         return f'https://{storage_account}.blob.core.windows.net'

-    def _get_credentials(self):
+    @staticmethod
+    def _get_credentials():
         if AzureAioBlobStorage.Credentials is None:
             _LOGGER.info(f"Acquire new Credentials using DefaultAzureCredential")
             AzureAioBlobStorage.Credentials = DefaultAzureCredential(
......
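The hunk above is cut off at the DefaultAzureCredential call. Its intent, turning _get_credentials into a staticmethod over a class-level credential cache so that callers without an AzureAioBlobStorage instance (notably the fsspec wrapper added below) can reuse the same credential, can be sketched as follows. This is a minimal reconstruction, assuming the credential comes from azure.identity.aio and ignoring the constructor arguments and base class used by the real module:

from azure.identity.aio import DefaultAzureCredential

class AzureAioBlobStorage:  # base class (BlobStorageBase) and unrelated members omitted in this sketch
    Credentials = None  # class-level cache shared by every instance and by static callers

    @staticmethod
    def _get_credentials():
        # Build the credential once and reuse it; as a staticmethod it can be
        # called without constructing a blob storage client (e.g. from a Dask worker).
        if AzureAioBlobStorage.Credentials is None:
            AzureAioBlobStorage.Credentials = DefaultAzureCredential()
        return AzureAioBlobStorage.Credentials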
from typing import Optional

import adlfs
import fsspec
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from osdu.core.api.storage.tenant import Tenant

from .blob_storage_az import AzureAioBlobStorage
from ..partition.partition_service import PartitionService


class AzureBlobFileSystemWithDefaultCredentials(adlfs.AzureBlobFileSystem):
    """Wrap the Azure file system to add credentials if they are not present.

    The Azure credential object cannot be serialized (pickled) to the workers,
    so it needs to be instantiated from the worker.
    """

    def __init__(self, *args, **kwargs):
        has_credential = (
            "credential" in kwargs or "account_key" in kwargs or "connection_string" in kwargs
        )
        if not has_credential:
            kwargs["credential"] = AzureAioBlobStorage._get_credentials()
        super().__init__(*args, **kwargs)


def register_azure_fsspec():
    fsspec.register_implementation("abfs", AzureBlobFileSystemWithDefaultCredentials)
    fsspec.register_implementation("az", AzureBlobFileSystemWithDefaultCredentials)


async def get_dask_storage_parameters(tenant: Tenant, directory: Optional[str] = None) -> DaskStorageParameters:
    storage_account_name = await PartitionService.get_storage_account_name(tenant.data_partition_id)
    storage_options = {'account_name': storage_account_name}
    base_directory = f'{tenant.bucket_name}/{directory}' if directory else tenant.bucket_name
    return DaskStorageParameters(protocol='az',
                                 base_directory=base_directory,
                                 storage_options=storage_options,
                                 register_fsspec_implementation=register_azure_fsspec)
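For orientation, here is a condensed usage sketch of the parameters returned above, mirroring the read/write test further down. The write_example name, the 'records' directory, and the assumption that DaskStorageParameters exposes the register_fsspec_implementation callable as an attribute are illustrative, not part of this merge request:

import fsspec
from osdu.core.api.storage.tenant import Tenant

async def write_example(tenant: Tenant):
    # Resolve the storage account for the tenant's partition and build the parameters.
    params = await get_dask_storage_parameters(tenant, directory='records')
    # Register the credential-aware filesystem so 'az://' paths resolve to it
    # (assumes DaskStorageParameters simply stores the callable it was given).
    params.register_fsspec_implementation()
    fs = fsspec.filesystem(params.protocol, **params.storage_options)
    with fs.open(f'{params.base_directory}/example.txt', 'w') as out:
        out.write('hello')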
...
@@ -2,3 +2,5 @@ azure-storage-blob==12.6.0 # fix version, 12.7.0 & 12.7.1 cause hangs in tests
 azure-identity
 azure-keyvault
 cachetools
+fsspec
+adlfs
# osdu core lib main python
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
-osdu-core-lib-python~=1.0.0
+osdu-core-lib-python~=1.1.0
...
@@ -5,11 +5,15 @@ from os import environ
class Config:
    storage_account = environ.get('TESTING_AZ_STORAGE_ACCOUNT', 'http://127.0.0.1:10000/devstoreaccount1')
    storage_account_name = environ.get('STORAGE_ACCOUNT_NAME', 'devstoreaccount1')
    storage_connection_string = environ.get(
        'STORAGE_CONNECTION_STRING',
        'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;')
    credentials = environ.get('TESTING_AZ_CREDENTIALS', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
    container = environ.get('TESTING_AZ_CONTAINER', 'wdms-osdu')


environ.setdefault('KEYVAULT_URL', 'invalid-keyvault-name')
environ.setdefault('SERVICE_HOST_PARTITION', 'invalid-partition-service-name')


def pytest_configure(config):
    blob_service_client = BlobServiceClient(account_url=Config.storage_account, credential=Config.credentials)
    try:
......
import pytest
from mock import patch
import fsspec
import uuid

from osdu.core.api.storage.tenant import Tenant
from osdu_az.partition.partition_service import PartitionService
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from osdu_az.storage.dask_storage_parameters import get_dask_storage_parameters
from tests.conftest import Config


@pytest.fixture
def with_azurite_credentials() -> AzureAioBlobStorage:
    with patch.object(AzureAioBlobStorage, '_get_credentials', return_value=Config.credentials):
        with patch.object(PartitionService, 'get_storage_account_name', return_value=Config.storage_account_name):
            yield


@pytest.fixture
def test_tenant():
    return Tenant(project_id=Config.storage_account, bucket_name=Config.container, data_partition_id='local')


@pytest.mark.asyncio
async def test_read_write(with_azurite_credentials, test_tenant):
    parameters = await get_dask_storage_parameters(test_tenant)
    # need to use a connection string for fsspec with Azurite
    parameters.storage_options['connection_string'] = Config.storage_connection_string

    fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
    file_path = f'{Config.container}/{uuid.uuid4()}'
    with fs.open(file_path, 'w') as wfile:
        wfile.write('test content')

    with fs.open(file_path, 'r') as rfile:
        content = rfile.read()

    fs.rm(file_path)
    assert content == 'test content'