Commit 72774d10 authored by Yuli Liu

improve code

parent ee3a2bcb
1 merge request: !291 initiate filtering capabilities
Pipeline #78153 waiting for manual action
@@ -150,7 +150,6 @@ async def get_data_version(
        data_param: GetDataParams = Depends(),
        orient: JSONOrient = Depends(json_orient_parameter),
        ctx: Context = Depends(get_ctx),
        dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
        bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access)
):
    record = await fetch_record(ctx, record_id, version)
@@ -170,30 +169,7 @@ async def get_data_version(
            df = await get_dataframe(ctx, bulk_id)
            auto_cast_columns_to_string(df)
        else:
            columns_to_load = None
            stat = dask_blob_storage.read_stat(record_id, bulk_id)
            existing_col = set(stat['schema'])
            if data_param.curves:
                columns_to_load = DataFrameRender.get_matching_column(
                    data_param.get_curves_list(), existing_col)  # add the curves needed for filtering
                stat['schema'] = {k: stat['schema'][k] for k in columns_to_load}
            if data_param.bulk_filter:
                # get the columns needed for filtering which are not yet selected
                filters = data_param.get_filters()
                invalid_columns = [c for c in filters.keys() if c not in existing_col]
                if invalid_columns:
                    raise FilterError(f'The columns {invalid_columns} to be filtered do not exist')
                if columns_to_load:
                    columns_to_load.extend(filters)
                    columns_to_load = set(columns_to_load)
            if data_param.describe and not data_param.offset and not data_param.limit and not data_param.bulk_filter:
                # optimization: create a fake dataset when describe is requested on all rows
                df = pd.DataFrame()
            else:
                # loading the dataframe with a filter on columns is faster than filtering columns on the loaded df
                df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns_to_load)
            df, filters, stat = await _process_request_v1(record_id, bulk_id, data_param, filters)
        df = await DataFrameRender.process_params(df, data_param, filters=filters)
        return await DataFrameRender.df_render(df, data_param, request.headers.get('Accept'), orient=orient, stat=stat)
@@ -201,6 +177,35 @@ async def get_data_version(
        ex.raise_as_http()

async def _process_request_v1(record_id: str, bulk_id: str, data_param: GetDataParams, filters):
    dask_blob_storage: DaskBulkStorage = await with_dask_blob_storage()
    columns_to_load = None
    stat = dask_blob_storage.read_stat(record_id, bulk_id)
    existing_col = set(stat['schema'])
    if data_param.curves:
        columns_to_load = DataFrameRender.get_matching_column(
            data_param.get_curves_list(), existing_col)  # add the curves needed for filtering
        stat['schema'] = {k: stat['schema'][k] for k in columns_to_load}
    if data_param.bulk_filter:
        # get the columns needed for filtering which are not yet selected
        filters = data_param.get_filters()
        invalid_columns = [c for c in filters.keys() if c not in existing_col]
        if invalid_columns:
            raise FilterError(f'The columns {invalid_columns} to be filtered do not exist')
        if columns_to_load:
            columns_to_load.extend(filters)
            columns_to_load = set(columns_to_load)
    if data_param.describe and not data_param.offset and not data_param.limit and not data_param.bulk_filter:
        # optimization: create a fake dataset when describe is requested on all rows
        df = pd.DataFrame()
    else:
        # loading the dataframe with a filter on columns is faster than filtering columns on the loaded df
        df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns_to_load)
    return df, filters, stat
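A standalone sketch (not part of this commit) of why the column list is pushed down into load_bulk rather than selecting columns after loading: projecting columns at read time avoids deserializing curves that are never returned. It uses plain pandas with a throwaway Parquet file (requires pyarrow); the path and curve names are illustrative, and DaskBulkStorage.load_bulk is only assumed to behave analogously when given columns=....

# Illustration only: column projection at read time vs. loading everything and selecting afterwards.
import time

import numpy as np
import pandas as pd

path = '/tmp/bulk_example.parquet'  # hypothetical bulk file with many curves
pd.DataFrame(np.random.rand(100_000, 50),
             columns=[f'CURVE_{i}' for i in range(50)]).to_parquet(path)

t0 = time.perf_counter()
df_all = pd.read_parquet(path)[['CURVE_0', 'CURVE_1']]           # load all columns, then select
t1 = time.perf_counter()
df_proj = pd.read_parquet(path, columns=['CURVE_0', 'CURVE_1'])  # project columns at read time
t2 = time.perf_counter()

assert df_all.equals(df_proj)
print(f'load-then-select: {t1 - t0:.3f}s, column projection: {t2 - t1:.3f}s')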
@router.get(
    "/{record_id}/data",
    summary='Returns the data according to the specified query parameters.',
@@ -225,10 +230,9 @@ async def get_data(
        ctrl_p: GetDataParams = Depends(),
        orient: JSONOrient = Depends(json_orient_parameter),
        ctx: Context = Depends(get_ctx),
        dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
        bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access)
):
    return await get_data_version(record_id, None, request, ctrl_p, orient, ctx, dask_blob_storage, bulk_uri_access)
    return await get_data_version(record_id, None, request, ctrl_p, orient, ctx, bulk_uri_access)
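A minimal sketch of the pattern this change applies: the storage client is no longer declared as a Depends() parameter on each endpoint but resolved inside the shared helper coroutine. All names below (Storage, with_storage, _process_request) are illustrative stand-ins, not the project's API, and the real with_dask_blob_storage is only assumed to return a context-scoped instance when awaited directly.

from fastapi import FastAPI

app = FastAPI()

class Storage:
    async def load(self, record_id: str) -> dict:
        return {'record_id': record_id}

_storage = Storage()

async def with_storage() -> Storage:
    # stand-in for with_dask_blob_storage(); assumed to return a context-scoped instance
    return _storage

async def _process_request(record_id: str) -> dict:
    storage = await with_storage()  # resolved here instead of via Depends() in the endpoint signature
    return await storage.load(record_id)

@app.get('/{record_id}/data')
async def get_data(record_id: str) -> dict:
    # the endpoint no longer needs the storage dependency in its signature
    return await _process_request(record_id)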
@router.patch(
@@ -1097,6 +1097,8 @@ def test_get_bulk_data_with_filters(setup_client, entity_type, params, expected,
    response_get_data = client.get(f'{chunking_url}/{record_id}/data', headers=header_get_data,
                                   params={'filter': params})
    assert response_get_data.status_code == 200

    df = _create_df_from_response(response_get_data)
    assert_frame_equal(df, expected(dataframe_for_filters))
@@ -1120,6 +1122,8 @@ def test_get_bulk_data_with_filters_curves_offset(setup_client, entity_type, fil
    for i in range(0, math.ceil(20/limit)):
        response_get_data = client.get(f'{chunking_url}/{record_id}/data', headers=header_get_data,
                                       params={'filter': filter, 'curves': curve, 'offset': i*limit, 'limit': limit})
        assert response_get_data.status_code == 200

        df = _create_df_from_response(response_get_data)
        df_expected = expected(dataframe_for_filters).iloc[i*limit:(i+1)*limit][['A', 'B']]
        assert_frame_equal(df, df_expected)
@@ -1144,6 +1148,8 @@ def test_get_bulk_data_with_filters_curves_offset_describe(setup_client, entity_
    for i in range(0, math.ceil(20/limit)):
        response_get_data = client.get(f'{chunking_url}/{record_id}/data', headers=header_get_data,
                                       params={'filter': filter, 'curves': curves, 'offset': i*limit, 'limit': limit, 'describe': True})
        assert response_get_data.status_code == 200

        assert response_get_data.json()['numberOfRows'] == expected[i]
        assert response_get_data.json()['columns'] == curves[0].split(',')
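A standalone sketch (not from the test suite) of the offset/limit arithmetic the loops above rely on: paging with math.ceil(total/limit) requests and slicing with iloc[i*limit:(i+1)*limit] must tile the filtered data exactly once. The 20-row frame, column names and limit value are illustrative assumptions.

import math

import pandas as pd

df = pd.DataFrame({'A': range(20), 'B': range(20, 40)})  # stand-in for the filtered, curve-selected data
limit = 7

# one page per request, mirroring the iloc slicing used in the tests above
pages = [df.iloc[i * limit:(i + 1) * limit] for i in range(math.ceil(len(df) / limit))]

assert sum(len(p) for p in pages) == len(df)  # every row appears exactly once
assert pd.concat(pages).equals(df)            # concatenating the pages rebuilds the frame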