diff --git a/NOTICE b/NOTICE
index 9c01355ccb2aa49b6dd8e88834fda75a890f45df..cba63077a25cf65fc09068d214d85ea797766b7d 100644
--- a/NOTICE
+++ b/NOTICE
@@ -26,6 +26,7 @@ The following software have components provided under the terms of this license:
 - msgpack (from http://msgpack.org/)
 - multidict (from https://github.com/aio-libs/multidict/)
 - numpy (from http://www.numpy.org)
+- openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
 - opencensus (from https://github.com/census-instrumentation/opencensus-python)
 - opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
 - opencensus-ext-azure (from )
@@ -95,6 +96,7 @@ The following software have components provided under the terms of this license:
 - mock (from https://github.com/testing-cabal/mock)
 - numpy (from http://www.numpy.org)
 - oauthlib (from https://github.com/idan/oauthlib)
+- openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
 - packaging (from https://github.com/pypa/packaging)
 - pandas (from http://pandas.pydata.org)
 - partd (from http://github.com/dask/partd/)
diff --git a/app/bulk_persistence/dask/dask_bulk_storage.py b/app/bulk_persistence/dask/dask_bulk_storage.py
index 41100250d550e4109d3e2fb394105bfd82b14e8d..09eb9d1514c8f610aff11508cee12ee3d1970375 100644
--- a/app/bulk_persistence/dask/dask_bulk_storage.py
+++ b/app/bulk_persistence/dask/dask_bulk_storage.py
@@ -18,26 +18,27 @@ import json
 import time
 from contextlib import suppress
 from functools import wraps
-from logging import getLogger
 from operator import attrgetter
-from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
-
 import fsspec
 import pandas as pd
+from pyarrow.lib import ArrowException
+import dask
+import dask.dataframe as dd
+from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
+
+from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
+
 from app.bulk_persistence import BulkId
+from app.bulk_persistence.dask.traces import wrap_trace_process
 from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
 from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
                                              do_merge, set_index,
                                              worker_capture_timing_handlers)
+
 from app.helper.logger import get_logger
 from app.helper.traces import with_trace
 from app.persistence.sessions_storage import Session
-from app.utils import capture_timings, get_wdms_temp_dir
-from pyarrow.lib import ArrowException
-
-import dask
-import dask.dataframe as dd
-from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
+from app.utils import capture_timings, get_wdms_temp_dir, get_ctx
 
 dask.config.set({'temporary_directory': get_wdms_temp_dir()})
 
@@ -55,9 +56,12 @@ def handle_pyarrow_exceptions(target):
 
 
 class DefaultWorkerPlugin(WorkerPlugin):
+
     def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
+        self.worker = None
         global _LOGGER
         _LOGGER = logger
+        self._register_fsspec_implementation = register_fsspec_implementation
 
         get_logger().debug("WorkerPlugin initialised")
         super().__init__()
@@ -69,8 +73,8 @@ class DefaultWorkerPlugin(WorkerPlugin):
 
     def transition(self, key, start, finish, *args, **kwargs):
         if finish == 'error':
-            exc = self.worker.exceptions[key]
-            getLogger().exception("Task '%s' has failed with exception: %s" % (key, str(exc)))
+            # exc = self.worker.exceptions[key]
+            get_logger().exception(f"Task '{key}' has failed with exception")
 
 
 class DaskBulkStorage:
@@ -84,7 +88,7 @@ class DaskBulkStorage:
         """ use `create` to create instance """
         self._parameters = None
         self._fs = None
-
+
     @classmethod
     async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
         instance = cls()
@@ -94,7 +98,7 @@ class DaskBulkStorage:
         async with DaskBulkStorage.lock_client:
             if not DaskBulkStorage.client:
                 DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
-
+
             if parameters.register_fsspec_implementation:
                 parameters.register_fsspec_implementation()
 
@@ -103,8 +107,8 @@ class DaskBulkStorage:
                 name="LoggerWorkerPlugin",
                 logger=get_logger(),
                 register_fsspec_implementation=parameters.register_fsspec_implementation)
-
-        get_logger().debug(f"dask client initialized : {DaskBulkStorage.client}")
+
+        get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
         instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
         return instance
 
@@ -146,9 +150,10 @@ class DaskBulkStorage:
         **kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
         """
         get_logger().debug(f"loading bulk : {path}")
-        return self.client.submit(dd.read_parquet, path, engine='pyarrow-dataset',
-                                  storage_options=self._parameters.storage_options,
-                                  **kwargs)
+        return self._submit_with_trace(dd.read_parquet, path,
+                                       engine='pyarrow-dataset',
+                                       storage_options=self._parameters.storage_options,
+                                       **kwargs)
 
     def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
         """Return a dask Dataframe of a record at the specified version.
@@ -156,7 +161,24 @@ class DaskBulkStorage:
         """
         return self._load(self._get_blob_path(record_id, bulk_id))
 
+    def _submit_with_trace(self, target_func, *args, **kwargs):
+        """
+        Submit given target_func to Distributed Dask workers and add tracing required stuff
+        """
+        kwargs['span_context'] = get_ctx().tracer.span_context
+        kwargs['target_func'] = target_func
+        return self.client.submit(wrap_trace_process, *args, **kwargs)
+
+    def _map_with_trace(self, target_func, *args, **kwargs):
+        """
+        Submit given target_func to Distributed Dask workers and add tracing required stuff
+        """
+        kwargs['span_context'] = get_ctx().tracer.span_context
+        kwargs['target_func'] = target_func
+        return self.client.map(wrap_trace_process, *args, **kwargs)
+
     @capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
+    @with_trace('load_bulk')
     async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
         """Return a dask Dataframe of a record at the specified version."""
         try:
@@ -172,17 +194,19 @@ class DaskBulkStorage:
         we should be able to change or support other format easily ?
         schema={} instead of 'infer' fixes wrong inference for columns of type string starting with nan values
         """
-        return self.client.submit(dd.to_parquet, ddf, path, schema={}, engine='pyarrow',
-                                  storage_options=self._parameters.storage_options)
+        return self._submit_with_trace(dd.to_parquet, ddf, path,
+                                       schema={},
+                                       engine='pyarrow',
+                                       storage_options=self._parameters.storage_options)
 
     def _save_with_pandas(self, path, pdf: dd.DataFrame):
         """Save the dataframe to a parquet file(s).
         pdf: pd.DataFrame or Future
         returns a Future
         """
-        return self.client.submit(pdf.to_parquet, path,
-                                  engine='pyarrow',
-                                  storage_options=self._parameters.storage_options)
+        return self._submit_with_trace(pdf.to_parquet, path,
+                                       engine='pyarrow',
+                                       storage_options=self._parameters.storage_options)
 
     def _check_incoming_chunk(self, df):
         # TODO should we test if is_monotonic?, unique ?
@@ -289,13 +313,12 @@ class DaskBulkStorage:
         if not dfs:
             raise BulkNotProcessable("No data to commit")
 
-        dfs = self.client.map(set_index, dfs)
-
+        dfs = self._map_with_trace(set_index, dfs)
         while len(dfs) > 1:
-            dfs = [self.client.submit(do_merge, a, b) for a, b in by_pairs(dfs)]
+            dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
 
         return await self.save_blob(dfs[0], record_id=session.recordId)
-
+
 
 async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
     params = DaskStorageParameters(protocol='file',
diff --git a/app/bulk_persistence/dask/traces.py b/app/bulk_persistence/dask/traces.py
new file mode 100644
index 0000000000000000000000000000000000000000..c7dbaba090c477bd03e59f5f4681fa2b33623900
--- /dev/null
+++ b/app/bulk_persistence/dask/traces.py
@@ -0,0 +1,28 @@
+from opencensus.trace.span import SpanKind
+from opencensus.trace import tracer as open_tracer
+from opencensus.trace.samplers import AlwaysOnSampler
+
+from app.helper.traces import create_exporter
+from app.conf import Config
+
+_EXPORTER = None
+
+
+def wrap_trace_process(*args, **kwargs):
+    global _EXPORTER
+
+    target_func = kwargs.pop('target_func')
+    span_context = kwargs.pop('span_context')
+    if not span_context or not target_func:
+        raise AttributeError("Keyword arguments should contain 'target_func' and 'span_context'")
+
+    if _EXPORTER is None:
+        _EXPORTER = create_exporter(service_name=Config.service_name.value)
+
+    tracer = open_tracer.Tracer(span_context=span_context,
+                                sampler=AlwaysOnSampler(),
+                                exporter=_EXPORTER)
+
+    with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
+        span.span_kind = SpanKind.CLIENT
+        return target_func(*args, **kwargs)
diff --git a/app/bulk_utils.py b/app/bulk_utils.py
index 9c8a3640f9125a1daaa6a312d15cbdf8cd11b215..670a07480abe39069e554b00ea8a8f28b940f565 100644
--- a/app/bulk_utils.py
+++ b/app/bulk_utils.py
@@ -39,7 +39,7 @@ from app.routers.common_parameters import (
     json_orient_parameter)
 from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
                                   WithSessionStorages, get_session_dependencies)
-
+from app.helper.traces import with_trace
 
 
 router_bulk = APIRouter()  # router dedicated to bulk APIs
@@ -54,6 +54,7 @@ def _check_df_columns_type(df: pd.DataFrame):
                             detail=f'All columns type should be string')
 
 
+@with_trace("get_df_from_request")
 async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
     """ extract dataframe from request """
 
@@ -128,6 +129,7 @@ class DataFrameRender:
         return list(selected)
 
     @staticmethod
+    @with_trace('process_params')
     async def process_params(df, params: GetDataParams):
         if params.curves:
             selection = list(map(str.strip, params.curves.split(',')))
@@ -147,6 +149,7 @@ class DataFrameRender:
         return df
 
     @staticmethod
+    @with_trace('df_render')
     async def df_render(df, params: GetDataParams, accept: str = None):
         if params.describe:
             return {
@@ -211,6 +214,7 @@ async def post_data(record_id: str,
                     ctx: Context = Depends(get_ctx),
                     dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
                     ):
+    @with_trace("save_blob")
     async def save_blob():
         df = await get_df_from_request(request, orient)
         _check_df_columns_type(df)
@@ -282,8 +286,10 @@ async def get_data_version(
     bulk_urn = get_bulk_uri_osdu(record)
     if bulk_urn is not None:
         bulk_id, prefix = BulkId.bulk_urn_decode(bulk_urn)
-    else:  # fallback on ddms_v2 Persistence for wks:log schema
+    else:
+        # fallback on ddms_v2 Persistence for wks:log schema
         bulk_id, prefix = LogBulkHelper.get_bulk_id(record, None)
+
     try:
         if bulk_id is None:
             raise BulkNotFound(record_id=record_id, bulk_id=None)
@@ -293,6 +299,7 @@ async def get_data_version(
             df = await get_dataframe(ctx, bulk_id)
         else:
             raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
+
         df = await DataFrameRender.process_params(df, ctrl_p)
         return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
     except BulkError as ex:
diff --git a/tests/unit/routers/chunking_test.py b/tests/unit/routers/chunking_test.py
index 0d8886d0eca0625f022aa1629b666f5e36ae9788..6551178bbfa2d81d8ca8dd00c32362df9d31a868 100644
--- a/tests/unit/routers/chunking_test.py
+++ b/tests/unit/routers/chunking_test.py
@@ -26,6 +26,7 @@ from app import conf
 from tests.unit.persistence.dask_blob_storage_test import generate_df
 from tests.unit.test_utils import nope_logger_fixture
 
+
 Definitions = {
     'WellLog': {
         'api_version': 'v3',
@@ -118,12 +119,12 @@ def _cast_datetime_to_datetime64_ns(result_df):
     for name, col in result_df.items():
         if name.startswith('date'):
            result_df[name] = result_df[name].astype('datetime64[ns]')
-
+
    return result_df
 
 
 @pytest.fixture
-def bob(nope_logger_fixture, monkeypatch):
+def init_fixtures(nope_logger_fixture, monkeypatch):
     with TemporaryDirectory() as tmp_dir:
         monkeypatch.setenv(name='USE_LOCALFS_BLOB_STORAGE_WITH_PATH', value=tmp_dir)
         conf.Config = conf.ConfigurationContainer.with_load_all()
@@ -131,7 +132,7 @@ def bob(nope_logger_fixture, monkeypatch):
 
 
 @pytest.fixture
-def setup_client(nope_logger_fixture, bob):
+def setup_client(init_fixtures):
     from app.wdms_app import wdms_app, enable_alpha_feature
     from app.wdms_app import app_injector
 
@@ -245,11 +246,11 @@ def test_send_all_data_once(setup_client,
     ['MD', 'float_X', 'str_X', 'date_X']
 ])
 def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
-                                                      entity_type,
-                                                      columns,
-                                                      content_type_header,
-                                                      create_func,
-                                                      accept_content):
+                                                     entity_type,
+                                                     columns,
+                                                     content_type_header,
+                                                     create_func,
+                                                     accept_content):
     client, tmp_dir = setup_client
     record_id = _create_record(client, entity_type)
     chunking_url = Definitions[entity_type]['chunking_url']
@@ -302,8 +303,8 @@ def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
 
     # BELOW test cases FAIL with UPDATE mode:
     # => If adding new column Date/String not starting at first index AND override an existing column
-    #['MD', 'date_X'],
-    #['MD', 'float_X', 'str_X', 'date_X'],
+    # ['MD', 'date_X'],
+    # ['MD', 'float_X', 'str_X', 'date_X'],
 ])
 @pytest.mark.parametrize("session_mode", [
     'overwrite',
@@ -479,11 +480,11 @@ def test_add_curve_by_chunk_overlap_different_cols(setup_client, entity_type):
 
     chunking_url = Definitions[entity_type]['chunking_url']
     _create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['MD', 'A'], range(5, 10)),
-                                                                          (['B'], range(8)),  # overlap left side
-                                                                          (['C'], range(8, 15)),  # overlap left side
-                                                                          (['D'], range(6, 8)),  # within
-                                                                          (['E'], range(15)),  # overlap both side
-                                                                          ])
+                                                                             (['B'], range(8)),  # overlap left side
+                                                                             (['C'], range(8, 15)),  # overlap left side
+                                                                             (['D'], range(6, 8)),  # within
+                                                                             (['E'], range(15)),  # overlap both side
+                                                                             ])
     data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'Accept': 'application/json'})
 
     assert data_response.status_code == 200
@@ -617,7 +618,7 @@ def test_creates_two_sessions_one_record_with_chunks_different_format(setup_clie
     _create_chunks(client, entity_type, record_id=record_id, data_format='json', cols_ranges=[(['X'], range(5, 20))])
     _create_chunks(client, entity_type, record_id=record_id,
                    data_format='parquet', cols_ranges=[(['Y'], range(5, 20)),
-                                                       (['Z'], range(5, 20))])
+                                                        (['Z'], range(5, 20))])
     data_response = client.get(f'{chunking_url}/{record_id}/data')
     assert data_response.status_code == 200
     df = _create_df_from_response(data_response)
@@ -633,7 +634,7 @@ def test_creates_two_sessions_two_record_with_chunks(setup_client, entity_type):
 
     _create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['X'], range(5, 20))])
     _create_chunks(client, entity_type, record_id=another_record_id, cols_ranges=[(['Y'], range(0, 10)),
-                                                                                  (['Z'], range(5, 10))])
+                                                                                   (['Z'], range(5, 10))])
     data_response = client.get(f'{chunking_url}/{record_id}/data')
     assert data_response.status_code == 200
     df = _create_df_from_response(data_response)
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 059943fdd9876e5e65f20c029d7611a238a4a5ee..962a8840810542bc888f0f8c939883e2bd46cf96 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -13,11 +13,14 @@
 # limitations under the License.
 
 import pytest
-from tempfile import TemporaryDirectory
-from odes_storage.models import Record, StorageAcl, Legal
 import mock
 import asyncio
+from tempfile import TemporaryDirectory
+from opencensus.trace.span_context import SpanContext
 from contextlib import contextmanager
+
+from odes_storage.models import Record, StorageAcl, Legal
+
 from app.model.model_utils import record_to_dict
 from app.utils import get_or_create_ctx
 
@@ -29,10 +32,13 @@ def from_env(key, default=None):
     return result
 
 
+
 @pytest.fixture()
 def ctx_fixture():
     """ Create context with a fake tracer in it """
-    ctx = get_or_create_ctx().set_current_with_value(tracer=mock.MagicMock(), logger=NopeLogger())
+    mock_mock = mock.MagicMock()
+    mock_mock.span_context = SpanContext(trace_id="trace-id", span_id="span_id")
+    ctx = get_or_create_ctx().set_current_with_value(tracer=mock_mock, logger=NopeLogger())
     yield ctx
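
Note on the tracing hand-off introduced above: `wrap_trace_process` (app/bulk_persistence/dask/traces.py) rebuilds an opencensus `Tracer` on the worker from the `span_context` that `DaskBulkStorage._submit_with_trace` / `_map_with_trace` attach to each task, so work executed on Dask workers is reported as child spans of the API request that submitted it. The sketch below is illustrative only, not part of the change, and makes two assumptions: it swaps in opencensus' `PrintExporter` for the exporter built by `app.helper.traces.create_exporter`, and it uses a throwaway local `dask.distributed.Client` instead of the shared client owned by `DaskBulkStorage`.

    # Illustrative sketch only: PrintExporter and the in-process Client() below are
    # stand-ins; the call contract mirrors wrap_trace_process / _submit_with_trace.
    from dask.distributed import Client
    from opencensus.trace import tracer as open_tracer
    from opencensus.trace.print_exporter import PrintExporter
    from opencensus.trace.samplers import AlwaysOnSampler
    from opencensus.trace.span import SpanKind


    def wrap_trace_process(*args, **kwargs):
        # Same contract as app/bulk_persistence/dask/traces.py: the caller passes the
        # parent span context and the real callable through keyword arguments.
        target_func = kwargs.pop('target_func')
        span_context = kwargs.pop('span_context')
        tracer = open_tracer.Tracer(span_context=span_context,
                                    sampler=AlwaysOnSampler(),
                                    exporter=PrintExporter())
        with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
            span.span_kind = SpanKind.CLIENT
            return target_func(*args, **kwargs)


    def double(value):
        return value * 2


    if __name__ == '__main__':
        client = Client(processes=False)  # throwaway in-process cluster, threads only
        parent = open_tracer.Tracer(sampler=AlwaysOnSampler(), exporter=PrintExporter())
        # Mirrors _submit_with_trace: the parent span context rides along as a kwarg,
        # so the span opened on the worker becomes a child of the submitting request.
        future = client.submit(wrap_trace_process, 21,
                               target_func=double,
                               span_context=parent.span_context)
        print(future.result())  # 42
        client.close()

In the service the same hand-off runs through `create_exporter(service_name=Config.service_name.value)`, cached per worker in the module-level `_EXPORTER`.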