Commit 487219be authored by Jeremie Hallal's avatar Jeremie Hallal
Browse files

add trace and code cleanup

parent 902246e5
Pipeline #71134 failed with stages
in 26 minutes and 43 seconds
......@@ -170,6 +170,7 @@ class DaskBulkStorage:
"""
return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
@with_trace('read_stat')
def read_stat(self, record_id: str, bulk_id: str):
"""Return some meta data about the bulk."""
file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
......@@ -217,7 +218,6 @@ class DaskBulkStorage:
def try_to_parquet(ddf, path, storage_options):
to_parquet_args = {'engine': 'pyarrow',
'storage_options': storage_options,
#"row_group_size": 50_000
}
try:
return dd.to_parquet(ddf, path, **to_parquet_args, schema="infer")
......
......@@ -136,7 +136,7 @@ async def post_chunk_data(record_id: str,
async def get_data_version(
record_id: str, version: int,
request: Request,
ctrl_p: GetDataParams = Depends(),
data_param: GetDataParams = Depends(),
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
......@@ -152,9 +152,9 @@ async def get_data_version(
if prefix == BULK_URN_PREFIX_VERSION:
columns = None
stat = dask_blob_storage.read_stat(record_id, bulk_id)
if ctrl_p.curves:
if data_param.curves:
existing_col = set(stat['schema'])
columns = DataFrameRender.get_matching_column(ctrl_p.get_curves_list(), existing_col)
columns = DataFrameRender.get_matching_column(data_param.get_curves_list(), existing_col)
# loading the dataframe with filter on columns is faster than filtering columns on df
df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns)
elif prefix is None:
......@@ -163,8 +163,8 @@ async def get_data_version(
else:
raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
df = await DataFrameRender.process_params(df, ctrl_p)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'), orient=orient, stat=stat)
df = await DataFrameRender.process_params(df, data_param)
return await DataFrameRender.df_render(df, data_param, request.headers.get('Accept'), orient=orient, stat=stat)
except BulkError as ex:
ex.raise_as_http()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment