Commit 5b55fdad authored by Cyril Monmouton


Add a fast check on the type of column names in input data, returning a 422 with an explicit error message instead of an ugly internal error
parent 4ebc2d05
Pipeline #50117 failed in 56 seconds
@@ -47,6 +47,12 @@ BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"
def _check_df_columns_type(df: pd.DataFrame):
    """ Reject dataframes whose column names are not all strings. """
    # Check the column names (df.columns), not df.dtypes: dtypes are numpy
    # dtype objects and would never compare equal to str, so the guard
    # would reject every dataframe.
    if any(type(c) is not str for c in df.columns):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail='All column names must be strings')
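As an aside (not part of the diff), a minimal sketch of the guard's behavior, assuming the `df.columns` check above:

# Hypothetical demonstration: non-string column names trigger the 422.
df_ok = pd.DataFrame({'MD': [0.0, 1.0], 'GR': [80.1, 75.3]})
df_bad = pd.DataFrame({42: [0.0, 1.0], 'GR': [80.1, 75.3]})
_check_df_columns_type(df_ok)   # passes, all names are str
_check_df_columns_type(df_bad)  # raises HTTPException(status_code=422)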
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
@@ -206,6 +212,7 @@ async def post_data(record_id: str,
):
    async def save_blob():
        df = await get_df_from_request(request, orient)
        _check_df_columns_type(df)
        return await dask_blob_storage.save_blob(df, record_id)

    record, bulk_id = await asyncio.gather(
@@ -241,6 +248,7 @@ async def post_chunk_data(record_id: str,
detail=f"Session cannot accept data, state={i_session.session.state}")
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
await dask_blob_storage.session_add_chunk(i_session.session, df)
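With the guard wired into both `post_data` and `post_chunk_data`, bad payloads are rejected before any blob-storage work starts. A client sending non-string column names now gets FastAPI's standard error body for an HTTPException (a sketch, using the detail text above) instead of a 500:

HTTP/1.1 422 Unprocessable Entity
{"detail": "All column names must be strings"}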
import io
from tests.unit.test_utils import nope_logger_fixture
from tempfile import TemporaryDirectory
from fastapi import Header
from fastapi.testclient import TestClient
import pytest
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
@@ -21,9 +23,8 @@ from app.helper import traces
from app.utils import Context
from app import conf
from tests.unit.persistence.dask_blob_storage_test import generate_df
Definitions = {
    'WellLog': {
@@ -170,7 +171,6 @@ def setup_client(nope_logger_fixture, bob):
])
@pytest.mark.parametrize("accept_content", [
    'application/x-parquet',
    # 'text/csv; charset=utf-8',
    'application/json',
])
@pytest.mark.parametrize("columns", [
@@ -612,6 +612,48 @@ def test_session_sent_same_col_different_types(setup_client, entity_type):
    commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
    assert commit_response.status_code == 422
def _df_to_pyarrow_parquet(df_data: pd.DataFrame):
    """ Serialize the given dataframe to Parquet and return the raw bytes. """
    table = pa.Table.from_pandas(df=df_data)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf.getvalue().to_pybytes()
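As a usage sketch (not part of the diff), the helper's output can be read straight back with pyarrow, which is a quick way to confirm it produces a well-formed Parquet file:

# Round-trip check: parse the produced bytes as a Parquet table.
payload = _df_to_pyarrow_parquet(pd.DataFrame({'a': [1.0, 2.0]}))
table = pq.read_table(pa.BufferReader(payload))
assert table.column_names == ['a']

Note that Arrow field names are always strings, so non-string pandas column names only survive a round trip via the pandas metadata pyarrow embeds in the file; that is one more reason to validate names up front.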
@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("content_type_header,create_func", [
    ('application/x-parquet', lambda df: _df_to_pyarrow_parquet(df)),
    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
def test_session_chunk_int(setup_client, entity_type, content_type_header, create_func):
    client, _ = setup_client
    record_id = _create_record(client, entity_type)
    chunking_url = Definitions[entity_type]['chunking_url']
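    # The column names below deliberately mix int, float and str types;
    # the service is expected to reject the non-string names with a 422.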
    json_data = {
        int(42): [np.random.rand(10) for _ in range(10)],
        float(-42): [np.random.rand(10) for _ in range(10)],
        str('84'): [np.random.rand(10) for _ in range(10)],
    }
    df_data = pd.DataFrame(json_data)
    data_to_send = create_func(df_data)
    headers = {'content-type': content_type_header}

    write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
    assert write_response.status_code == 422

    session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
    assert session_response.status_code == 200
    session_id = session_response.json()['id']

    chunk_response_1 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
                                   data=data_to_send,
                                   headers=headers)
    assert chunk_response_1.status_code == 422
# todo:
#  - concurrent sessions using fromVersion in integration tests
#  - index: check if dataframe has an index
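The new test can be run in isolation with something like `pytest -k test_session_chunk_int` (the exact module path depends on the repo layout).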