Commit 83947f7a authored by Jeremie Hallal

natural column sort

parent 8b5b3b88
Pipeline #53647 failed in 22 seconds
@@ -86,7 +86,7 @@ class DataframeSerializerSync:
         return pd.read_parquet(data)
 
     @classmethod
-    def read_json(cls, data, orient: Union[str, JSONOrient]) -> 'DataframeSerializerAsync.DataframeClass':
+    def read_json(cls, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> 'DataframeSerializerAsync.DataframeClass':
         """
         :param data: bytes str content (valid JSON str), path object or file-like object
         :param orient:
@@ -94,7 +94,7 @@ class DataframeSerializerSync:
         """
         orient = JSONOrient.get(orient)
 
-        return pd.read_json(path_or_buf=data, orient=orient.value).replace("NaN", np.NaN)
+        return pd.read_json(path_or_buf=data, orient=orient.value, convert_axes=convert_axes).replace("NaN", np.NaN)
 
 
 class DataframeSerializerAsync:
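For context on the new parameter: `pandas.read_json` defaults to `convert_axes=True`, which tries to coerce axis labels to richer dtypes, so a column label sent as the string `"1"` comes back as the integer `1`. Passing `convert_axes=False` keeps labels exactly as serialized, which the natural column sort below relies on. A minimal standalone illustration (not this service's code):

```python
import pandas as pd
from io import StringIO

# A 'split'-orient payload whose column labels are numeric-looking strings.
payload = '{"columns": ["1", "10", "2"], "index": [0], "data": [[0.1, 0.2, 0.3]]}'

# Default behaviour (convert_axes=True): labels are coerced to integers.
df_default = pd.read_json(StringIO(payload), orient='split')
print(list(df_default.columns))  # [1, 10, 2]

# convert_axes=False: labels stay exactly as serialized.
df_raw = pd.read_json(StringIO(payload), orient='split', convert_axes=False)
print(list(df_raw.columns))      # ['1', '10', '2']
```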
@@ -117,7 +117,7 @@ class DataframeSerializerAsync:
         )
 
     @with_trace("Parquet JSON deserialization")
-    async def read_json(self, data, orient: Union[str, JSONOrient]) -> DataframeClass:
+    async def read_json(self, data, orient: Union[str, JSONOrient], convert_axes: Optional[bool] = None) -> DataframeClass:
         return await asyncio.get_event_loop().run_in_executor(
-            self.executor, DataframeSerializerSync.read_json, data, orient
+            self.executor, DataframeSerializerSync.read_json, data, orient, convert_axes
        )
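One subtlety with the executor hop above: `run_in_executor(executor, func, *args)` forwards positional arguments only, so the new `convert_axes` must be appended to the argument list (as done here) or it is silently dropped. If the option were ever made keyword-only, binding it with `functools.partial` is the usual workaround; a minimal sketch reusing the names from this diff:

```python
import asyncio
from functools import partial

async def read_json_via_executor(executor, data, orient, convert_axes=None):
    # run_in_executor() accepts positional arguments only, so bind the
    # keyword argument up front with functools.partial.
    loop = asyncio.get_event_loop()
    call = partial(DataframeSerializerSync.read_json, data, orient,
                   convert_axes=convert_axes)
    return await loop.run_in_executor(executor, call)
```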
@@ -41,6 +41,7 @@ from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSes
                                   WithSessionStorages, get_session_dependencies)
 from app.routers.record_utils import fetch_record
 from app.helper.traces import with_trace
+from natsort import natsorted
 
 router_bulk = APIRouter()  # router dedicated to bulk APIs
@@ -72,7 +73,7 @@ async def get_df_from_request(request: Request, orient: Optional[str] = None) ->
     if MimeTypes.JSON.match(ct):
         content = await request.body()  # request.stream()
         try:
-            return await DataframeSerializerAsync().read_json(content, orient)
+            return await DataframeSerializerAsync().read_json(content, orient, convert_axes=False)
         except ValueError:
             raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                 detail='invalid body')  # TODO
@@ -139,7 +140,7 @@ class DataFrameRender:
         if params.curves:
             selection = list(map(str.strip, params.curves.split(',')))
             columns = DataFrameRender.get_matching_column(selection, set(df))
-            df = df[sorted(columns)]
+            df = df[columns]
 
         if params.offset:
             head_index = df.head(params.offset, npartitions=-1, compute=False).index
@@ -148,6 +149,8 @@ class DataFrameRender:
         if params.limit and params.limit > 0:
             df = df.head(params.limit, npartitions=-1, compute=False)
 
+        df = df[natsorted(df.columns)]
+
         return df
 
     @staticmethod
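`natsorted` compares the numeric runs inside strings by value rather than by code point, so the returned frame's columns come out in human-friendly order; the sort is applied once to the whole frame just before returning, which is why the per-selection `sorted(columns)` call above was dropped. A standalone comparison with the builtin `sorted`:

```python
from natsort import natsorted

cols = ['Curve10', 'Curve2', 'MD', 'Curve1']

print(sorted(cols))     # ['Curve1', 'Curve10', 'Curve2', 'MD']  lexicographic
print(natsorted(cols))  # ['Curve1', 'Curve2', 'Curve10', 'MD']  natural order
```

Note that natsort's default ordering is still case-sensitive, which matches the updated test expectations below: 'MD' now sorts before 'date_X', 'float_X' and 'str_X' because uppercase letters precede lowercase ones.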
@@ -75,7 +75,7 @@ def _create_df_from_response(response):
     elif content_type == 'text/csv; charset=utf-8':
         return pd.read_csv(f, index_col=0)
     elif content_type == 'application/json':
-        return pd.read_json(f, dtype=True, orient='split')
+        return pd.read_json(f, dtype=True, orient='split', convert_axes=False)
     else:
         raise ValueError(f"Unknown content-type: '{content_type}'")
@@ -189,7 +189,7 @@ def setup_client(init_fixtures):
     ['float_MD', 'float_X'],
     ['str_MD', 'str_X'],
     ['date_MD', 'date_X'],
-    ['MD', 'float_X', 'str_X', 'date_X']
+    ['MD', 'date_X', 'float_X', 'str_X']
 ])
 def test_send_all_data_once(setup_client,
                             entity_type,
@@ -243,7 +243,7 @@ def test_send_all_data_once(setup_client,
     ['float_MD', 'float_X'],
     ['str_MD', 'str_X'],
     ['date_MD', 'date_X'],
-    ['MD', 'float_X', 'str_X', 'date_X']
+    ['MD', 'date_X', 'float_X', 'str_X']
 ])
 def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
                                                      entity_type,
@@ -722,6 +722,32 @@ def test_session_chunk_int(setup_client, entity_type, content_type_header, create
                             headers=headers)
     assert chunk_response_1.status_code == expected_code
 
 
+@pytest.mark.parametrize("data_format", ['parquet', 'json'])
+@pytest.mark.parametrize("accept_content", ['application/x-parquet', 'application/json'])
+@pytest.mark.parametrize("columns_name", [
+    list(map(str, range(100))),
+    [f'test_{x}' for x in range(100)],
+    [f'{x}_test_{x % 10}' for x in range(100)],
+])
+def test_nat_sort_columns(setup_client, data_format, accept_content, columns_name):
+    """Create a record, append chunks, then check the returned columns keep natural order."""
+    entity_type = 'WellLog'
+    client, _ = setup_client
+    record_id = _create_record(client, entity_type)
+    chunking_url = Definitions[entity_type]['chunking_url']
+
+    _create_chunks(client, entity_type, record_id=record_id, data_format=data_format,
+                   cols_ranges=[(columns_name, range(20))])
+
+    data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
+    assert data_response.status_code == 200
+
+    response_df = _create_df_from_response(data_response)
+    assert list(response_df.columns) == columns_name
 
 # todo:
 #  - concurrent sessions using fromVersion in Integrations tests
 #  - index: check if dataframe has an index