Commit 5f7c11e5 authored by Jeremie Hallal

Merge branch 'jhallal/dask_update_and_optimization' into 'master'

Jhallal/dask update and optimization

See merge request !204
parents dd5c8192 f677e90b
Pipeline #56848 passed with stages in 15 minutes and 5 seconds
......@@ -25,7 +25,7 @@ from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
                                             do_merge, set_index,
                                             do_merge,
                                             worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
......@@ -106,7 +106,7 @@ class DaskBulkStorage:
        dask_client = dask_client or await DaskClient.create()
        if DaskBulkStorage.client is not dask_client:  # executed only once per dask client
            DaskBulkStorage.client = dask_client
            if parameters.register_fsspec_implementation:
                parameters.register_fsspec_implementation()
......@@ -149,10 +149,17 @@ class DaskBulkStorage:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
read_parquet parameters:
chunksize='25M': if chunk are too small, we aggregate them until we reach chunksize
aggregate_files=True: because we are passing a list of path when commiting a session,
aggregate_files is needed when paths are different
"""
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
chunksize='25M',
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
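For context, a minimal standalone sketch (not code from this merge request; the bucket paths below are made up) of what the chunksize/aggregate_files pair does: small Parquet files are coalesced until each Dask partition holds roughly 25 MB, and aggregate_files=True lets rows from different files share a partition, which keeps the task graph small when a session has many tiny chunks.

    import dask.dataframe as dd

    # Hypothetical chunk files written during a session (names are illustrative only).
    session_files = [
        "gs://example-bucket/session/chunk_0001.parquet",
        "gs://example-bucket/session/chunk_0002.parquet",
        "gs://example-bucket/session/chunk_0003.parquet",
    ]

    # Small row-groups are aggregated until a partition reaches ~25 MB; with
    # aggregate_files=True, rows coming from different files may share a partition.
    ddf = dd.read_parquet(
        session_files,
        engine="pyarrow-dataset",
        chunksize="25M",
        aggregate_files=True,
    )
    print(ddf.npartitions)  # usually far fewer partitions than input files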
......@@ -303,14 +310,11 @@ class DaskBulkStorage:
    @internal_bulk_exceptions
    async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
        dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
        if from_bulk_id:
            dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
        if not dfs:
            raise BulkNotProcessable("No data to commit")
        if len(dfs) > 1:  # set_index is not needed if no merge operations are done
            dfs = self._map_with_trace(set_index, dfs)
        if from_bulk_id:
            dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
        while len(dfs) > 1:
            dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
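As a rough illustration of the pairwise reduction above, here is a pure-pandas sketch; by_pairs_local and do_merge_local are local stand-ins for the real by_pairs/do_merge helpers in app.bulk_persistence.dask.utils, which run through the Dask client.

    from itertools import zip_longest
    import pandas as pd

    def by_pairs_local(items):
        """Yield (a, b) pairs; the last item is paired with None when the count is odd."""
        it = iter(items)
        return zip_longest(it, it)

    def do_merge_local(df1, df2):
        """Combine two frames, letting df2 win on overlap (mirrors the combine_first path)."""
        if df2 is None:
            return df1
        return df2.combine_first(df1)

    frames = [pd.DataFrame({"A": [i]}, index=[i]) for i in range(5)]  # toy chunks
    while len(frames) > 1:  # log2(n) rounds of pairwise merges
        frames = [do_merge_local(a, b) for a, b in by_pairs_local(frames)]
    merged = frames[0]  # one frame holding all five rows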
......
......@@ -20,6 +20,8 @@ from logging import INFO
from app.helper.logger import get_logger
from app.utils import capture_timings
import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
    """log captured timing from the worker subprocess (no access to context)"""
......@@ -71,25 +73,25 @@ class SessionFileMeta:
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf): # TODO
def set_index(ddf: dd.DataFrame):
"""Set index of the dask dataFrame only if needed."""
if not ddf.known_divisions or '_idx' not in ddf:
if '_idx' not in ddf:
ddf['_idx'] = ddf.index # we need to create a temporary variable to set it as index
ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
return ddf.set_index('_idx', sorted=True)
if not ddf.known_divisions:
return ddf.set_index(ddf.index, sorted=True)
return ddf
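A short aside on why the known_divisions check matters, using toy data on the local scheduler (not code from this merge request): a frame whose divisions are unknown is re-indexed with sorted=True, which records divisions without a shuffle when the index is already ordered.

    import pandas as pd
    import dask.dataframe as dd

    ddf = dd.from_pandas(pd.DataFrame({"value": range(6)}), npartitions=2)
    print(ddf.known_divisions)      # True: from_pandas computes divisions

    ddf = ddf.clear_divisions()     # simulate a frame read without statistics
    print(ddf.known_divisions)      # False

    ddf["_idx"] = ddf.index         # temporary column, as in set_index above
    ddf = ddf.set_index("_idx", sorted=True)
    print(ddf.known_divisions)      # True again; no shuffle since the index was sorted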
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1, df2):
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
if df2 is None:
return df1
df1 = set_index(df1)
df2 = set_index(df2)
if share_items(df1.columns, df2.columns):
ddf = df2.combine_first(df1)
else:
ddf = df1.join(df2, how='outer') # join seems faster when there no columns in common
return ddf[sorted(ddf.columns)]
return ddf
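To make the two branches concrete, a standalone sketch with toy frames on the local scheduler (not part of the change): combine_first is used when the frames share columns so the newer values win on overlapping indexes, while an outer join on the index simply widens the frame when the columns are disjoint.

    import pandas as pd
    import dask.dataframe as dd

    left = dd.from_pandas(pd.DataFrame({"A": [1, 2]}, index=[0, 1]), npartitions=1)
    newer = dd.from_pandas(pd.DataFrame({"A": [20, 30]}, index=[1, 2]), npartitions=1)
    other = dd.from_pandas(pd.DataFrame({"B": [9, 9]}, index=[0, 1]), npartitions=1)

    # Shared column "A": values from the newer frame win where the indexes overlap.
    print(newer.combine_first(left).compute())

    # No shared columns: an outer join on the index just adds column "B".
    print(left.join(other, how="outer").compute())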
......@@ -27,9 +27,9 @@ click==8.0.1
cloudpickle==1.6.0
colorama==0.4.4
cryptography==3.4.7
dask==2021.6.2
dask==2021.7.2
decorator==5.0.9
distributed==2021.6.2
distributed==2021.7.2
fastapi==0.66.0
fsspec==2021.7.0
gcsfs==2021.7.0
......
......@@ -29,9 +29,9 @@ cloudpickle==1.6.0
colorama==0.4.4
coverage==5.5
cryptography==3.4.7
dask==2021.6.2
dask==2021.7.2
decorator==5.0.9
distributed==2021.6.2
distributed==2021.7.2
fastapi==0.66.0
fsspec==2021.7.0
gcsfs==2021.7.0
......
......@@ -20,7 +20,7 @@ opencensus-ext-ocagent
opencensus-ext-logging
# for chunking feature
dask[distributed]==2021.6.2
dask[distributed]==2021.7.2
fsspec
python-ulid
......
......@@ -254,10 +254,17 @@ async def test_session_update_ovelap_by_column(test_session, dask_storage: DaskB
@pytest.mark.asyncio
async def test_bad_bulkId_commit(test_session, dask_storage: DaskBulkStorage):
    await dask_storage.session_add_chunk(test_session, generate_df(['A'], range(10)))
    with pytest.raises(BulkNotFound):
        await dask_storage.session_commit(test_session, from_bulk_id="bad_bulk_id")

@pytest.mark.asyncio
async def test_empty_session_commit(test_session, dask_storage: DaskBulkStorage):
    with pytest.raises(BulkNotProcessable):
        await dask_storage.session_commit(test_session, from_bulk_id=test_session.recordId)

@pytest.mark.asyncio
async def test_all_type(test_session, dask_storage: DaskBulkStorage):
    df_ref = generate_df(['dateD', 'floatB', 'intA', 'strC'], range(5))
......
......@@ -767,7 +767,7 @@ def test_nat_sort_columns(setup_client, data_format, accept_content, columns_nam
@pytest.mark.parametrize("entity_type", ['WellLog', 'Log'])
def test_session_update_previous_version(setup_client, entity_type):
    """ create a session opdate on a previous version """
    """ create a session update on a previous version """
    client = setup_client
    record_id = _create_record(client, entity_type)
......