Commit cbf989c7 authored by Jeremie Hallal

improve performance of "describe"

parent 2967605b
Pipeline #71094 failed with stages in 2 minutes and 26 seconds
......@@ -33,6 +33,7 @@ from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings, get_ctx
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from pyarrow.lib import ArrowException
import pyarrow.parquet as pa
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
......@@ -162,11 +163,22 @@ class DaskBulkStorage:
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
def _load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
returns a Future<dd.DataFrame>
"""
return self._load(self._get_blob_path(record_id, bulk_id))
return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
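# Illustration (not part of this change): forwarding `columns` down to
# dask.dataframe.read_parquet means pyarrow only reads the requested column
# chunks from the Parquet files instead of every curve, e.g. (invented path
# and curve names):
#
#   ddf = dd.read_parquet("gs://bucket/rec-1/bulk-1", columns=["MD", "GR"],
#                         engine="pyarrow", storage_options=storage_options)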
def read_stat(self, record_id: str, bulk_id: str):
"""Return some meta data about the bulk."""
file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
schema = dataset.read_pandas().schema
schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
return {
"num_rows": dataset.metadata.num_rows,
"schema": schema_dict
}
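# Illustration (not part of this change): the shape of the value returned by
# read_stat, with an invented record id, bulk id, row count and curve names:
#
#   stat = dask_blob_storage.read_stat("rec-1", "bulk-1")
#   # -> {"num_rows": 120000, "schema": {"MD": "double", "GR": "double"}}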
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
......@@ -186,10 +198,10 @@ class DaskBulkStorage:
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
async def load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version."""
try:
return await self._load_bulk(record_id, bulk_id)
return await self._load_bulk(record_id, bulk_id, columns=columns)
except OSError:
raise BulkNotFound(record_id, bulk_id) # TODO proper exception
......@@ -201,10 +213,18 @@ class DaskBulkStorage:
we should be able to change or support other formats easily?
schema={} instead of 'infer' fixes wrong inference for string columns starting with NaN values
"""
return self._submit_with_trace(dd.to_parquet, ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
def try_to_parquet(ddf, path, storage_options):
to_parquet_args = {'engine': 'pyarrow',
'storage_options': storage_options,
#"row_group_size": 50_000
}
try:
return dd.to_parquet(ddf, path, **to_parquet_args, schema="infer")
except ArrowException: # ArrowInvalid
# In some conditions the schema is not properly inferred; as a workaround, passing schema={} solves the issue.
return dd.to_parquet(ddf, path, **to_parquet_args, schema={})
return self._submit_with_trace(try_to_parquet, ddf, path, storage_options=self._parameters.storage_options)
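# Illustration (not part of this change): a minimal sketch of the case the
# fallback guards against. With schema="infer", dask samples partitions to
# build the pyarrow schema; a string column whose sampled partition starts
# with NaN can be inferred as null/float, and writing a later partition that
# does contain strings may then raise an ArrowException. Passing schema={}
# lets pyarrow resolve the type per column instead. Path and column names
# below are invented:
#
#   import pandas as pd
#   import dask.dataframe as dd
#   pdf = pd.DataFrame({"MD": [0.0, 0.5, 1.0, 1.5],
#                       "LITHO": [float("nan"), float("nan"), "shale", "sand"]})
#   ddf = dd.from_pandas(pdf, npartitions=2)
#   dd.to_parquet(ddf, "/tmp/bulk_example", engine="pyarrow", schema="infer")  # may raise
#   dd.to_parquet(ddf, "/tmp/bulk_example", engine="pyarrow", schema={})       # workaround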
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Optional, List
from fastapi import Query
......@@ -47,3 +47,12 @@ class GetDataParams:
self.curves = curves
self.describe = describe
# orient if json ?
def get_curves_list(self) -> List[str]:
"""parse the curves query parameter and return the list of requested curves"""
if self.curves:
# split and remove empty entries
curves = list(filter(None, map(str.strip, self.curves.split(','))))
# remove duplicates but maintain order
return list(dict.fromkeys(curves))
return []
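# Illustration (not part of this change): expected behaviour for an invented
# query value containing blanks and duplicates:
#
#   self.curves = " GR, MD,,GR , NPHI"
#   self.get_curves_list()  # -> ["GR", "MD", "NPHI"]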
......@@ -146,11 +146,18 @@ async def get_data_version(
record = await fetch_record(ctx, record_id, version)
bulk_id, prefix = bulk_uri_access.get_bulk_uri(record=record) # TODO PATH logv2
stat = None
try:
if bulk_id is None:
raise BulkNotFound(record_id=record_id, bulk_id=None)
if prefix == BULK_URN_PREFIX_VERSION:
df = await dask_blob_storage.load_bulk(record_id, bulk_id)
columns = None
stat = dask_blob_storage.read_stat(record_id, bulk_id)
if ctrl_p.curves:
existing_col = set(stat['schema'])
columns = DataFrameRender.get_matching_column(ctrl_p.get_curves_list(), existing_col)
# loading the dataframe with a column filter is faster than filtering columns on the loaded df
df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns)
elif prefix is None:
df = await get_dataframe(ctx, bulk_id)
_check_df_columns_type_legacy(df)
......@@ -158,7 +165,7 @@ async def get_data_version(
raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
df = await DataFrameRender.process_params(df, ctrl_p)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'), orient=orient)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'), orient=orient, stat=stat)
except BulkError as ex:
ex.raise_as_http()
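# Illustration (not part of this change): the intended fast path when specific
# curves are requested, with invented names -- read the Parquet metadata first,
# match the requested curves against the stored schema, then load only those
# columns instead of the whole bulk:
#
#   stat = dask_blob_storage.read_stat(record_id, bulk_id)
#   columns = DataFrameRender.get_matching_column(["GR"], set(stat["schema"]))
#   df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns)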
......
......@@ -195,11 +195,17 @@ class DataFrameRender:
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None, stat=None):
if params.describe:
nb_rows: int = 0
if stat and not params.limit and not params.offset:
nb_rows = stat['num_rows']
else:
nb_rows = await DataFrameRender.get_size(df)
return {
"numberOfRows": await DataFrameRender.get_size(df),
"columns": [c for c in df.columns]
"numberOfRows": nb_rows,
"columns": list(df.columns)
}
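# Illustration (not part of this change): with describe=true and no limit or
# offset, the row count now comes from the Parquet metadata passed in `stat`
# instead of a dask computation over the dataframe (values are invented):
#
#   await DataFrameRender.df_render(df, params, stat={"num_rows": 120000, "schema": {...}})
#   # -> {"numberOfRows": 120000, "columns": ["MD", "GR"]}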
pdf = await DataFrameRender.compute(df)
......