Commit b4f1808a authored by Cyril Monmouton

Dask-Tracing/Enh: add support for Dask.map()

parent db1fcd1c
Pipeline #50660 passed with stages in 12 minutes and 21 seconds
@@ -18,7 +18,6 @@ import json
import time
from contextlib import suppress
from functools import wraps
from logging import getLogger
from operator import attrgetter
import fsspec
import pandas as pd
@@ -151,13 +150,10 @@ class DaskBulkStorage:
**kwargs: dict (of dicts) Passthrough keyword arguments for the read backend.
"""
get_logger().debug(f"loading bulk : {path}")
return self.client.submit(wrap_trace_process,
dd.read_parquet,
get_ctx().tracer.span_context,
path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
@@ -169,10 +165,17 @@
"""
Submit the given target_func to distributed Dask workers, attaching the tracing context it needs.
"""
return self.client.submit(wrap_trace_process,
target_func,
get_ctx().tracer.span_context,
*args, **kwargs)
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.submit(wrap_trace_process, *args, **kwargs)
def _map_with_trace(self, target_func, *args, **kwargs):
"""
Map the given target_func across distributed Dask workers, attaching the tracing context it needs.
"""
kwargs['span_context'] = get_ctx().tracer.span_context
kwargs['target_func'] = target_func
return self.client.map(wrap_trace_process, *args, **kwargs)
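For readers outside the codebase, here is a minimal, self-contained sketch of the pattern both helpers rely on: the real function and the span context travel as keyword arguments, and a generic wrapper pops them back out before calling the function on the worker. The names wrap_process and double below are illustrative stand-ins, not project code.

from dask.distributed import Client

def wrap_process(*args, **kwargs):
    # Pop the bookkeeping kwargs, then call the real function with what remains.
    target_func = kwargs.pop('target_func')
    span_context = kwargs.pop('span_context')  # tracing context; unused in this sketch
    return target_func(*args, **kwargs)

def double(x, offset=0):
    return 2 * x + offset

if __name__ == '__main__':
    client = Client(processes=False)  # small in-process cluster, just for the demo
    fut = client.submit(wrap_process, 3, offset=1,
                        target_func=double, span_context='ctx')
    futs = client.map(wrap_process, [1, 2, 3],
                      target_func=double, span_context='ctx')
    print(fut.result(), client.gather(futs))  # 7 [2, 4, 6]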
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
@@ -191,27 +194,19 @@ class DaskBulkStorage:
we should be able to change to or support other formats easily.
schema={} instead of 'infer' fixes wrong type inference for string columns whose first values are NaN
"""
return self.client.submit(wrap_trace_process,
dd.to_parquet,
get_ctx().tracer.span_context,
ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(dd.to_parquet, ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
pdf: pd.DataFrame or Future<pd.DataFrame>
returns a Future<None>
"""
return self.client.submit(wrap_trace_process,
pdf.to_parquet,
get_ctx().tracer.span_context,
path,
span_context=get_ctx().tracer.span_context,
engine='pyarrow',
storage_options=self._parameters.storage_options)
return self._submit_with_trace(pdf.to_parquet, path,
engine='pyarrow',
storage_options=self._parameters.storage_options)
def _check_incoming_chunk(self, df):
# TODO: should we test if is_monotonic? unique?
@@ -318,14 +313,14 @@ class DaskBulkStorage:
if not dfs:
raise BulkNotProcessable("No data to commit")
dfs = self.client.map(set_index, dfs)
dfs = self.client.map(wrap_trace_process,
dfs,
target_func=set_index,
span_context=get_ctx().tracer.span_context,
)
while len(dfs) > 1:
dfs = [self.client.submit(wrap_trace_process,
do_merge,
get_ctx().tracer.span_context,
a, b)
for a, b in by_pairs(dfs)]
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
return await self.save_blob(dfs[0], record_id=session.recordId)
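The while loop above is a pairwise tree reduction: each pass merges the futures two by two until a single dataframe future remains. Below is a hedged, dataframe-free sketch of that control flow with local stand-ins for by_pairs and do_merge; the real helpers live elsewhere in the project and may differ.

from itertools import zip_longest

def by_pairs(items):
    # Group consecutive items two by two; an odd tail is paired with None.
    it = iter(items)
    return zip_longest(it, it)

def do_merge(a, b):
    # Stand-in for the dataframe merge; an unpaired element passes through untouched.
    return a if b is None else a + b

dfs = [1, 2, 3, 4, 5]
while len(dfs) > 1:
    dfs = [do_merge(a, b) for a, b in by_pairs(dfs)]
print(dfs[0])  # 15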
@@ -336,14 +331,15 @@ class DaskBulkStorage:
return result + 42
async def test_method_in_dask(self) -> dd.DataFrame:
fut1 = self._submit_with_trace(self._test_method_2, 0)
fut2 = self._submit_with_trace(self._test_method_2, fut1)
responses = await fut2
return responses
# return await self._submit_with_trace(dd.to_parquet, None, "42",
# schema="infer",
# engine='pyarrow',
# storage_options=self._parameters.storage_options)
# fut1 = self._submit_with_trace(self._test_method_2, 0)
# fut2 = self._submit_with_trace(self._test_method_2, fut1)
# responses = await fut2
# return responses
return await self._submit_with_trace(dd.to_parquet,
None, "42",
schema="infer",
engine='pyarrow',
storage_options=self._parameters.storage_options)
async def make_local_dask_bulk_storage(base_directory: str) -> DaskBulkStorage:
......
@@ -5,14 +5,23 @@ from opencensus.trace.samplers import AlwaysOnSampler
from app.helper.traces import create_exporter
from app.conf import Config
_EXPORTER = None
def wrap_trace_process(target_func, span_context, *args, **kwargs):
if not span_context:
raise AttributeError("span_content cannot be null")
def wrap_trace_process(*args, **kwargs):
global _EXPORTER
target_func = kwargs.pop('target_func')
span_context = kwargs.pop('span_context')
if not span_context or not target_func:
raise AttributeError("Keyword arguments should contain 'target_func' and 'span_context'")
if _EXPORTER is None:
_EXPORTER = create_exporter(service_name=Config.service_name.value)
tracer = open_tracer.Tracer(span_context=span_context,
sampler=AlwaysOnSampler(),
exporter=create_exporter(service_name=Config.service_name.value))
exporter=_EXPORTER)
with tracer.span(name=f"Dask Worker - {target_func.__name__}") as span:
span.span_kind = SpanKind.CLIENT
......
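The _EXPORTER change above caches the trace exporter at module level, so a worker process builds it once and reuses it for every subsequent task instead of calling create_exporter per span. A minimal sketch of that lazy-singleton pattern, with a generic factory standing in for create_exporter:

_EXPORTER = None

def get_exporter(factory):
    # Build the exporter on first use, then return the cached instance.
    global _EXPORTER
    if _EXPORTER is None:
        _EXPORTER = factory()
    return _EXPORTER

exporter = get_exporter(lambda: object())  # any expensive constructor benefits the same way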
@@ -26,7 +26,6 @@ from app import conf
import pandas as pd
from tests.unit.persistence.dask_blob_storage_test import generate_df
Definitions = {
'WellLog': {
'api_version': 'v3',
@@ -64,6 +63,7 @@ Definitions = {
EntityTypeParams = ['WellLog', 'WellboreTrajectory', 'Log']
def _create_df_from_response(response):
f = io.BytesIO(response.content)
f.seek(0)
@@ -118,12 +118,12 @@ def _cast_datetime_to_datetime64_ns(result_df):
for name, col in result_df.items():
if name.startswith('date'):
result_df[name] = result_df[name].astype('datetime64[ns]')
return result_df
@pytest.fixture
def bob(nope_logger_fixture, monkeypatch):
def init_fixtures(nope_logger_fixture, monkeypatch):
with TemporaryDirectory() as tmp_dir:
monkeypatch.setenv(name='USE_LOCALFS_BLOB_STORAGE_WITH_PATH', value=tmp_dir)
conf.Config = conf.ConfigurationContainer.with_load_all()
@@ -131,7 +131,7 @@ def bob(nope_logger_fixture, monkeypatch):
@pytest.fixture
def setup_client(nope_logger_fixture, bob):
def setup_client(init_fixtures):
from app.wdms_app import wdms_app, enable_alpha_feature
from app.wdms_app import app_injector
@@ -246,11 +246,11 @@ def test_send_all_data_once(setup_client,
['MD', 'float_X', 'str_X', 'date_X']
])
def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
entity_type,
columns,
content_type_header,
create_func,
accept_content):
entity_type,
columns,
content_type_header,
create_func,
accept_content):
client, tmp_dir = setup_client
record_id = _create_record(client, entity_type)
chunking_url = Definitions[entity_type]['chunking_url']
@@ -303,8 +303,8 @@ def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
# BELOW test cases FAIL with UPDATE mode:
# => If adding new column Date/String not starting at first index AND override an existing column
#['MD', 'date_X'],
#['MD', 'float_X', 'str_X', 'date_X'],
# ['MD', 'date_X'],
# ['MD', 'float_X', 'str_X', 'date_X'],
])
@pytest.mark.parametrize("session_mode", [
'overwrite',
@@ -480,11 +480,11 @@ def test_add_curve_by_chunk_overlap_different_cols(setup_client, entity_type):
chunking_url = Definitions[entity_type]['chunking_url']
_create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['MD', 'A'], range(5, 10)),
(['B'], range(8)), # overlap left side
(['C'], range(8, 15)), # overlap left side
(['D'], range(6, 8)), # within
(['E'], range(15)), # overlap both side
])
(['B'], range(8)), # overlap left side
(['C'], range(8, 15)), # overlap left side
(['D'], range(6, 8)), # within
(['E'], range(15)), # overlap both side
])
data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'Accept': 'application/json'})
assert data_response.status_code == 200
@@ -618,7 +618,7 @@ def test_creates_two_sessions_one_record_with_chunks_different_format(setup_clie
_create_chunks(client, entity_type, record_id=record_id, data_format='json', cols_ranges=[(['X'], range(5, 20))])
_create_chunks(client, entity_type, record_id=record_id, data_format='parquet', cols_ranges=[(['Y'], range(5, 20)),
(['Z'], range(5, 20))])
(['Z'], range(5, 20))])
data_response = client.get(f'{chunking_url}/{record_id}/data')
assert data_response.status_code == 200
df = _create_df_from_response(data_response)
@@ -634,7 +634,7 @@ def test_creates_two_sessions_two_record_with_chunks(setup_client, entity_type):
_create_chunks(client, entity_type, record_id=record_id, cols_ranges=[(['X'], range(5, 20))])
_create_chunks(client, entity_type, record_id=another_record_id, cols_ranges=[(['Y'], range(0, 10)),
(['Z'], range(5, 10))])
(['Z'], range(5, 10))])
data_response = client.get(f'{chunking_url}/{record_id}/data')
assert data_response.status_code == 200
df = _create_df_from_response(data_response)
......