Commit e4995b98 authored by Cyril Monmouton

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services into trusted-feature/add-v3-vulk-tracing

# Conflicts:
#	app/bulk_utils.py
#	tests/unit/routers/chunking_test.py
parents 4b2175ae c8df76b4
Pipeline #50792 passed with stages in 12 minutes and 3 seconds
@@ -26,6 +26,7 @@ The following software have components provided under the terms of this license:
- msgpack (from http://msgpack.org/)
- multidict (from https://github.com/aio-libs/multidict/)
- numpy (from http://www.numpy.org)
- openapi-spec-validator (from https://github.com/p1c2u/openapi-spec-validator)
- opencensus (from https://github.com/census-instrumentation/opencensus-python)
- opencensus-context (from https://github.com/census-instrumentation/opencensus-python/tree/master/context/opencensus-context)
- opencensus-ext-azure (from )
@@ -95,6 +96,7 @@ The following software have components provided under the terms of this license:
- mock (from https://github.com/testing-cabal/mock)
- numpy (from http://www.numpy.org)
- oauthlib (from https://github.com/idan/oauthlib)
- openapi-schema-validator (from https://github.com/p1c2u/openapi-schema-validator)
- packaging (from https://github.com/pypa/packaging)
- pandas (from http://pandas.pydata.org)
- partd (from http://github.com/dask/partd/)
......
@@ -50,6 +50,7 @@ from .mime_types import MimeType, MimeTypes
# - using faster format, e.g. hd5
# - threshold about the busyness of the service (if not busy and not huge data -> direct write)
# - better proc fork and arg serialization
from ..helper.traces import with_trace
def export_to_parquet(
@@ -228,6 +229,7 @@ async def create_and_write_blob(
raise RuntimeError(f'unexpected type {source} returned by bulk exporter function')
@with_trace('read_blob')
async def read_blob(blob: BlobBulk):
    importer = BlobFileImporters.from_string(blob.content_type)
    # TODO: run in executor?
......
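For readers unfamiliar with the tracing helper wired in here, the decorator imported from `..helper.traces` is applied as `@with_trace('read_blob')` to async functions. Below is a minimal sketch of what such a decorator could look like, assuming it is built on OpenCensus (which the service already depends on); the real `app.helper.traces.with_trace` implementation may differ.

```python
# Hypothetical sketch of a with_trace-style decorator; not the actual
# app.helper.traces implementation. Assumes OpenCensus, an existing dependency.
import functools
from opencensus.trace import execution_context


def with_trace(span_name: str):
    """Wrap an async function in an OpenCensus span named `span_name`."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # get_opencensus_tracer() returns the tracer bound to the current context
            tracer = execution_context.get_opencensus_tracer()
            with tracer.span(name=span_name):
                return await func(*args, **kwargs)
        return wrapper
    return decorator
```

Decorating functions such as `read_blob` this way makes each call appear as a child span of the current request trace.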
@@ -28,6 +28,7 @@ from .blob_storage import (
from .bulk_id import BulkId
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from ..helper.traces import with_trace
async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
@@ -48,12 +49,13 @@ async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
return bulkblob.id
@with_trace('get_dataframe')
async def get_dataframe(ctx: Context, bulk_id: str) -> pd.DataFrame:
    """ fetch bulk from blob storage, provided column-major """
    tenant = await resolve_tenant(ctx.partition_id)
    storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
    bytes_data = await storage.download(tenant, bulk_id)
    # for now use a fixed parquet format, saving one metadata call
    # meta_data = await storage.download_metadata(tenant.project_id, tenant.bucket_name, bulk_id)
    # content_type = meta_data.metadata["content_type"]
......
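As the comments above indicate, the blob is assumed to be stored in parquet format for now, which avoids the metadata lookup. A small illustrative sketch (not the service's actual code path) of turning the downloaded bytes back into a DataFrame:

```python
# Illustrative only: decode parquet bytes downloaded from blob storage into a
# DataFrame. The real service may go through pyarrow/dask readers instead.
import io

import pandas as pd


def parquet_bytes_to_dataframe(bytes_data: bytes) -> pd.DataFrame:
    # pandas uses pyarrow (already a dependency) as its parquet engine here
    return pd.read_parquet(io.BytesIO(bytes_data))
```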
@@ -48,6 +48,12 @@ BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"
def _check_df_columns_type(df: pd.DataFrame):
    if any(type(t) is not str for t in df.columns):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail='All column types should be string')
@with_trace("get_df_from_request")
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
@@ -211,6 +217,7 @@ async def post_data(record_id: str,
@with_trace("save_blob")
async def save_blob():
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
return await dask_blob_storage.save_blob(df, record_id)
record, bulk_id = await asyncio.gather(
@@ -246,6 +253,7 @@ async def post_chunk_data(record_id: str,
detail=f"Session cannot accept data, state={i_session.session.state}")
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
await dask_blob_storage.session_add_chunk(i_session.session, df)
......
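To make the new validation concrete: `_check_df_columns_type` only accepts DataFrames whose column names are all strings, and anything else is rejected with HTTP 422 before the bulk is written or added to a session. A self-contained, illustrative version of the check and its behavior (names and values below are made up for the example):

```python
# Illustrative, standalone copy of the column-name check added in this commit.
import pandas as pd
from fastapi import HTTPException, status


def check_df_columns_type(df: pd.DataFrame) -> None:
    # reject any DataFrame whose column names are not all str
    if any(type(t) is not str for t in df.columns):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail='All column types should be string')


check_df_columns_type(pd.DataFrame({"MD": [0.0], "GR": [85.2]}))    # ok: all names are str
try:
    check_df_columns_type(pd.DataFrame({42: [0.0], "GR": [85.2]}))  # rejected: 42 is not a str
except HTTPException as exc:
    assert exc.status_code == 422
```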
@@ -21,6 +21,7 @@ opencensus-ext-logging
# for chunking feature
dask[distributed]==2021.6.2
fsspec
python-ulid
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
......
@@ -7,7 +7,6 @@ httpx
numpy
pandas
pyarrow
python-ulid
# Note: since Python 3.8, unittest.mock includes Mock 4.0+ features.
mock>=4.0
......
import io
from app.bulk_persistence.dask.errors import BulkNotFound
from tests.unit.test_utils import nope_logger_fixture
from tempfile import TemporaryDirectory
from fastapi import Header
from fastapi.testclient import TestClient
import pytest
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
@@ -23,8 +23,9 @@ from app.helper import traces
from app.utils import Context
from app import conf
import pandas as pd
from tests.unit.persistence.dask_blob_storage_test import generate_df
from tests.unit.test_utils import nope_logger_fixture
Definitions = {
'WellLog': {
@@ -181,7 +182,6 @@ def setup_client(init_fixtures):
])
@pytest.mark.parametrize("accept_content", [
'application/x-parquet',
# 'text/csv; charset=utf-8',
'application/json',
])
@pytest.mark.parametrize("columns", [
@@ -675,6 +675,53 @@ def test_session_sent_same_col_different_types(setup_client, entity_type):
commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
assert commit_response.status_code == 422
def _df_to_pyarrow_parquet(df_data: pd.DataFrame):
    """ Return the given dataframe serialized as parquet-format bytes """
    table = pa.Table.from_pandas(df=df_data)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf.getvalue().to_pybytes()
@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("columns_type", [
[int(42), float(-42)],
[int(42), float(-42), str('forty two')]
])
@pytest.mark.parametrize("content_type_header,create_func", [
('application/x-parquet', lambda df: _df_to_pyarrow_parquet(df)),
('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
def test_session_chunk_int(setup_client, entity_type, content_type_header, create_func, columns_type):
client, _ = setup_client
record_id = _create_record(client, entity_type)
chunking_url = Definitions[entity_type]['chunking_url']
json_data = {t: np.random.rand(10) for t in columns_type}
df_data = pd.DataFrame(json_data)
data_to_send = create_func(df_data)
headers = {'content-type': content_type_header}
expected_code = 422
if content_type_header.endswith('parquet') and any((type(c) is str for c in columns_type)):
# there is a side effect with parquet format, if at least one col is str, then all cols are casted into str
expected_code = 200
write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
assert write_response.status_code == expected_code
session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
assert session_response.status_code == 200
session_id = session_response.json()['id']
chunk_response_1 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
data=data_to_send,
headers=headers)
assert chunk_response_1.status_code == expected_code
# todo:
# - concurrent sessions using fromVersion in Integrations tests
# - index: check if dataframe has an index
......
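The expected status codes in the test above hinge on a serialization detail: when a DataFrame whose column names mix strings and numbers is written to parquet, the names are stored as strings and stay strings after the round trip, so the server-side column-name check passes for the parquet payload but not for the JSON one. A small sketch of that behavior (illustrative only; exact results depend on pandas/pyarrow defaults and versions):

```python
# Sketch of the column-name side effect the test relies on. Not part of the
# service; behavior may vary with pandas/pyarrow versions.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({42: [1.0], -42.0: [2.0], 'forty two': [3.0]})
buf = pa.BufferOutputStream()
pq.write_table(pa.Table.from_pandas(df), buf)

round_tripped = pq.read_table(pa.BufferReader(buf.getvalue())).to_pandas()
print([type(c) for c in round_tripped.columns])  # with a mixed-type column index, names come back as str
```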