Commit 403cdca5 authored by Cyril Monmouton's avatar Cyril Monmouton

DaskBulkStorage-Tests/Enh: update mock fixture to make test pass

parent 367409b4
Pipeline #50565 failed with stages in 46 seconds
@@ -20,24 +20,26 @@ from contextlib import suppress
from functools import wraps
from logging import getLogger
from operator import attrgetter
import time  # needed by _test_method_2 below
import fsspec
import pandas as pd
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings, get_wdms_temp_dir, get_ctx
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
@@ -55,9 +57,12 @@ def handle_pyarrow_exceptions(target):
class DefaultWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
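# The plugin is instantiated inside every Dask worker process; stashing the
# logger in a module-level global makes it available to code that later runs
# on that worker.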
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
get_logger().debug("WorkerPlugin initialised")
super().__init__()
@@ -69,7 +74,8 @@ class DefaultWorkerPlugin(WorkerPlugin):
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
get_logger().exception(f"Task '{key}' has failed with exception")
class DaskBulkStorage:
@@ -83,7 +89,7 @@ class DaskBulkStorage:
""" use `create` to create instance """
self._parameters = None
self._fs = None
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
@@ -93,7 +99,7 @@
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
@@ -102,7 +108,7 @@
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
@@ -145,7 +151,11 @@ class DaskBulkStorage:
**kwargs: dict (of dicts) Passthrough keyword arguments for the read backend.
"""
get_logger().debug(f"loading bulk : {path}")
return self.client.submit(wrap_trace_process,
dd.read_parquet,
get_ctx().tracer.span_context,
path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
@@ -155,6 +165,15 @@
"""
return self._load(self._get_blob_path(record_id, bulk_id))
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
Submit the given target_func to the distributed Dask workers, propagating the current tracing span context.
"""
return self.client.submit(wrap_trace_process,
target_func,
get_ctx().tracer.span_context,
*args, **kwargs)
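# Example (hypothetical) usage, equivalent to the explicit submit in _load above:
#
#     future = self._submit_with_trace(dd.read_parquet, path,
#                                      engine='pyarrow-dataset',
#                                      storage_options=self._parameters.storage_options)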
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
@@ -171,7 +190,11 @@
Note:
it should be easy to switch to, or additionally support, other formats.
"""
return self.client.submit(wrap_trace_process,
dd.to_parquet,
get_ctx().tracer.span_context,
ddf, path,
schema="infer",
engine='pyarrow',
storage_options=self._parameters.storage_options)
@@ -180,7 +203,12 @@
pdf: pd.DataFrame or Future<pd.DataFrame>
returns a Future<None>
"""
return self.client.submit(wrap_trace_process,
pdf.to_parquet,
get_ctx().tracer.span_context,
path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
@@ -292,10 +320,30 @@ class DaskBulkStorage:
dfs = self.client.map(set_index, dfs)
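# Tree reduction: merge the session dataframes pairwise on the workers until a
# single dataframe remains.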
while len(dfs) > 1:
dfs = [self.client.submit(wrap_trace_process,
do_merge,
get_ctx().tracer.span_context,
a, b)
for a, b in by_pairs(dfs)]
return await self.save_blob(dfs[0], record_id=session.recordId)
def _test_method_2(self, future):
time.sleep(2)
result = future.result()
return result + 42
async def test_method_in_dask(self) -> int:
fut1 = self._submit_with_trace(self._test_method_2, 0)
fut2 = self._submit_with_trace(self._test_method_2, fut1)
responses = await fut2
return responses
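# Note: fut1 is passed directly into the second submit, so Dask resolves it on
# the workers and the intermediate result never round-trips through the client.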
# return await self._submit_with_trace(dd.to_parquet, None, "42",
# schema="infer",
# engine='pyarrow',
# storage_options=self._parameters.storage_options)
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
......
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.helper.traces import create_exporter
from app.conf import Config
def wrap_trace_process(target_func, span_context, *args, **kwargs):
if not span_context:
raise AttributeError("span_content cannot be null")
tracer = open_tracer.Tracer(span_context=span_context,
sampler=AlwaysOnSampler(),
exporter=create_exporter(service_name=Config.service_name.value))
with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
span.span_kind = SpanKind.CLIENT
return target_func(*args, **kwargs)
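# Example (illustrative) call, matching how DaskBulkStorage submits work:
#
#     span_context = get_ctx().tracer.span_context
#     future = client.submit(wrap_trace_process, do_merge, span_context, df_a, df_b)
#
# A fresh Tracer is rebuilt on the worker from span_context, so the task is
# reported as a child span of the request that submitted it.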
@@ -326,6 +326,33 @@ async def get_data(
return await get_data_version(record_id, None, request, ctrl_p, ctx, dask_blob_storage)
"""
#################################################################
TEST METHOD - TO BE REMOVED AFTER USE
#################################################################
"""
@router_bulk.post("/test")
async def bob(
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
before = True
response = await dask_blob_storage.test_method_in_dask()
after = True
return {'value': response}
"""
#################################################################
TEST METHOD - TO BE REMOVED AFTER USE
#################################################################
"""
@router_bulk.patch(
"/{record_id}/sessions/{session_id}",
summary='Update a session, either commit or abandon.',
......
@@ -13,11 +13,14 @@
# limitations under the License.
import pytest
import mock
import asyncio
from tempfile import TemporaryDirectory
from contextlib import contextmanager
from opencensus.trace.span_context import SpanContext
from odes_storage.models import Record, StorageAcl, Legal
from app.model.model_utils import record_to_dict
from app.utils import get_or_create_ctx
@@ -29,10 +32,13 @@ def from_env(key, default=None):
return result
@pytest.fixture()
def ctx_fixture():
""" Create context with a fake tracer in it """
mock_mock = mock.MagicMock()
mock_mock.span_context = SpanContext(trace_id="trace-id", span_id="span_id")
ctx = get_or_create_ctx().set_current_with_value(tracer=mock_mock, logger=NopeLogger())
yield ctx
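# Any test taking this fixture gets a context whose tracer carries a real
# SpanContext, so code under test can forward ctx.tracer.span_context to
# wrap_trace_process instead of leaking a MagicMock into the Dask workers.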
......