Commit 73bbee47 authored by Luc Yriarte

Merge branch 'bulk-api-v2' into 'master'

Bulk api v2

See merge request !136
parents 8600658b 113a9ac6
Pipeline #49640 failed with stages in 12 minutes and 31 seconds
@@ -21,7 +21,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status
 from fastapi.responses import Response
 import pandas as pd
-from app.bulk_persistence import DataframeSerializerAsync, JSONOrient
+from app.bulk_persistence import DataframeSerializerAsync, JSONOrient, get_dataframe
 from app.bulk_persistence.bulk_id import BulkId
 from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
 from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
@@ -29,6 +29,7 @@ from app.clients.storage_service_client import get_storage_record_service
 from app.record_utils import fetch_record
 from app.bulk_persistence.mime_types import MimeTypes
 from app.model.model_chunking import GetDataParams
+from app.model.log_bulk import LogBulkHelper
 from app.utils import Context, OpenApiHandler, get_ctx
 from app.persistence.sessions_storage import (Session, SessionException, SessionState, SessionUpdateMode)
 from app.routers.common_parameters import (
@@ -163,7 +164,7 @@ class DataFrameRender:
         return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)


-def get_bulk_uri(record):
+def get_bulk_uri_osdu(record):
     return record.data.get('ExtensionProperties', {}).get('wdms', {}).get(BULK_URI_FIELD, None)
@@ -270,16 +271,21 @@ async def get_data_version(
     dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
 ):
     record = await fetch_record(ctx, record_id, version)
-    bulk_uri = get_bulk_uri(record)
+    bulk_urn = get_bulk_uri_osdu(record)
+    if bulk_urn is not None:
+        bulk_id, prefix = BulkId.bulk_urn_decode(bulk_urn)
+    else:  # fallback on ddms_v2 Persistence for wks:log schema
+        bulk_id, prefix = LogBulkHelper.get_bulk_id(record, None)
     try:
-        if bulk_uri is None:
+        if bulk_id is None:
             raise BulkNotFound(record_id=record_id, bulk_id=None)
-        bulk_id, prefix = BulkId.bulk_urn_decode(bulk_uri)
-        if prefix != BULK_URN_PREFIX_VERSION:
+        if prefix == BULK_URN_PREFIX_VERSION:
+            df = await dask_blob_storage.load_bulk(record_id, bulk_id)
+        elif prefix is None:
+            df = await get_dataframe(ctx, bulk_id)
+        else:
             raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
-        df = await dask_blob_storage.load_bulk(record_id, bulk_id)
         df = await DataFrameRender.process_params(df, ctrl_p)
         return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
     except BulkError as ex:
         ex.raise_as_http()
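For context on the dispatch above: `BulkId.bulk_urn_decode` returns a `(bulk_id, prefix)` pair, and the prefix decides which persistence layer wrote the bulk: a `BULK_URN_PREFIX_VERSION` prefix routes to the Dask blob storage, `None` falls back to the legacy ddms_v2 `get_dataframe` path, and anything else raises `BulkNotFound`. A simplified stand-in for the decoder (the real one lives in `app.bulk_persistence.bulk_id`; the `urn:<prefix>:<bulk_id>` layout is an assumption for illustration only):

```python
from typing import Optional, Tuple

def bulk_urn_decode_sketch(urn: str) -> Tuple[str, Optional[str]]:
    """Toy decoder: 'urn:<prefix>:<bulk_id>' -> (bulk_id, prefix), else (urn, None)."""
    parts = urn.split(':', 2)
    if len(parts) == 3 and parts[0] == 'urn':
        return parts[2], parts[1]
    # Legacy bare bulk_id stored by ddms_v2: no version prefix.
    return urn, None
```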
@@ -339,7 +345,7 @@ async def complete_session(
     record = await fetch_record(ctx, record_id, i_session.session.fromVersion)

     previous_bulk_uri = None
-    bulk_urn = get_bulk_uri(record)
+    bulk_urn = get_bulk_uri_osdu(record)
     if i_session.session.mode == SessionUpdateMode.Update and bulk_urn is not None:
         previous_bulk_uri, _prefix = BulkId.bulk_urn_decode(bulk_urn)
@@ -50,6 +50,7 @@ from app.record_utils import fetch_record, update_records
 router = APIRouter()

+LOGS_API_BASE_PATH = '/logs'

 async def get_persistence() -> Persistence:
     return Persistence()
@@ -20,6 +20,7 @@ from fastapi import FastAPI, Depends
 from fastapi.openapi.utils import get_openapi
 from app import __version__, __build_number__, __app_name__
+from app import bulk_utils
 from app.auth.auth import require_opendes_authorized_user
 from app.conf import Config, check_environment
 from app.errors.exception_handlers import add_exception_handlers
@@ -49,7 +50,6 @@ from app.routers.trajectory import trajectory_ddms_v2
 from app.routers.dipset import dipset_ddms_v2, dip_ddms_v2
 from app.routers.logrecognition import log_recognition
 from app.routers.search import search, fast_search, search_v3, fast_search_v3
-from app.routers.ddms_v3 import bulk_v3
 from app.clients import StorageRecordServiceClient, SearchServiceClient
 from app.utils import (
     get_http_client_session,
@@ -222,7 +222,7 @@ wdms_app.include_router(
     prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
     tags=tags, dependencies=dependencies)

 wdms_app.include_router(
-    bulk_v3.router_bulk,
+    bulk_utils.router_bulk,
     prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
     tags=tags, dependencies=dependencies)
@@ -232,10 +232,20 @@ wdms_app.include_router(
     prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
     tags=tags, dependencies=dependencies)

 wdms_app.include_router(
-    bulk_v3.router_bulk,
+    bulk_utils.router_bulk,
     prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
     tags=tags, dependencies=dependencies)

+# log bulk v2 APIs
+wdms_app.include_router(
+    sessions.router,
+    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
+    tags=tags, dependencies=dependencies)
+
+wdms_app.include_router(
+    bulk_utils.router_bulk,
+    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
+    tags=tags, dependencies=dependencies)

 # ------------- add alpha feature: ONLY MOUNTED IN DEV AND DA ENVs
 def enable_alpha_feature():
     """ must be called to enable and activate alpha feature"""
@@ -23,6 +23,7 @@ import pandas as pd
 import pytest

 from .fixtures import with_wdms_env
+from ..request_builders.wdms.crud.log import build_request_create_log, build_request_delete_log
 from ..request_builders.wdms.session import build_delete_session
 from ..request_runner import RequestRunner, Request
@@ -46,10 +47,15 @@ def generate_df(columns, index):
 class EntityType(str, Enum):
     well_log = "welllogs"
     wellbore_trajectory = "wellboretrajectories"
+    log = "logs"


 def build_base_url(entity_type: EntityType) -> str:
-    return '{{base_url}}/alpha/ddms/v3/' + entity_type.value
+    if entity_type == 'logs':
+        version = 'v2'
+    else:
+        version = 'v3'
+    return '{{base_url}}/alpha/ddms/' + version + '/' + entity_type.value


 @contextmanager
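Since `EntityType` subclasses `str`, the comparison `entity_type == 'logs'` matches `EntityType.log` directly. A quick sanity check of the resulting templates (illustrative asserts mirroring the literals in the diff):

```python
# str-Enum members compare equal to their string values.
assert EntityType.log == 'logs'
assert build_base_url(EntityType.log) == '{{base_url}}/alpha/ddms/v2/logs'
assert build_base_url(EntityType.well_log) == '{{base_url}}/alpha/ddms/v3/welllogs'
```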
@@ -58,6 +64,8 @@ def create_record(env, entity_type: EntityType):
         result = build_request_create_osdu_welllog(False).call(env)
     elif entity_type == EntityType.wellbore_trajectory:
         result = build_request_create_osdu_wellboretrajectory(False).call(env)
+    elif entity_type == EntityType.log:
+        result = build_request_create_log().call(env)
     else:
         raise RuntimeError()
@@ -75,6 +83,9 @@ def create_record(env, entity_type: EntityType):
         build_request_delete_osdu_welllog(record_id).call(env)
     elif entity_type == EntityType.wellbore_trajectory:
         build_request_delete_osdu_wellboretrajectory(record_id).call(env)
+    elif entity_type == EntityType.log:
+        env.set('log_record_id', record_id)
+        build_request_delete_log().call(env)


 def build_request(name, method, url, *, payload=None, headers=None) -> RequestRunner:
@@ -153,7 +164,7 @@ WELLLOG_URL_PREFIX = 'alpha/ddms/v3/welllogs'
 @pytest.mark.tag('chunking', 'smoke')
-@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory])
+@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
 @pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
 def test_send_one_chunk_without_session(with_wdms_env, entity_type, serializer):
@@ -170,7 +181,7 @@ def test_send_one_chunk_without_session(with_wdms_env, entity_type, serializer):
 @pytest.mark.tag('chunking', 'smoke')
-@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory])
+@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
 @pytest.mark.parametrize('serializer', [ParquetSerializer(), JsonSerializer()])
 def test_send_one_chunk_with_session_commit(with_wdms_env, entity_type, serializer):
@@ -355,7 +366,7 @@ def test_get_data_with_limit_filter(with_wdms_env):
 @pytest.mark.tag('chunking', 'smoke')
-@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory])
+@pytest.mark.parametrize('entity_type', [EntityType.well_log, EntityType.wellbore_trajectory, EntityType.log])
 def test_get_data_with_limit_and_offset_filter(with_wdms_env, entity_type):
     serializer = ParquetSerializer()
@@ -4,7 +4,7 @@ from io import BytesIO
 from fastapi import HTTPException

 from app.model.model_chunking import GetDataParams
-from app.routers.ddms_v3.bulk_v3 import DataFrameRender, get_df_from_request
+from app.bulk_utils import DataFrameRender, get_df_from_request

 import pandas as pd
 from pandas.testing import assert_frame_equal
@@ ... @@
 import io
 from app.bulk_persistence.dask.errors import BulkNotFound
 from tests.unit.test_utils import nope_logger_fixture
 from tempfile import TemporaryDirectory
@@ -27,6 +29,7 @@ from tests.unit.persistence.dask_blob_storage_test import generate_df
 Definitions = {
     'WellLog': {
+        'api_version': 'v3',
         'base_url': '/ddms/v3/welllogs',
         'chunking_url': '/alpha/ddms/v3/welllogs',  # TODO: update when no longer alpha
         'kind': 'osdu:wks:work-product-component--WellLog:1.0.0',
@@ -37,6 +40,7 @@ Definitions = {
     },
     'WellboreTrajectory': {
+        'api_version': 'v3',
         'base_url': '/ddms/v3/wellboretrajectories',
         'chunking_url': '/alpha/ddms/v3/wellboretrajectories',  # TODO: update when no longer alpha
         'kind': 'osdu:wks:work-product-component--WellboreTrajectory:1.0.0',
@@ -46,12 +50,19 @@ Definitions = {
         "BaseDepthMeasuredDepth": 12345.6,
         "VerticalMeasurement": {"VerticalMeasurement": 12345.6}
         }
-    }
+    },
+    'Log': {
+        'api_version': 'v2',
+        'base_url': '/ddms/v2/logs',
+        'chunking_url': '/alpha/ddms/v2/logs',  # TODO: update when no longer alpha
+        'kind': 'osdu:wks:log:1.0.5',
+        'record_data': {
+            "name": "myLog_name"
+        }
+    }
 }

-EntityTypeParams = ['WellLog', 'WellboreTrajectory']
+EntityTypeParams = ['WellLog', 'WellboreTrajectory', 'Log']


 def _create_df_from_response(response):
     f = io.BytesIO(response.content)
@@ -194,7 +205,7 @@ def test_send_all_data_once(setup_client,
     data_to_send = create_func(initial_data_df)
     headers = {'content-type': content_type_header}

-    get_response_no_data = client.get(f'{Definitions[entity_type]["base_url"]}/{record_id}/data', headers=headers)
+    get_response_no_data = client.get(f'{chunking_url}/{record_id}/data', headers=headers)
     assert get_response_no_data.status_code == 404

     write_response = client.post(f'{chunking_url}/{record_id}/data', data=data_to_send, headers=headers)
@@ -219,6 +230,58 @@ def test_send_all_data_once(setup_client,
     )


+@pytest.mark.parametrize("entity_type",
+                         [entity for entity in EntityTypeParams if Definitions[entity]['api_version'] == "v2"])
+@pytest.mark.parametrize("content_type_header,create_func", [
+    ('application/json', lambda df: df.to_json(orient='split', date_format='iso')),
+])
+@pytest.mark.parametrize("accept_content", [
+    'application/json',
+])
+@pytest.mark.parametrize("columns", [
+    ['MD', 'X'],
+    ['float_MD', 'float_X'],
+    ['str_MD', 'str_X'],
+    ['date_MD', 'date_X'],
+    ['MD', 'float_X', 'str_X', 'date_X']
+])
+def test_send_all_data_once_post_data_v2_get_data_v3(setup_client,
+                                                     entity_type,
+                                                     columns,
+                                                     content_type_header,
+                                                     create_func,
+                                                     accept_content):
+    client, tmp_dir = setup_client
+    record_id = _create_record(client, entity_type)
+    chunking_url = Definitions[entity_type]['chunking_url']
+    base_url = Definitions[entity_type]['base_url']
+    initial_data_df = generate_df(columns, range(5, 13))
+    data_to_send = create_func(initial_data_df)
+    headers = {'content-type': content_type_header}
+
+    get_response_no_data = client.get(f'{chunking_url}/{record_id}/data', headers=headers)
+    assert get_response_no_data.status_code == 404
+
+    write_response = client.post(f'{base_url}/{record_id}/data', data=data_to_send, headers=headers)
+    assert write_response.status_code == 200
+
+    get_response = client.get(f'{chunking_url}/{record_id}/data', headers={'accept': accept_content})
+    assert get_response.status_code == 200
+    result_df = _create_df_from_response(get_response)
+
+    if content_type_header.endswith('json'):
+        initial_data_df = pd.read_json(data_to_send, orient='split')
+
+    assert initial_data_df.index.dtype == result_df.index.dtype
+    assert initial_data_df.shape == result_df.shape
+    pd.testing.assert_frame_equal(initial_data_df, result_df,
+                                  check_dtype=False,
+                                  check_column_type=False,
+                                  check_datetimelike_compat=True,
+                                  )


 @pytest.mark.parametrize("entity_type", EntityTypeParams)
 @pytest.mark.parametrize("content_type_header, create_func", [
     ('application/x-parquet', lambda df: df.to_parquet(engine="pyarrow")),
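The JSON branch of the new test relies on pandas' `split` orient surviving a serialize/deserialize round trip, which is why it re-reads `data_to_send` before comparing. A standalone sketch of that invariant (pure pandas, no app code; the column values are made up):

```python
import io
import pandas as pd

df = pd.DataFrame({"MD": [1.0, 2.0], "X": [3.0, 4.0]}, index=range(5, 7))
payload = df.to_json(orient='split', date_format='iso')
restored = pd.read_json(io.StringIO(payload), orient='split')

# Index, shape, and values survive; exact dtypes may not, hence check_dtype=False.
pd.testing.assert_frame_equal(df, restored, check_dtype=False)
```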