Commit c6ccbb28 authored by fabian serin

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/domain-data-mgmt-services/wellbore/wellbore-domain-services into fserin/BUG_duplicated_operation_ids

# Conflicts:
#	requirements_dev.txt
parents c088680b c8df76b4
Pipeline #50684 failed with stages in 5 minutes and 43 seconds
@@ -50,6 +50,7 @@ from .mime_types import MimeType, MimeTypes
# - using faster format, e.g. hd5
# - threshold on how busy the service is (if not busy and not huge data -> direct write)
# - better proc fork and arg serialization
from ..helper.traces import with_trace
def export_to_parquet(
@@ -228,6 +229,7 @@ async def create_and_write_blob(
raise RuntimeError(f'unexpected type {source} returned by bulk exporter function')
@with_trace('read_blob')
async def read_blob(blob: BlobBulk):
importer = BlobFileImporters.from_string(blob.content_type)
# TODO: run in executor?
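The with_trace helper imported above is applied as a decorator (e.g. @with_trace('read_blob')). Its implementation is not shown in this diff; the sketch below is purely illustrative, assuming it only needs to wrap async callables under a span name (the real helper in app.helper.traces presumably opens a span via the project's OpenCensus dependencies):

import functools

def with_trace(span_name: str):
    # illustrative sketch only: record each call to the decorated coroutine under `span_name`
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # a real implementation would open a tracing span here and close it
            # once the awaited call returns or raises
            return await func(*args, **kwargs)
        return wrapper
    return decorator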
@@ -28,6 +28,7 @@ from .blob_storage import (
from .bulk_id import BulkId
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
from ..helper.traces import with_trace
async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
@@ -48,12 +49,13 @@ async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
return bulkblob.id
@with_trace('get_dataframe')
async def get_dataframe(ctx: Context, bulk_id: str) -> pd.DataFrame:
""" fetch bulk from a blob storage, provide column major """
tenant = await resolve_tenant(ctx.partition_id)
storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
bytes_data = await storage.download(tenant, bulk_id)
bytes_data = await storage.download(tenant, bulk_id)
# for now assume a fixed parquet format, which saves one metadata call
# meta_data = await storage.download_metadata(tenant.project_id, tenant.bucket_name, bulk_id)
# content_type = meta_data.metadata["content_type"]
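Since the metadata lookup is skipped and parquet is assumed, the downloaded bytes can be turned back into a DataFrame along these lines (a hedged sketch with a hypothetical helper name, not the code that follows in this file):

import io
import pandas as pd

def bytes_to_dataframe(bytes_data: bytes) -> pd.DataFrame:
    # assumes the blob was written as parquet, as the comment above notes
    return pd.read_parquet(io.BytesIO(bytes_data))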
@@ -49,6 +49,12 @@ BULK_URI_FIELD = "bulkURI"
OPERATION_IDS = {"record_data": "write_record_data",
"chunk_data": "post_chunk_data"}
def _check_df_columns_type(df: pd.DataFrame):
    if any(type(t) is not str for t in df.columns):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail='All column names must be of type string')
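For illustration, a frame with string column labels passes the check while non-string labels are rejected with a 422 (hypothetical calls, not part of the diff):

import pandas as pd

_check_df_columns_type(pd.DataFrame({"MD": [0.0], "X": [1.0]}))  # passes: all labels are str
# _check_df_columns_type(pd.DataFrame({0: [0.0], 1: [1.0]}))     # raises HTTPException with status 422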
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
@@ -208,6 +214,7 @@ async def post_data(record_id: str,
):
async def save_blob():
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
return await dask_blob_storage.save_blob(df, record_id)
record, bulk_id = await asyncio.gather(
@@ -243,6 +250,7 @@ async def post_chunk_data(record_id: str,
detail=f"Session cannot accept data, state={i_session.session.state}")
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
await dask_blob_storage.session_add_chunk(i_session.session, df)
@@ -21,6 +21,7 @@ opencensus-ext-logging
# for chunking feature
dask[distributed]==2021.6.2
fsspec
python-ulid
--extra-index-url \
https://community.opengroup.org/api/v4/projects/465/packages/pypi/simple/
@@ -43,28 +43,25 @@ def generate_df(columns, index):
df.columns = columns
return df
entity_type_dict = {
    "well_log": {"entity": "welllogs", "version": "v3"},
    "wellbore_trajectory": {"entity": "wellboretrajectories", "version": "v3"},
    "log": {"entity": "logs", "version": "v2"},
}
class EntityType(str, Enum):
well_log = "welllogs"
wellbore_trajectory = "wellboretrajectories"
log = "logs"
def build_base_url(entity_type: EntityType) -> str:
if entity_type == 'logs':
version = 'v2'
else:
version = 'v3'
return '{{base_url}}/alpha/ddms/' + version + '/' + entity_type.value
def build_base_url(entity_type: str) -> str:
return '{{base_url}}/alpha/ddms/' + entity_type_dict[entity_type]["version"] + '/' + entity_type_dict[entity_type]["entity"]
def build_base_url_without_dask(entity_type: str) -> str:
return '{{base_url}}/ddms/' + entity_type_dict[entity_type]["version"] + '/' + entity_type_dict[entity_type]["entity"]
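With the enum replaced by plain strings, entity_type_dict drives both URL builders; the expected outputs below follow directly from the mapping above ('{{base_url}}' is substituted later by the request runner environment):

build_base_url("well_log")          # -> '{{base_url}}/alpha/ddms/v3/welllogs'
build_base_url_without_dask("log")  # -> '{{base_url}}/ddms/v2/logs'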
@contextmanager
def create_record(env, entity_type: EntityType):
if entity_type == EntityType.well_log:
def create_record(env, entity_type: str):
if entity_type == "well_log":
result = build_request_create_osdu_welllog(False).call(env)
elif entity_type == EntityType.wellbore_trajectory:
elif entity_type == "wellbore_trajectory":
result = build_request_create_osdu_wellboretrajectory(False).call(env)
elif entity_type == EntityType.log:
elif entity_type == "log":
result = build_request_create_log().call(env)
else:
raise RuntimeError()
@@ -79,11 +76,11 @@ def create_record(env, entity_type: EntityType):
yield record_id
# actually
if entity_type == EntityType.well_log:
if entity_type == "well_log":
build_request_delete_osdu_welllog(record_id).call(env)
elif entity_type == EntityType.wellbore_trajectory:
elif entity_type == "wellbore_trajectory":
build_request_delete_osdu_wellboretrajectory(record_id).call(env)
elif entity_type == EntityType.log:
elif entity_type == "log":
env.set('log_record_id', record_id)
build_request_delete_log().call(env)
@@ -107,29 +104,31 @@ def build_request(name, method, url, *, payload=None, headers=None) -> RequestRunner:
return RequestRunner(rq_proto)
def build_request_post_data(entity_type: EntityType, record_id: str, payload) -> RequestRunner:
def build_request_post_data(entity_type: str, record_id: str, payload) -> RequestRunner:
url = build_base_url(entity_type) + f'/{record_id}/data'
return build_request(f'{entity_type} post data', 'POST', url, payload=payload)
def build_request_post_data_without_dask(entity_type: str, record_id: str, payload) -> RequestRunner:
url = build_base_url_without_dask(entity_type) + f'/{record_id}/data'
return build_request(f'{entity_type} post data', 'POST', url, payload=payload)
def build_request_post_chunk(entity_type: EntityType, record_id: str, session_id: str, payload) -> RequestRunner:
def build_request_post_chunk(entity_type: str, record_id: str, session_id: str, payload) -> RequestRunner:
url = build_base_url(entity_type) + f'/{record_id}/sessions/{session_id}/data'
return build_request(f'{entity_type} post data', 'POST', url, payload=payload)
def build_request_get_data(entity_type: EntityType, record_id: str) -> RequestRunner:
def build_request_get_data(entity_type: str, record_id: str) -> RequestRunner:
url = build_base_url(entity_type) + f'/{record_id}/data'
return build_request(f'{entity_type} get data', 'GET', url)
def create_session(env, entity_type: EntityType, record_id: str, overwrite: bool) -> str:
def create_session(env, entity_type: str, record_id: str, overwrite: bool) -> str:
url = build_base_url(entity_type) + f'/{record_id}/sessions'
runner = build_request(f'create {entity_type} session', 'POST', url,
payload={'mode': 'overwrite' if overwrite else 'update'})
return runner.call(env, assert_status=200, headers={"Content-Type": "application/json"}).get_response_obj().id
def complete_session(env, entity_type: EntityType, record_id: str, session_id: str, commit: bool):
def complete_session(env, entity_type: str, record_id: str, session_id: str, commit: bool):
state = "commit" if commit else "abandon"
url = build_base_url(entity_type) + f'/{record_id}/sessions/{session_id}'
runner = build_request(f'{state} session', 'PATCH', url, payload={'state': state})
@@ -164,10 +163,9 @@ WELLLOG_URL_PREFIX = 'alpha/ddms/v3/welllogs'
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
@pytest.mark.parametrize('entity_type', ["well_log", "wellbore_trajectory", "log"])
@pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
def test_send_one_chunk_without_session(with_wdms_env, entity_type, serializer):
with create_record(with_wdms_env, entity_type) as record_id:
data = generate_df(['MD', 'X'], range(8))
data_to_send = serializer.dump(data)
@@ -181,7 +179,7 @@ def test_send_one_chunk_without_session(with_wdms_env, entity_type, serializer):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
@pytest.mark.parametrize('entity_type', ["well_log", "wellbore_trajectory", "log"])
@pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
def test_send_one_chunk_with_session_commit(with_wdms_env, entity_type, serializer):
@@ -215,7 +213,7 @@ def test_send_one_chunk_with_session_commit(with_wdms_env, entity_type, serializer):
@pytest.mark.parametrize("shuffle", [False]) # [False, True]
def test_send_multiple_chunks_with_session_commit(with_wdms_env, shuffle):
# well log on parquet
entity_type = EntityType.well_log
entity_type = "well_log"
serializer = ParquetSerializer()
with create_record(with_wdms_env, entity_type) as record_id:
@@ -259,7 +257,7 @@ def test_send_multiple_chunks_with_session_commit(with_wdms_env, shuffle):
@pytest.mark.tag('chunking', 'smoke')
def test_get_data_with_offset_filter(with_wdms_env):
# well log on parquet
entity_type = EntityType.well_log
entity_type = "well_log"
serializer = ParquetSerializer()
with create_record(with_wdms_env, entity_type) as record_id:
@@ -298,7 +296,7 @@ def test_get_data_with_offset_filter(with_wdms_env):
@pytest.mark.tag('chunking', 'smoke')
def test_get_data_with_column_filter(with_wdms_env):
# well log on parquet
entity_type = EntityType.well_log
entity_type = "well_log"
serializer = ParquetSerializer()
with create_record(with_wdms_env, entity_type) as record_id:
@@ -331,7 +329,7 @@ def test_get_data_with_column_filter(with_wdms_env):
@pytest.mark.tag('chunking', 'smoke')
def test_get_data_with_limit_filter(with_wdms_env):
# well log on parquet
entity_type = EntityType.well_log
entity_type = "well_log"
serializer = ParquetSerializer()
with create_record(with_wdms_env, entity_type) as record_id:
@@ -366,7 +364,7 @@ def test_get_data_with_limit_filter(with_wdms_env):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
@pytest.mark.parametrize('entity_type', ["well_log", "wellbore_trajectory", "log"])
def test_get_data_with_limit_and_offset_filter(with_wdms_env, entity_type):
serializer = ParquetSerializer()
@@ -394,7 +392,7 @@ def test_get_data_with_limit_and_offset_filter(with_wdms_env, entity_type):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory])
@pytest.mark.parametrize('entity_type', ["well_log", "wellbore_trajectory", "log"])
@pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
def test_multiple_overwrite_sessions_in_parallel_then_commit(with_wdms_env, entity_type, serializer):
@@ -431,7 +429,7 @@ def test_multiple_overwrite_sessions_in_parallel_then_commit(with_wdms_env, entity_type, serializer):
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory])
@pytest.mark.parametrize('entity_type', ["well_log", "wellbore_trajectory", "log"])
@pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
def test_multiple_update_sessions_in_parallel_then_commit(with_wdms_env, entity_type, serializer):
@@ -471,3 +469,20 @@ def test_multiple_update_sessions_in_parallel_then_commit(with_wdms_env, entity_type, serializer):
pd.testing.assert_frame_equal(
expected, serializer.read(result.response.content), check_dtype=False)
# check_dtype set to False since dtype information is lost in JSON, so int32 can become int64
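The check_dtype=False flag compensates for JSON carrying no dtype information; a minimal standalone illustration of the effect (assumed behaviour of a pandas JSON round trip, not part of the test suite):

import io
import numpy as np
import pandas as pd

df = pd.DataFrame({"MD": np.arange(8, dtype=np.int32)})
roundtripped = pd.read_json(io.StringIO(df.to_json(orient="split")), orient="split")
# the int32 column typically comes back as int64, so frame comparisons ignore dtype
assert roundtripped["MD"].dtype == np.int64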
@pytest.mark.tag('chunking', 'smoke')
@pytest.mark.parametrize('entity_type', ["log"])
@pytest.mark.parametrize('serializer', [JsonSerializer()])
def test_get_data_from_record_data_without_dask(with_wdms_env, entity_type, serializer):
with create_record(with_wdms_env, entity_type) as record_id:
data = generate_df(['MD', 'X'], range(8))
data_to_send = serializer.dump(data)
headers = {'Content-Type': serializer.mime_type, 'Accept': serializer.mime_type}
build_request_post_data_without_dask(entity_type, record_id, data_to_send).call(with_wdms_env, headers=headers).assert_ok()
result = build_request_get_data(entity_type, record_id).call(with_wdms_env, headers=headers, assert_status=200)
pd.testing.assert_frame_equal(data, serializer.read(result.response.content), check_dtype=False)
# check_dtype set to False since dtype information is lost in JSON, so int32 can become int64
\ No newline at end of file
import io
from app.bulk_persistence.dask.errors import BulkNotFound
from tests.unit.test_utils import nope_logger_fixture
from tempfile import TemporaryDirectory
from fastapi import Header
from fastapi.testclient import TestClient
import pytest
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from osdu.core.api.storage.blob_storage_local_fs import LocalFSBlobStorage
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
@@ -23,9 +23,8 @@ from app.helper import traces
from app.utils import Context
from app import conf
import pandas as pd
from tests.unit.persistence.dask_blob_storage_test import generate_df
from tests.unit.test_utils import nope_logger_fixture
Definitions = {
'WellLog': {
@@ -64,6 +63,7 @@ Definitions = {
EntityTypeParams = ['WellLog', 'WellboreTrajectory', 'Log']
def _create_df_from_response(response):
f = io.BytesIO(response.content)
f.seek(0)
@@ -181,7 +181,6 @@ def setup_client(nope_logger_fixture, bob):
])
@pytest.mark.parametrize("accept_content", [
'application/x-parquet',
# 'text/csv; charset=utf-8',
'application/json',
])
@pytest.mark.parametrize("columns", [
@@ -675,6 +674,53 @@ def test_session_sent_same_col_different_types(setup_client, entity_type):
commit_response = client.patch(f'{chunking_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
assert commit_response.status_code == 422
def _df_to_pyarrow_parquet(df_data: pd.DataFrame):
    """ Return parquet file content (bytes) built from the given dataframe """
    table = pa.Table.from_pandas(df=df_data)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf.getvalue().to_pybytes()
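As a usage sketch, the bytes produced by this helper can be read straight back with pyarrow (hypothetical round trip, not code from this diff):

df = pd.DataFrame({"MD": [0.0, 1.0], "X": [2.0, 3.0]})
blob = _df_to_pyarrow_parquet(df)
restored = pq.read_table(pa.BufferReader(blob)).to_pandas()  # parquet bytes back to a dataframe
pd.testing.assert_frame_equal(df, restored)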
@pytest.mark.parametrize("entity_type", EntityTypeParams)
@pytest.mark.parametrize("columns_type", [
[int(42), float(-42)],
[int(42), float(-42), str('forty two')]
])
@pytest.mark.parametrize("content_type_header,create_func", [
('application/x-parquet', lambda df: _df_to_pyarrow_parquet(df)),
('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
])
def test_session_chunk_int(setup_client, entity_type, content_type_header, create_func, columns_type):
client, _ = setup_client
record_id = _create_record(client, entity_type)
chunking_url = Definitions[entity_type]['chunking_url']
json_data = {t: np.random.rand(10) for t in columns_type}
df_data = pd.DataFrame(json_data)
data_to_send = create_func(df_data)
headers = {'content-type': content_type_header}
expected_code = 422
if content_type_header.endswith('parquet') and any((type(c) is str for c in columns_type)):
# there is a side effect with the parquet format: if at least one column name is str, then all column names are cast to str
expected_code = 200
write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
assert write_response.status_code == expected_code
session_response = client.post(f'{chunking_url}/{record_id}/sessions', json={'mode': 'update'})
assert session_response.status_code == 200
session_id = session_response.json()['id']
chunk_response_1 = client.post(f'{chunking_url}/{record_id}/sessions/{session_id}/data',
data=data_to_send,
headers=headers)
assert chunk_response_1.status_code == expected_code
# todo:
# - concurrent sessions using fromVersion in Integrations tests
# - index: check if dataframe has an index