Commit a1f73185 authored by Yannick's avatar Yannick
Browse files

- remove experimental support of multipart

- sanitize imports
- Parquet as default format on read bulk
- add unit tests on DataFrameRender
parent 904be7bd
Pipeline #47061 failed with stages
in 11 minutes and 45 seconds
......@@ -14,35 +14,31 @@
import asyncio
from typing import List, Set, Optional
import re
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import Response
from starlette.datastructures import FormData
import pandas as pd
from app.bulk_persistence import DataframeSerializer
from app.bulk_persistence import DataframeSerializer, JSONOrient
from app.bulk_persistence.tenant_provider import resolve_tenant
from app.bulk_persistence.bulk_id import BulkId
from app.bulk_persistence.dask.blob_storage import DaskDriverBlobStorage
from app.bulk_persistence.dask.blob_storage import DaskDriverBlobStorage, DaskBlobStorageBase
from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
from app.clients.storage_service_client import get_storage_record_service
from app.record_utils import fetch_record
from app.bulk_persistence.dask.blob_storage import DaskBlobStorageBase
from app.bulk_persistence.mime_types import MimeTypes
from app.model.model_chunking import GetDataParams
from app.utils import Context, OpenApiHandler, get_ctx
from ...bulk_persistence import JSONOrient
from ...persistence.sessions_storage import (Session, SessionException,
SessionState, SessionUpdateMode)
from ..common_parameters import (
from app.persistence.sessions_storage import (Session, SessionException, SessionState, SessionUpdateMode)
from app.routers.common_parameters import (
REQUEST_DATA_BODY_SCHEMA,
REQUIRED_ROLES_READ,
REQUIRED_ROLES_WRITE,
json_orient_parameter)
from ..sessions import (SessionInternal, UpdateSessionState,
UpdateSessionStateValue, WithSessionStorages,
get_session_dependencies)
from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
WithSessionStorages, get_session_dependencies)
router_bulk = APIRouter() # router dedicated to bulk APIs
......@@ -52,26 +48,17 @@ BULK_URI_FIELD = "bulkURI"
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
def try_read_parquet(parquet_data):
ct = request.headers.get('Content-Type', '')
if MimeTypes.PARQUET.match(ct):
content = await request.body() # request.stream()
try:
return DataframeSerializer.read_parquet(parquet_data)
return DataframeSerializer.read_parquet(content)
except OSError as err:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f'{err}') # TODO
ct = request.headers.get('Content-Type', '')
if 'multipart/form-data' in ct:
form: FormData = await request.form()
assert (len(form) == 1)
for _file_name, file in form.items(): # TODO can contains multiple files ?
if MimeTypes.PARQUET.match(file.content_type):
return try_read_parquet(file.file)
if MimeTypes.PARQUET.match(ct):
content = await request.body() # request.stream()
return try_read_parquet(content)
if MimeTypes.JSON.match(ct):
content = await request.body() # request.stream()
try:
......@@ -106,25 +93,25 @@ class DataFrameRender:
driver = await with_dask_blob_storage()
return await driver.client.submit(lambda: len(df.index))
re_1D_curve_selection = re.compile(r'\[(?P<index>[0-9]+)\]$')
re_2D_curve_selection = re.compile(r'\[(?P<range>[0-9]+:[0-9]+)\]$')
@staticmethod
def get_matching_column(selection: List[str], cols: Set[str]):
import re
pat = re.compile(r'\[(?P<index>[0-9]+)\]$')
pat2 = re.compile(r'\[(?P<range>[0-9]+:[0-9]+)\]$')
def get_matching_column(selection: List[str], cols: Set[str]) -> List[str]:
selected = set()
for to_find in selection:
m = pat2.search(to_find)
m = DataFrameRender.re_2D_curve_selection.search(to_find)
if m:
r = range(*map(int, m['range'].split(':')))
def is_matching(c):
if c == to_find:
return True
i = pat.search(c)
i = DataFrameRender.re_1D_curve_selection.search(c)
return i and int(i['index']) in r
else:
def is_matching(c):
return c == to_find or to_find == pat.sub('', c)
return c == to_find or to_find == DataFrameRender.re_1D_curve_selection.sub('', c)
selected.update(filter(is_matching, cols.difference(selected)))
return list(selected)
......@@ -158,13 +145,17 @@ class DataFrameRender:
pdf = await DataFrameRender.compute(df)
pdf.index.name = None # TODO
if MimeTypes.PARQUET.type in accept:
if not accept or MimeTypes.PARQUET.type in accept:
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
if MimeTypes.JSON.type in accept:
return Response(pdf.to_json(index=True, date_format='iso'), media_type=MimeTypes.JSON.type)
if MimeTypes.CSV.type in accept:
return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
return Response(pdf.to_json(index=True, date_format='iso'), media_type=MimeTypes.JSON.type)
# in any other case => Parquet anyway?
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
def get_bulk_uri(record):
......@@ -253,6 +244,8 @@ async def post_chunk_data(record_id: str,
summary='Returns data of the specified version.',
description='Returns the data of a specific version according to the specified query parameters.'
' Multiple media types response are available ("application/json", text/csv", "application/x-parquet")'
' The desired format can be specify in "Accept" header. The default is Parquet.'
' When bulk statistics are requested using "describe" parameter, the response is always provided in JSON'
+ REQUIRED_ROLES_READ,
# response_model=RecordData,
responses={
......@@ -288,7 +281,9 @@ async def get_data_version(
"/{record_id}/data",
summary='Returns the data according to the specified query parameters.',
description='Returns the data according to the specified query parameters.'
' Multiple media types response are available ("application/json", text/csv", "application/x-parquet")'
' Multiple media types response are available ("application/json", text/csv", "application/x-parquet").'
' The desired format can be specify in "Accept" header. The default is Parquet.'
' When bulk statistics are requested using "describe" parameter, the response is always provided in JSON.'
+ REQUIRED_ROLES_READ,
# response_model=Union[RecordData, Dict],
responses={
......
......@@ -8243,7 +8243,7 @@
"paths": {
"/alpha/ddms/v3/wellboretrajectories/{record_id}/data": {
"get": {
"description": "Returns the data according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\")\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"description": "Returns the data according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\"). The desired format can be specify in \"Accept\" header. The default is Parquet. When bulk statistics are requested using \"describe\" parameter, the response is always provided in JSON.\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"operationId": "get_data_alpha_ddms_v3_wellboretrajectories__record_id__data_get",
"parameters": [
{
......@@ -8790,7 +8790,7 @@
},
"/alpha/ddms/v3/wellboretrajectories/{record_id}/versions/{version}/data": {
"get": {
"description": "Returns the data of a specific version according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\")\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"description": "Returns the data of a specific version according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\") The desired format can be specify in \"Accept\" header. The default is Parquet. When bulk statistics are requested using \"describe\" parameter, the response is always provided in JSON\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"operationId": "get_data_version_alpha_ddms_v3_wellboretrajectories__record_id__versions__version__data_get",
"parameters": [
{
......@@ -8913,7 +8913,7 @@
},
"/alpha/ddms/v3/welllogs/{record_id}/data": {
"get": {
"description": "Returns the data according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\")\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"description": "Returns the data according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\"). The desired format can be specify in \"Accept\" header. The default is Parquet. When bulk statistics are requested using \"describe\" parameter, the response is always provided in JSON.\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"operationId": "get_data_alpha_ddms_v3_welllogs__record_id__data_get",
"parameters": [
{
......@@ -9477,7 +9477,7 @@
},
"/alpha/ddms/v3/welllogs/{record_id}/versions/{version}/data": {
"get": {
"description": "Returns the data of a specific version according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\")\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"description": "Returns the data of a specific version according to the specified query parameters. Multiple media types response are available (\"application/json\", text/csv\", \"application/x-parquet\") The desired format can be specify in \"Accept\" header. The default is Parquet. When bulk statistics are requested using \"describe\" parameter, the response is always provided in JSON\n<p>Required roles: 'users.datalake.viewers' or 'users.datalake.editors' or 'users.datalake.admins'.\n\"In addition, users must be a member of data groups to access the data.</p>\n",
"operationId": "get_data_version_alpha_ddms_v3_welllogs__record_id__versions__version__data_get",
"parameters": [
{
......
import pytest
from io import BytesIO
from fastapi import HTTPException
from app.model.model_chunking import GetDataParams
from app.routers.ddms_v3.bulk_v3 import DataFrameRender, get_df_from_request
import pandas as pd
from pandas.testing import assert_frame_equal
@pytest.mark.parametrize("requested, df_columns, expected", [
(["X"], {"X"}, ["X"]),
([], {"X"}, []),
(["X", "Y", "Z"], {"X", "Y"}, ["X", "Y"]),
(["X", "Y", "Z"], {"X", "Y", "Z"}, ["X", "Y", "Z"]),
(["2D"], {"X", "2D[0]", "2D[1]"}, ["2D[0]", "2D[1]"]),
(["2D[0]"], {"X", "2D[0]", "2D[1]"}, ["2D[0]"]),
(["X", "2D"], {"X", "2D[0]", "2D[1]"}, ["X", "2D[0]", "2D[1]"]),
])
def test_get_matching_column(requested, df_columns, expected):
result = DataFrameRender.get_matching_column(requested, df_columns)
assert set(result) == set(expected)
def assert_df_in_parquet(expected_df, content):
# let read it
content = BytesIO(content)
content.seek(0)
actual_df = pd.read_parquet(content, "pyarrow")
assert_frame_equal(expected_df, actual_df)
@pytest.fixture
def default_get_params():
    """Query parameters with no statistics, filtering, or paging applied."""
    return GetDataParams(describe=False, curves=None, limit=None, offset=None)
@pytest.fixture
def basic_dataframe():
    """3x2 integer frame with a non-default index, used as the bulk payload."""
    return pd.DataFrame({'c1': [10, 20, 30], 'c2': [11, 21, 31]}, index=[1, 2, 3])
@pytest.mark.asyncio
@pytest.mark.parametrize("accept", [
    None,  # default is parquet
    "",  # default is parquet
    "application/x-parquet",
    "application/parquet",
    "application/json, application/x-parquet"  # in case of multiple, prioritize parquet
])
async def test_df_render_accept_parquet(default_get_params, basic_dataframe, accept):
    # Parquet must be served both when requested explicitly and as the fallback default.
    response = await DataFrameRender.df_render(basic_dataframe, default_get_params, accept)
    assert 'application/x-parquet' == response.headers.get('Content-Type')
    # Round-trip check: the response body decodes back to the input frame.
    assert_df_in_parquet(basic_dataframe, response.body)
@pytest.mark.asyncio
async def test_df_render_accept_json(default_get_params, basic_dataframe):
    """JSON rendering round-trips the dataframe through the JSON media type."""
    response = await DataFrameRender.df_render(basic_dataframe, default_get_params, "application/json")
    assert response.headers.get('Content-Type') == 'application/json'
    round_tripped = pd.read_json(response.body, orient='columns')
    assert_frame_equal(basic_dataframe, round_tripped)
class RequestMock:
    """Minimal stand-in for a starlette/fastapi Request.

    Exposes a ``headers`` mapping and an awaitable ``body()``, which is all
    ``get_df_from_request`` reads from the real request object.
    """

    def __init__(self, headers=None, body=None):
        # BUGFIX: the original used `headers: dict = {}` — a mutable default
        # shared across every instance, so one test mutating headers would
        # leak into the next. Use a None sentinel and build a fresh dict.
        self.headers = headers if headers is not None else {}
        self.body_content = body

    async def body(self):
        # Mirrors starlette's awaitable Request.body().
        return self.body_content
@pytest.mark.asyncio
async def test_get_df_from_request_parquet(basic_dataframe):
    # A request body carrying parquet bytes is decoded back into the original frame.
    request = RequestMock({"Content-Type": "application/x-parquet"},
                          basic_dataframe.to_parquet(engine='pyarrow', index=True))
    actual_df = await get_df_from_request(request)
    assert_frame_equal(basic_dataframe, actual_df)
@pytest.mark.asyncio
async def test_get_df_from_request_json(basic_dataframe):
    """JSON payloads require the caller to pass the orient used at serialization time."""
    payload = basic_dataframe.to_json(orient='split')
    request = RequestMock({"Content-Type": "application/json"}, payload)
    assert_frame_equal(basic_dataframe, await get_df_from_request(request, orient='split'))
@pytest.mark.asyncio
@pytest.mark.parametrize("content_type, status", [
    ("application/json", 422),       # declared type, but body is not valid JSON
    ("application/x-parquet", 422),  # declared type, but body is not valid parquet
    ("image/jpeg", 400)              # unsupported media type altogether
])
async def test_get_df_from_request_invalid_raise(content_type, status):
    # Invalid bodies must surface as HTTPException with the expected status code.
    request = RequestMock({"Content-Type": content_type}, b'some invalid data')
    with pytest.raises(HTTPException) as ex_info:
        await get_df_from_request(request, orient='split')
    exception = ex_info.value
    assert exception.status_code == status
......@@ -423,7 +423,7 @@ def test_add_curve_by_chunk_overlap_different_cols(setup_client, entity_type):
(['E'], range(15)), # overlap both side
])
data_response = client.get(f'{chunking_url}/{record_id}/data')
data_response = client.get(f'{chunking_url}/{record_id}/data', headers={'Accept': 'application/json'})
assert data_response.status_code == 200
with_new_col = pd.DataFrame.from_dict(data_response.json())
assert list(with_new_col.columns) == ['A', 'B', 'C', 'D', 'E', 'MD']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment