Commit 5f3cda95 authored by Cyril Monmouton

Feature/asynchronize df processing

parent 0c44de28
......@@ -13,6 +13,7 @@
# limitations under the License.
import asyncio
from functools import partial
from io import BytesIO
from typing import Union, AnyStr, IO, Optional, List, Dict
......@@ -61,19 +62,19 @@ class DataframeSerializerSync:
def to_json(cls,
df: DataframeClass,
orient: Union[str, JSONOrient] = JSONOrient.split,
path_or_buf: Optional[Union[str, Path, IO[AnyStr]]] = None) -> Optional[str]:
**kwargs) -> Optional[str]:
"""
:param df: dataframe to dump
:param orient: format for JSON, default is split
:param path_or_buf: File path or object. If not specified, the result is returned as a string.
:param kwargs: keyword arguments will be forwarded to pandas.to_json()
:return: None, or the JSON string if path_or_buf is None
"""
orient = JSONOrient.get(orient)
return df.fillna("NaN").to_json(path_or_buf=path_or_buf, orient=orient.value)
return df.fillna("NaN").to_json(orient=orient.value, **kwargs)
@classmethod
def read_parquet(cls, data) -> 'DataframeSerializerAsync.DataframeClass':
def read_parquet(cls, data) -> DataframeClass:
"""
:param data: bytes, path object or file-like object
:return: dataframe
......@@ -85,7 +86,7 @@ class DataframeSerializerSync:
return pd.read_parquet(data)
@classmethod
def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> 'DataframeSerializerAsync.DataframeClass':
def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
"""
:param data: bytes str content (valid JSON str), path object or file-like object
:param orient:
......@@ -100,14 +101,26 @@ class DataframeSerializerAsync:
def __init__(self, pool_executor=get_pool_executor()):
self.executor = pool_executor
@with_trace("Parquet bulk serialization")
async def to_parquet(self, df: DataframeClass, *args, **kwargs) -> DataframeClass:
func = partial(df.to_parquet, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("JSON bulk serialization")
async def to_json(self,
df: DataframeClass,
orient: Union[str, JSONOrient] = JSONOrient.split,
path_or_buf: Optional[Union[str, Path, IO[AnyStr]]] = None) -> Optional[str]:
return await asyncio.get_event_loop().run_in_executor(
self.executor, DataframeSerializerSync.to_json, df, orient, path_or_buf
)
*args, **kwargs) -> Optional[str]:
func = partial(DataframeSerializerSync.to_json, df, orient, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("CSV bulk serialization")
async def to_csv(self, df: DataframeClass, *args, **kwargs) -> Optional[str]:
df = df.fillna("NaN")
func = partial(df.to_csv, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(self.executor, func)
@with_trace("Parquet bulk deserialization")
async def read_parquet(self, data) -> DataframeClass:
......
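All of the async wrappers added above share one offloading pattern: bind the blocking pandas call with `functools.partial`, then hand it to the pool executor. A self-contained sketch of that pattern (the executor and dataframe here are illustrative, not the service's `get_pool_executor()`):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import pandas as pd

async def to_csv_async(df: pd.DataFrame, executor, **kwargs) -> str:
    # partial() bakes the kwargs into a zero-argument callable, because
    # loop.run_in_executor() forwards positional arguments only.
    func = partial(df.to_csv, **kwargs)
    return await asyncio.get_event_loop().run_in_executor(executor, func)

async def main():
    df = pd.DataFrame({"MD": [0.0, 1.0], "X": [2.0, 3.0]})
    print(await to_csv_async(df, ThreadPoolExecutor(max_workers=2)))

asyncio.run(main())
```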
......@@ -176,7 +176,6 @@ class DataFrameRender:
selected.extend(natsorted(matching_columns))
return selected
@staticmethod
@with_trace('process_params')
async def process_params(df, params: GetDataParams):
......@@ -194,7 +193,6 @@ class DataFrameRender:
return df
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
......@@ -208,18 +206,19 @@ class DataFrameRender:
pdf.index.name = None # TODO
if not accept or MimeTypes.PARQUET.type in accept:
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
content = await DataframeSerializerAsync().to_parquet(pdf, engine="pyarrow")
return Response(content, media_type=MimeTypes.PARQUET.type)
if MimeTypes.JSON.type in accept:
return Response(
pdf.to_json(index=True, date_format='iso', orient=orient.value), media_type=MimeTypes.JSON.type
)
content = await DataframeSerializerAsync().to_json(pdf, index=True, date_format='iso', orient=orient.value)
return Response(content, media_type=MimeTypes.JSON.type)
if MimeTypes.CSV.type in accept:
return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
content = await DataframeSerializerAsync().to_csv(pdf)
return Response(content, media_type=MimeTypes.CSV.type)
# in any other case => Parquet anyway?
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
content = await DataframeSerializerAsync().to_parquet(pdf, engine="pyarrow")
return Response(content, media_type=MimeTypes.PARQUET.type)
async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record, bulk_uri_access: BulkIdAccess):
......
......@@ -77,7 +77,9 @@ def _create_df_from_response(response):
elif content_type == 'text/csv; charset=utf-8':
return pd.read_csv(f, index_col=0)
elif content_type == 'application/json':
return pd.read_json(f, dtype=True, orient='split', convert_axes=False)
return pd.read_json(f, dtype=True, orient='split', convert_axes=False).replace("NaN", np.NaN)
elif content_type == 'application/csv':
return pd.read_csv(f, dtype=True).replace("NaN", np.NaN)
else:
raise ValueError(f"Unknown content-type: '{content_type}'")
......@@ -222,6 +224,7 @@ def test_post_data_merge_extension_properties(setup_client):
@pytest.mark.parametrize("accept_content", [
'application/x-parquet',
'application/json',
'text/csv; charset=utf-8',
])
@pytest.mark.parametrize("columns", [
['MD', 'X'],
......@@ -254,7 +257,7 @@ def test_send_all_data_once(setup_client,
assert get_response.status_code == 200
result_df = _create_df_from_response(get_response)
if content_type_header.endswith('parquet') and accept_content.endswith('json'):
if content_type_header.endswith('parquet') and not accept_content.endswith('parquet'):
result_df = _cast_datetime_to_datetime64_ns(result_df)
if content_type_header.endswith('json'):
......@@ -328,7 +331,7 @@ def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
])
@pytest.mark.parametrize("accept_content", [
'application/x-parquet',
# 'text/csv; charset=utf-8',
'text/csv; charset=utf-8',
'application/json',
])
@pytest.mark.parametrize("columns", [
......@@ -789,6 +792,7 @@ def test_nat_sort_columns(setup_client, data_format, accept_content, columns_nam
response_df = _create_df_from_response(data_response)
assert list(response_df.columns) == columns_name
@pytest.mark.parametrize("entity_type", ['WellLog', 'Log'])
def test_session_update_previous_version(setup_client, entity_type):
""" create a session update on a previous version """
......@@ -797,7 +801,7 @@ def test_session_update_previous_version(setup_client, entity_type):
record_id = _create_record(client, entity_type)
chunking_url = Definitions[entity_type]['chunking_url']
base_url = Definitions[entity_type]['base_url']
headers = headers={'Content-Type': 'application/x-parquet'}
headers = {'Content-Type': 'application/x-parquet'}
nb_rows = 5
version_data = [
generate_df(['MD', 'X', 'Y'], range(nb_rows)),
......@@ -812,7 +816,6 @@ def test_session_update_previous_version(setup_client, entity_type):
headers=headers)
assert write_response.status_code == 200
versions_response = client.get(f'{base_url}/{record_id}/versions')
assert versions_response.status_code == 200
versions = versions_response.json()['versions']
......