Commit 36ef6233 authored by Cyril Monmouton

Merge branch 'feature/add-v3-vulk-tracing' into 'master'

Add v3 bulk tracing

See merge request osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services!153
parents c8df76b4 e4995b98
Pipeline #50853 passed with stages in 11 minutes and 8 seconds
......@@ -26,6 +26,7 @@ The following software have components provided under the terms of this license:
- msgpack (from http://msgpack.org/)
- multidict (from https://github.com/aio-libs/multidict/)
- numpy (from http://www.numpy.org)
- openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
- opencensus (from https://github.com/census-instrumentation/opencensus-python)
- opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
- opencensus-ext-azure (from )
......@@ -95,6 +96,7 @@ The following software have components provided under the terms of this license:
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- oauthlib (from https://github.com/idan/oauthlib)
- openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
......
......@@ -18,26 +18,27 @@ import json
import time
from contextlib import suppress
from functools import wraps
from logging import getLogger
from operator import attrgetter
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
import fsspec
import pandas as pd
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
from app.persistence.sessions_storage import Session
from app.utils import capture_timings, get_wdms_temp_dir
from pyarrow.lib import ArrowException
import dask
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient, WorkerPlugin
from app.utils import capture_timings, get_wdms_temp_dir, get_ctx
dask.config.set({'temporary_directory': get_wdms_temp_dir()})
......@@ -55,9 +56,12 @@ def handle_pyarrow_exceptions(target):
class DefaultWorkerPlugin(WorkerPlugin):
def __init__(self, logger=None, register_fsspec_implementation=None) -> None:
self.worker = None
global _LOGGER
_LOGGER = logger
self._register_fsspec_implementation = register_fsspec_implementation
get_logger().debug("WorkerPlugin initialised")
super().__init__()
......@@ -69,8 +73,8 @@ class DefaultWorkerPlugin(WorkerPlugin):
def transition(self, key, start, finish, *args, **kwargs):
if finish == 'error':
exc = self.worker.exceptions[key]
getLogger().exception("Task '%s' has failed with exception: %s" % (key, str(exc)))
# exc = self.worker.exceptions[key]
get_logger().exception(f"Task '{key}' has failed with exception")
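For context (not part of this diff): a WorkerPlugin such as DefaultWorkerPlugin is registered once on the client, and its setup/transition hooks then run inside every Dask worker. A minimal sketch of that pattern, with illustrative names only:

from dask.distributed import Client, WorkerPlugin

class FailureLoggingPlugin(WorkerPlugin):
    """Minimal sketch: remember the worker and report failed task transitions."""
    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            print(f"Task '{key}' failed on worker {self.worker.address}")

async def register_plugin(client: Client):
    # Roughly mirrors how DaskBulkStorage.create registers its plugin (the full
    # call site is not visible in this diff, only its keyword arguments).
    await client.register_worker_plugin(FailureLoggingPlugin(), name="FailureLoggingPlugin")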
class DaskBulkStorage:
......@@ -84,7 +88,7 @@ class DaskBulkStorage:
""" use `create` to create instance """
self._parameters = None
self._fs = None
@classmethod
async def create(cls, parameters: DaskStorageParameters, dask_client=None) -> 'DaskBulkStorage':
instance = cls()
......@@ -94,7 +98,7 @@ class DaskBulkStorage:
async with DaskBulkStorage.lock_client:
if not DaskBulkStorage.client:
DaskBulkStorage.client = dask_client or await DaskDistributedClient(asynchronous=True, processes=True)
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
......@@ -103,8 +107,8 @@ class DaskBulkStorage:
name="LoggerWorkerPlugin",
logger=get_logger(),
register_fsspec_implementation=parameters.register_fsspec_implementation)
get_logger().debug(f"dask client initialized : {DaskBulkStorage.client}")
get_logger().info(f"Distributed Dask client initialized : {DaskBulkStorage.client}")
instance._fs = fsspec.filesystem(parameters.protocol, **parameters.storage_options)
return instance
......@@ -146,9 +150,10 @@ class DaskBulkStorage:
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
"""
get_logger().debug(f"loading bulk : {path}")
return self.client.submit(dd.read_parquet, path, engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
......@@ -156,7 +161,24 @@ class DaskBulkStorage:
"""
return self._load(self._get_blob_path(record_id, bulk_id))
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
Submit the given target_func to distributed Dask workers, forwarding the current tracing span context.
"""
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.submit(wrap_trace_process, *args, **kwargs)
def _map_with_trace(self, target_func, *args, **kwargs):
"""
Map the given target_func over the given arguments on distributed Dask workers, forwarding the current tracing span context.
"""
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.map(wrap_trace_process, *args, **kwargs)
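Illustration (not in the diff) of what a call such as self._submit_with_trace(dd.read_parquet, path, engine='pyarrow-dataset') expands to:

# Equivalent expansion of _submit_with_trace(dd.read_parquet, path, engine='pyarrow-dataset'):
future = self.client.submit(
    wrap_trace_process,                            # executed on a Dask worker
    path,                                          # positional args forwarded to target_func
    engine='pyarrow-dataset',                      # extra kwargs forwarded to target_func
    target_func=dd.read_parquet,                   # popped by wrap_trace_process before the real call
    span_context=get_ctx().tracer.span_context,    # parent span of the current request
)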
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version."""
try:
......@@ -172,17 +194,19 @@ class DaskBulkStorage:
we should be able to change or support other formats easily.
schema={} instead of 'infer' fixes wrong type inference for string columns whose first values are NaN
"""
return self.client.submit(dd.to_parquet, ddf, path, schema={}, engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(dd.to_parquet, ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
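The schema={} remark above can be illustrated with a small standalone example (illustrative only; the output path is made up):

import pandas as pd
import dask.dataframe as dd

# A string column whose first partition contains only NaN values:
pdf = pd.DataFrame({"MD": range(6), "label": [None, None, None, "a", "b", "c"]})
ddf = dd.from_pandas(pdf, npartitions=2)

# With schema='infer' the Arrow type may be guessed from the first (all-NaN) partition,
# which can break the write or lose the string type; schema={} leaves the decision to
# pyarrow across all partitions, as the docstring above explains.
ddf.to_parquet("/tmp/bulk_example", engine="pyarrow", schema={})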
def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
returns a Future<None>
"""
return self.client.submit(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _check_incoming_chunk(self, df):
# TODO should we test if is_monotonic?, unique ?
......@@ -289,13 +313,12 @@ class DaskBulkStorage:
if not dfs:
raise BulkNotProcessable("No data to commit")
dfs = self.client.map(set_index, dfs)
dfs = self._map_with_trace(set_index, dfs)
while len(dfs) > 1:
dfs = [self.client.submit(do_merge, a, b) for a, b in by_pairs(dfs)]
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
return await self.save_blob(dfs[0], record_id=session.recordId)
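For reference, the loop above performs a pairwise (tree) reduction over the chunk futures; a standalone sketch of the pattern, independent of the project's by_pairs/do_merge helpers, which are not shown in this diff:

def reduce_pairwise(client, merge, futures):
    """Merge futures two by two until one remains: ~N-1 merges in ~log2(N) parallel levels."""
    while len(futures) > 1:
        merged = [client.submit(merge, a, b)
                  for a, b in zip(futures[0::2], futures[1::2])]
        if len(futures) % 2:          # odd future out: carry it to the next level
            merged.append(futures[-1])
        futures = merged
    return futures[0]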
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
params = DaskStorageParameters(protocol='file',
......
from opencensus.trace.span import SpanKind
from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from app.helper.traces import create_exporter
from app.conf import Config
_EXPORTER = None
def wrap_trace_process(*args, **kwargs):
"""Run target_func on a Dask worker inside a tracing span whose parent is the propagated span_context."""
global _EXPORTER
target_func = kwargs.pop('target_func')
span_context = kwargs.pop('span_context')
if not span_context or not target_func:
raise AttributeError("Keyword arguments should contain 'target_func' and 'span_context'")
if _EXPORTER is None:
_EXPORTER = create_exporter(service_name=Config.service_name.value)
tracer = open_tracer.Tracer(span_context=span_context,
sampler=AlwaysOnSampler(),
exporter=_EXPORTER)
with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
span.span_kind = SpanKind.CLIENT
return target_func(*args, **kwargs)
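The key point is that an opencensus SpanContext is a small, serializable object, so it can travel to the worker inside the submit kwargs and seed a child tracer there. A self-contained sketch of that re-hydration step, assuming PrintExporter as a stand-in for the exporter returned by create_exporter:

from opencensus.trace import tracer as open_tracer
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.print_exporter import PrintExporter
from opencensus.trace.span_context import SpanContext

parent_context = SpanContext()            # normally shipped from the client process
tracer = open_tracer.Tracer(span_context=parent_context,
                            sampler=AlwaysOnSampler(),
                            exporter=PrintExporter())
with tracer.span(name="Dask Worker - read_parquet") as span:
    pass  # worker-side work runs here, attached to the caller's trace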
......@@ -39,7 +39,7 @@ from app.routers.common_parameters import (
json_orient_parameter)
from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
WithSessionStorages, get_session_dependencies)
from app.helper.traces import with_trace
router_bulk = APIRouter() # router dedicated to bulk APIs
......@@ -54,6 +54,7 @@ def _check_df_columns_type(df: pd.DataFrame):
detail='All column types should be string')
@with_trace("get_df_from_request")
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
......@@ -128,6 +129,7 @@ class DataFrameRender:
return list(selected)
@staticmethod
@with_trace('process_params')
async def process_params(df, params: GetDataParams):
if params.curves:
selection = list(map(str.strip, params.curves.split(',')))
......@@ -147,6 +149,7 @@ class DataFrameRender:
return df
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None):
if params.describe:
return {
......@@ -211,6 +214,7 @@ async def post_data(record_id: str,
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
@with_trace("save_blob")
async def save_blob():
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
......@@ -282,8 +286,10 @@ async def get_data_version(
bulk_urn = get_bulk_uri_osdu(record)
if bulk_urn is not None:
bulk_id, prefix = BulkId.bulk_urn_decode(bulk_urn)
else: # fallback on ddms_v2 Persistence for wks:log schema
else:
# fallback on ddms_v2 Persistence for wks:log schema
bulk_id, prefix = LogBulkHelper.get_bulk_id(record, None)
try:
if bulk_id is None:
raise BulkNotFound(record_id=record_id, bulk_id=None)
......@@ -293,6 +299,7 @@ async def get_data_version(
df = await get_dataframe(ctx, bulk_id)
else:
raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
df = await DataFrameRender.process_params(df, ctrl_p)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
except BulkError as ex:
......
......@@ -26,6 +26,7 @@ from app import conf
from tests.unit.persistence.dask_blob_storage_test import generate_df
from tests.unit.test_utils import nope_logger_fixture
Definitions = {
'WellLog': {
'api_version': 'v3',
......@@ -118,12 +119,12 @@ def _cast_datetime_to_datetime64_ns(result_df):
for name, col in result_df.items():
if name.startswith('date'):
result_df[name] = result_df[name].astype('datetime64[ns]')
return result_df
@pytest.fixture
def bob(nope_logger_fixture, monkeypatch):
def init_fixtures(nope_logger_fixture, monkeypatch):
with TemporaryDirectory() as tmp_dir:
monkeypatch.setenv(name='USE_LOCALFS_BLOB_STORAGE_WITH_PATH', value=tmp_dir)
conf.Config = conf.ConfigurationContainer.with_load_all()
......@@ -131,7 +132,7 @@ def bob(nope_logger_fixture, monkeypatch):
@pytest.fixture
def setup_client(nope_logger_fixture, bob):
def setup_client(init_fixtures):
from app.wdms_app import wdms_app, enable_alpha_feature
from app.wdms_app import app_injector
......@@ -245,11 +246,11 @@ def test_send_all_data_once(setup_client,
['MD', 'float_X', 'str_X', 'date_X']
])
def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
entity_type,
columns,
content_type_header,
create_func,
accept_content):
entity_type,
columns,
content_type_header,
create_func,
accept_content):
client, tmp_dir = setup_client
record_id = _create_record(client, entity_type)
chunking_url = Definitions[entity_type]['chunking_url']
......@@ -302,8 +303,8 @@ def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
# BELOW test cases FAIL with UPDATE mode:
# => If adding new column Date/String not starting at first index AND override an existing column
#['MD', 'date_X'],
#['MD', 'float_X', 'str_X', 'date_X'],
# ['MD', 'date_X'],
# ['MD', 'float_X', 'str_X', 'date_X'],
])
@pytest.mark.parametrize("session_mode", [
'overwrite',
......@@ -479,11 +480,11 @@ def test_add_curve_by_chunk_overlap_different_cols(setup_client, entity_type):
chunking_url = Definitions[entity_type]['chunking_url']
_create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['MD', 'A'], range(5, 10)),
(['B'], range(8)), # overlap left side
(['C'], range(8, 15)), # overlap left side
(['D'], range(6, 8)), # within
(['E'], range(15)), # overlap both side
])
(['B'], range(8)), # overlap left side
(['C'], range(8, 15)), # overlap left side
(['D'], range(6, 8)), # within
(['E'], range(15)), # overlap both side
])
data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'Accept': 'application/json'})
assert data_response.status_code == 200
......@@ -617,7 +618,7 @@ def test_creates_two_sessions_one_record_with_chunks_different_format(setup_clie
_create_chunks(client, entity_type, record_id=record_id, data_format='json', cols_ranges=[(['X'], range(5, 20))])
_create_chunks(client, entity_type, record_id=record_id, data_format='parquet', cols_ranges=[(['Y'], range(5, 20)),
(['Z'], range(5, 20))])
(['Z'], range(5, 20))])
data_response = client.get(f'{chunking_url}/{record_id}/data')
assert data_response.status_code == 200
df = _create_df_from_response(data_response)
......@@ -633,7 +634,7 @@ def test_creates_two_sessions_two_record_with_chunks(setup_client, entity_type):
_create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['X'], range(5, 20))])
_create_chunks(client, entity_type, record_id=another_record_id, cols_ranges=[(['Y'], range(0, 10)),
(['Z'], range(5, 10))])
(['Z'], range(5, 10))])
data_response = client.get(f'{chunking_url}/{record_id}/data')
assert data_response.status_code == 200
df = _create_df_from_response(data_response)
......
......@@ -13,11 +13,14 @@
# limitations under the License.
import pytest
from tempfile import TemporaryDirectory
from odes_storage.models import Record, StorageAcl, Legal
import mock
import asyncio
from tempfile import TemporaryDirectory
from opencensus.trace.span_context import SpanContext
from contextlib import contextmanager
from odes_storage.models import Record, StorageAcl, Legal
from app.model.model_utils import record_to_dict
from app.utils import get_or_create_ctx
......@@ -29,10 +32,13 @@ def from_env(key, default=None):
return result
@pytest.fixture()
def ctx_fixture():
""" Create context with a fake tracer in it """
ctx = get_or_create_ctx().set_current_with_value(tracer=mock.MagicMock(), logger=NopeLogger())
mock_tracer = mock.MagicMock()
mock_tracer.span_context = SpanContext(trace_id="trace-id", span_id="span_id")
ctx = get_or_create_ctx().set_current_with_value(tracer=mock_tracer, logger=NopeLogger())
yield ctx
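A hypothetical test using this fixture, only to show why a real SpanContext is now needed (the test name and assertion are illustrative, not part of this diff):

def test_ctx_tracer_exposes_span_context(ctx_fixture):
    # Code under test forwards ctx.tracer.span_context to Dask workers
    # (see DaskBulkStorage._submit_with_trace), so the fake tracer must carry a real one.
    assert isinstance(ctx_fixture.tracer.span_context, SpanContext)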
......