Commit d4acc9a7 authored by Jeremie Hallal's avatar Jeremie Hallal
Browse files

update injectors code

parent 2ea89a2e
Pipeline #46801 failed with stage
in 23 seconds
......@@ -57,12 +57,6 @@ def handle_pyarrow_exceptions(target):
return async_inner
class DaskBlobStorageBase(ABC):
@abstractmethod
async def build_dask_blob_storage(self, tenant):
raise NotImplementedError('DaskBlobStorageBase.build_dask_blob_storage')
class DefaultWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
global _LOGGER
......@@ -91,28 +85,28 @@ class DaskBulkStorage:
""" use `create` to create instance """
self._parameters = None
self._fs = None
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
instance._parameters = parameters
instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
# Initialise the dask client. Returns False if client was already initialized
if not DaskBulkStorage.client:
if cls.lock_client is None:
cls.lock_client = asyncio.Lock()
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or DaskDistributedClient(asynchronous=True, processes=True)
# Initialise the dask client.
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
await DaskBulkStorage.client.register_worker_plugin(
DefaultWorkerPlugin,
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
await DaskBulkStorage.client.register_worker_plugin(
DefaultWorkerPlugin,
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
get_logger().debug(f"dask client initialized : {DaskBulkStorage.client}")
return instance
......
......@@ -15,7 +15,7 @@
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu_aws.storage.storage_aws import AwsStorage
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from .app_injector import AppInjector, AppInjectorModule
from app.conf import Config
......@@ -23,7 +23,7 @@ from app.conf import Config
class AwsInjector(AppInjectorModule):
def configure(self, app_injector: AppInjector):
app_injector.register(BlobStorageBase, AwsInjector.build_aws_storage)
app_injector.register(DaskBlobStorageBase, AwsInjector.build_aws_dask_blob_storage)
app_injector.register(DaskBulkStorage, AwsInjector.build_aws_dask_blob_storage)
@staticmethod
async def build_aws_storage() -> BlobStorageBase:
......@@ -33,5 +33,5 @@ class AwsInjector(AppInjectorModule):
)
@staticmethod
async def build_aws_dask_blob_storage() -> DaskBlobStorageBase:
async def build_aws_dask_blob_storage() -> DaskBulkStorage:
raise NotImplementedError()
......@@ -12,25 +12,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.dask_bulk_storage import (DaskBulkStorage)
from app.utils import Context
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu_az.storage.blob_storage_az import AzureAioBlobStorage
from .app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from osdu_az.storage.dask_storage_parameters import \
get_dask_storage_parameters as az_parameters
# Below import should be pull out to dedicated Azure package osdu.core.api.storage
from app.bulk_persistence.dask.azure import DaskBlobStorageAzure
from .app_injector import AppInjector, AppInjectorModule
class AzureInjector(AppInjectorModule):
def configure(self, app_injector: AppInjector):
app_injector.register(BlobStorageBase, AzureInjector.build_az_blob_storage)
app_injector.register(DaskBlobStorageBase, AzureInjector.build_dask_az_blob_storage)
app_injector.register(DaskBulkStorage, AzureInjector.build_dask_az_blob_storage)
@staticmethod
async def build_az_blob_storage() -> BlobStorageBase:
return AzureAioBlobStorage()
@staticmethod
async def build_dask_az_blob_storage() -> DaskBlobStorageBase:
return DaskBlobStorageAzure()
async def build_dask_az_blob_storage() -> DaskBulkStorage:
ctx: Context = Context.current()
tenant = await resolve_tenant(ctx.partition_id)
params = await az_parameters(tenant)
return await DaskBulkStorage.create(params)
......@@ -19,14 +19,14 @@ from app.utils import get_http_client_session
from .app_injector import AppInjector, AppInjectorModule
from app.utils import Context
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from app.bulk_persistence.dask.google import DaskBlobStorageGoogle
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from osdu_gcp.storage.dask_storage_parameters import get_dask_storage_parameters as gcp_parameters
class GCPInjector(AppInjectorModule):
def configure(self, app_injector: AppInjector):
app_injector.register(BlobStorageBase, GCPInjector.build_gcp_blob_storage)
app_injector.register(DaskBlobStorageBase, GCPInjector.build_dask_gcp_blob_storage)
app_injector.register(DaskBulkStorage, GCPInjector.build_dask_gcp_blob_storage)
@staticmethod
async def build_gcp_blob_storage(*args, **kwargs) -> BlobStorageBase:
......@@ -39,6 +39,9 @@ class GCPInjector(AppInjectorModule):
)
@staticmethod
async def build_dask_gcp_blob_storage() -> DaskBlobStorageBase:
return DaskBlobStorageGoogle()
async def build_dask_gcp_blob_storage() -> DaskBulkStorage:
ctx: Context = Context.current()
tenant = await resolve_tenant(ctx.partition_id)
params = await gcp_parameters(tenant)
return await DaskBulkStorage.create(params)
......@@ -5,13 +5,13 @@ from app.utils import get_http_client_session
from app.utils import Context
from .app_injector import AppInjector, AppInjectorModule
from app.bulk_persistence import resolve_tenant
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
class IBMInjector(AppInjectorModule):
def configure(self, app_injector: AppInjector):
app_injector.register(BlobStorageBase, IBMInjector.build_ibm_blob_storage)
app_injector.register(DaskBlobStorageBase, IBMInjector.build_ibm_dask_blob_storage)
app_injector.register(DaskBulkStorage, IBMInjector.build_ibm_dask_blob_storage)
@staticmethod
async def build_ibm_blob_storage(*args, **kwargs) -> BlobStorageBase:
......@@ -24,5 +24,5 @@ class IBMInjector(AppInjectorModule):
)
@staticmethod
async def build_ibm_dask_blob_storage() -> DaskBlobStorageBase:
async def build_ibm_dask_blob_storage() -> DaskBulkStorage:
raise NotImplementedError()
......@@ -31,8 +31,7 @@ from app.clients.search_service_client import SearchServiceClient
from app.clients import make_search_client, make_storage_record_client
from app.persistence.sessions_storage import SessionsStorage
from app.bulk_persistence.dask.blob_storage import (DaskBlobStorageBase,
DaskBlobStorageLocal)
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage, make_local_dask_bulk_storage
class MainInjector(AppInjectorModule):
......@@ -100,10 +99,10 @@ class MainInjector(AppInjectorModule):
logger.warning(f'overriding blob storage to use local fs on path ' + blob_storage_localfs)
app_injector.register(BlobStorageBase, _blob_storage_builder)
async def _dask_blob_storage_builder():
return DaskBlobStorageLocal(base_directory=blob_storage_localfs)
async def _dask_blob_storage_builder() -> DaskBulkStorage:
return await make_local_dask_bulk_storage(base_directory=blob_storage_localfs)
app_injector.register(DaskBlobStorageBase, _dask_blob_storage_builder)
app_injector.register(DaskBulkStorage, _dask_blob_storage_builder)
logger.warning(f'overriding DASK blob storage to use local fs on path ' + blob_storage_localfs)
......
......@@ -32,6 +32,6 @@ osdu-data-ecosystem-search>=0.3.2, <0.4
osdu-core-lib-python-ibm>=0.0.1, <0.1
osdu-core-lib-python-gcp==1.1.0.dev355075
osdu-core-lib-python-azure==1.1.0.dev355774
osdu-core-lib-python-azure==1.1.0.dev396021
osdu-core-lib-python-aws>=0.0.1, <0.1
osdu-core-lib-python==1.1.0.dev339911
osdu-core-lib-python==1.1.0.dev395936
......@@ -10,7 +10,7 @@ import pytest
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from app.bulk_persistence.dask.dask_bulk_storage import (DaskBlobStorageBase, DaskBlobStorageLocal)
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage, make_local_dask_bulk_storage
from app.clients import StorageRecordServiceClient
from app.persistence.sessions_storage import SessionsStorage, SessionState
......@@ -121,10 +121,10 @@ def setup_client(nope_logger_fixture, bob):
async def sessions_storage_builder(*args, **kwargs):
return SessionsStorage(local_blob_storage)
async def dask_blob_storage_builder():
return DaskBlobStorageLocal(base_directory=tmp_dir)
async def dask_blob_storage_builder() -> DaskBulkStorage:
return await make_local_dask_bulk_storage(base_directory=tmp_dir)
app_injector.register(DaskBlobStorageBase, dask_blob_storage_builder)
app_injector.register(DaskBulkStorage, dask_blob_storage_builder)
app_injector.register(BlobStorageBase, blob_storage_builder)
app_injector.register(SessionsStorage, sessions_storage_builder)
app_injector.register(StorageRecordServiceClient, storage_service_builder)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment