Commit 820cb87b authored by Jeremie Hallal

Merge branch 'optimize_describe' into 'master'

improve performance of "describe"

See merge request !272
parents 72bef29a 4a539a7b
Pipeline #71563 passed with stages in 7 minutes and 39 seconds
......@@ -18,6 +18,7 @@ import time
from contextlib import suppress
from functools import wraps
from operator import attrgetter
from typing import List
import fsspec
import pandas as pd
......@@ -33,6 +34,7 @@ from app.persistence.sessions_storage import Session
from app.utils import DaskClient, capture_timings, get_ctx
from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
from pyarrow.lib import ArrowException
import pyarrow.parquet as pa
import dask.dataframe as dd
from dask.distributed import Client as DaskDistributedClient
......@@ -162,11 +164,23 @@ class DaskBulkStorage:
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
def _load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version.
returns a Future<dd.DataFrame>
"""
return self._load(self._get_blob_path(record_id, bulk_id))
return self._load(self._get_blob_path(record_id, bulk_id), columns=columns)
@with_trace('read_stat')
def read_stat(self, record_id: str, bulk_id: str):
"""Return some meta data about the bulk."""
file_path = self._get_blob_path(record_id, bulk_id, with_protocol=False)
dataset = pa.ParquetDataset(file_path, filesystem=self._fs)
schema = dataset.read_pandas().schema
schema_dict = {x: str(y) for (x, y) in zip(schema.names, schema.types)}
return {
"num_rows": dataset.metadata.num_rows,
"schema": schema_dict
}
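# A minimal standalone sketch of this kind of metadata lookup with pyarrow
# (not taken from the change above; the file name is hypothetical): the row
# count and column types can be read from the Parquet footer without loading data.
import pyarrow.parquet as pq
_pf = pq.ParquetFile("bulk_example.parquet")  # hypothetical local file
_num_rows = _pf.metadata.num_rows  # footer metadata only, no data pages read
_schema = {name: str(dtype) for name, dtype in zip(_pf.schema_arrow.names, _pf.schema_arrow.types)}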
def _submit_with_trace(self, target_func, *args, **kwargs):
"""
......@@ -186,10 +200,10 @@ class DaskBulkStorage:
@capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
@with_trace('load_bulk')
async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
async def load_bulk(self, record_id: str, bulk_id: str, columns: List[str] = None) -> dd.DataFrame:
"""Return a dask Dataframe of a record at the specified version."""
try:
return await self._load_bulk(record_id, bulk_id)
return await self._load_bulk(record_id, bulk_id, columns=columns)
except OSError:
raise BulkNotFound(record_id, bulk_id) # TODO proper exception
......@@ -201,10 +215,17 @@ class DaskBulkStorage:
we should be able to change or support other formats easily?
schema={} instead of 'infer' fixes wrong inference for columns of type string starting with nan values
"""
return self._submit_with_trace(dd.to_parquet, ddf, path,
schema={},
engine='pyarrow',
storage_options=self._parameters.storage_options)
def try_to_parquet(ddf, path, storage_options):
to_parquet_args = {'engine': 'pyarrow',
'storage_options': storage_options,
}
try:
return dd.to_parquet(ddf, path, **to_parquet_args, schema="infer")
except ArrowException: # ArrowInvalid
# In some conditions, the schema is not properly inferred. As a workaround, passing schema={} solves the issue.
return dd.to_parquet(ddf, path, **to_parquet_args, schema={})
return self._submit_with_trace(try_to_parquet, ddf, path, storage_options=self._parameters.storage_options)
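# A hedged sketch (not part of the change above) of the failure mode this
# fallback guards against: a string column whose first partition holds only
# NaN can be inferred with a null Arrow type, which may make the pyarrow
# write fail; retrying with schema={} lets the writer coerce the column.
import pandas as pd
import dask.dataframe as dd
_pdf = pd.DataFrame({"curve": [None, None, "a", "b"]})
_ddf = dd.from_pandas(_pdf, npartitions=2)  # first partition is all-NaN
# dd.to_parquet(_ddf, "/tmp/example", engine="pyarrow", schema="infer")  # may raise ArrowException
# dd.to_parquet(_ddf, "/tmp/example", engine="pyarrow", schema={})       # workaround used above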
async def _save_with_pandas(self, path, pdf: dd.DataFrame):
"""Save the dataframe to a parquet file(s).
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Optional, List
from fastapi import Query
......@@ -47,3 +47,12 @@ class GetDataParams:
self.curves = curves
self.describe = describe
# orient if json ?
def get_curves_list(self) -> List[str]:
"""parse the curves query parameter and return the list of requested curves"""
if self.curves:
# split and remove empty entries
curves = list(filter(None, map(str.strip, self.curves.split(','))))
# remove duplicates but maintain order
return list(dict.fromkeys(curves))
return []
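A quick usage sketch of the parsing above (the constructor keywords mirror the unit tests later in this diff): surrounding whitespace and empty items are dropped, and duplicates are removed while keeping the first-occurrence order.

params = GetDataParams(describe=False, limit=None, offset=None, curves=" GR, MD, ,GR ")
assert params.get_curves_list() == ["GR", "MD"]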
......@@ -16,7 +16,6 @@ import asyncio
from fastapi import APIRouter, Depends, HTTPException, Request, status
from app.bulk_persistence import JSONOrient, get_dataframe
from app.bulk_persistence.bulk_id import BulkId
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
......@@ -137,7 +136,7 @@ async def post_chunk_data(record_id: str,
async def get_data_version(
record_id: str, version: int,
request: Request,
ctrl_p: GetDataParams = Depends(),
data_param: GetDataParams = Depends(),
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
......@@ -146,19 +145,29 @@ async def get_data_version(
record = await fetch_record(ctx, record_id, version)
bulk_id, prefix = bulk_uri_access.get_bulk_uri(record=record) # TODO PATH logv2
stat = None
try:
if bulk_id is None:
raise BulkNotFound(record_id=record_id, bulk_id=None)
if prefix == BULK_URN_PREFIX_VERSION:
df = await dask_blob_storage.load_bulk(record_id, bulk_id)
columns = None
if data_param.curves:
stat = dask_blob_storage.read_stat(record_id, bulk_id)
existing_col = set(stat['schema'])
columns = DataFrameRender.get_matching_column(data_param.get_curves_list(), existing_col)
elif data_param.describe:
stat = dask_blob_storage.read_stat(record_id, bulk_id)
# loading the dataframe with a column filter is faster than filtering columns on the loaded df
df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns)
elif prefix is None:
df = await get_dataframe(ctx, bulk_id)
_check_df_columns_type_legacy(df)
else:
raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
df = await DataFrameRender.process_params(df, ctrl_p)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'), orient=orient)
df = await DataFrameRender.process_params(df, data_param)
return await DataFrameRender.df_render(df, data_param, request.headers.get('Accept'), orient=orient, stat=stat)
except BulkError as ex:
ex.raise_as_http()
......
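Passing `columns` down to the load, rather than dropping columns from a fully loaded dataframe, works because Parquet is columnar: a projection at read time only fetches the requested column chunks. A minimal sketch of the same idea with plain dask, independent of this service (path and column names are hypothetical):

import dask.dataframe as dd
# only the 'MD' and 'GR' column chunks are read from storage; the other
# columns are never downloaded or deserialised
ddf = dd.read_parquet("bulk_data/", engine="pyarrow", columns=["MD", "GR"])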
......@@ -195,11 +195,17 @@ class DataFrameRender:
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None, stat=None):
if params.describe:
nb_rows: int = 0
if stat and not params.limit and not params.offset:
nb_rows = stat['num_rows']
else:
nb_rows = await DataFrameRender.get_size(df)
return {
"numberOfRows": await DataFrameRender.get_size(df),
"columns": [c for c in df.columns]
"numberOfRows": nb_rows,
"columns": list(df.columns)
}
pdf = await DataFrameRender.compute(df)
......
......@@ -13,15 +13,15 @@
# limitations under the License.
import io
from enum import Enum
from contextlib import contextmanager
import random
import numpy as np
import numpy.testing as npt
import pandas as pd
import pytest
from tests.unit.generate_data import generate_df
from .fixtures import with_wdms_env
from ..request_builders.wdms.crud.log import build_request_create_log, build_request_delete_log
from ..request_builders.wdms.session import build_delete_session
......@@ -36,13 +36,6 @@ from ..request_builders.wdms.crud.osdu_wellboretrajectory import (
build_request_delete_osdu_wellboretrajectory)
def generate_df(columns, index):
nbrows = len(index)
df = pd.DataFrame(
np.random.randint(-100, 1000, size=(nbrows, len(columns))), index=index)
df.columns = columns
return df
entity_type_dict={
"well_log": {"entity": "welllogs", "version": "v3"},
"wellbore_trajectory": {"entity": "wellboretrajectories", "version": "v3"},
......
......@@ -9,7 +9,7 @@ from app.bulk_persistence import JSONOrient
from app.model.model_chunking import GetDataParams
from app.routers.bulk.bulk_routes import DataFrameRender
from app.routers.bulk.utils import get_df_from_request
from tests.unit.generate_data import generate_df
@pytest.mark.parametrize("requested, df_columns, expected", [
(["X"], {"X"}, ["X"]),
......@@ -76,7 +76,7 @@ def basic_dataframe():
async def test_df_render_accept_parquet(default_get_params, basic_dataframe, accept):
response = await DataFrameRender.df_render(basic_dataframe, default_get_params, accept)
assert 'application/x-parquet' == response.headers.get('Content-Type')
assert response.headers.get('Content-Type') == "application/x-parquet"
assert_df_in_parquet(basic_dataframe, response.body)
......@@ -84,11 +84,22 @@ async def test_df_render_accept_parquet(default_get_params, basic_dataframe, acc
@pytest.mark.parametrize("orient", [JSONOrient.split, JSONOrient.columns])
async def test_df_render_accept_json(default_get_params, basic_dataframe, orient):
response = await DataFrameRender.df_render(basic_dataframe, default_get_params, "application/json", orient)
assert 'application/json' == response.headers.get('Content-Type')
assert response.headers.get('Content-Type') == "application/json"
actual = pd.read_json(response.body, orient=orient)
assert_frame_equal(basic_dataframe, actual)
@pytest.mark.asyncio
async def test_df_render_describe():
columns = [f'var_{i}' for i in range(10)]
data = generate_df(columns, index=range(100))
response = await DataFrameRender.df_render(data, GetDataParams(
describe=True, limit=None, curves=None, offset=None))
assert response['columns'] == columns
assert response['numberOfRows'] == 100
class RequestMock:
def __init__(self, headers: dict = {}, body=None):
self.headers = headers
......
import numpy as np
import pandas as pd
def generate_df(columns, index):
def gen_values(col_name, size):
if col_name.startswith('float'):
return np.random.random_sample(size=size)
if col_name.startswith('str'):
return [f'string_value_{i}' for i in range(size)]
if col_name.startswith('date'):
return (np.datetime64('2021-01-01') + days for days in range(size))
if col_name.startswith('array_'):
array_size = int(col_name.split('_')[1])
return [np.array(np.random.random_sample(size=array_size)) for _i in range(size)]
return np.random.randint(-100, 1000, size=size)
df = pd.DataFrame({c: gen_values(c, len(index))
for c in columns}, index=index)
return df
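A usage sketch of this shared test helper: the column-name prefix selects the generated values (float, string, date, fixed-size array, or integer by default), for example:

df = generate_df(['float_GR', 'str_comment', 'date_time', 'array_3_sample', 'MD'], index=range(4))
# -> 4 rows with float, string, date, 3-element array and integer columns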
......@@ -20,11 +20,11 @@ import pandas as pd
import pytest
from tests.unit.test_utils import ctx_fixture, nope_logger_fixture
from tests.unit.generate_data import generate_df
import mock
from app.utils import DaskException
from app.utils import DaskClient
from dask.utils import parse_bytes
from app.helper import logger
from app.bulk_persistence.dask.dask_bulk_storage import (BulkNotFound,
BulkNotProcessable,
......@@ -34,20 +34,6 @@ from app.persistence.sessions_storage import (Session, SessionState,
SessionUpdateMode)
def generate_df(columns, index):
def gen_values(col_name, size):
if col_name.startswith('float'):
return np.random.random_sample(size=size)
if col_name.startswith('str'):
return [f'string_value_{i}' for i in range(size)]
if col_name.startswith('date'):
return (np.datetime64('2021-01-01') + days for days in range(size))
return np.random.randint(-100, 1000, size=size)
df = pd.DataFrame({c: gen_values(c, len(index))
for c in columns}, index=index)
return df
@pytest.fixture(scope="module")
def event_loop(): # all tests will share the same loop
......