Commit 286f2c5b authored by Luc Yriarte

Merge branch 'master' into extensions-as-modules

parents f758021c 781e71e1
Pipeline #51808 passed with stages in 13 minutes and 1 second
@@ -28,7 +28,7 @@ variables:
AWS_INT_TEST_TYPE: python
OSDU_GCP_PROJECT_NAME: nice-etching-277309
OSDU_GCP_CLUSTER: primary
OSDU_GCP_CLUSTER: asm-primary
OSDU_GCP_ZONE: us-central1-c
OSDU_GCP_SERVICE: wellbore
OSDU_GCP_HELM_DEPLOYMENT_DIR: devops/gcp/osdu-helm
@@ -15,7 +15,7 @@
import json
import asyncio
from io import BytesIO
from typing import Union, AnyStr, IO, Optional, List
from typing import Union, AnyStr, IO, Optional, List, Dict
from pathlib import Path
import numpy as np
@@ -48,23 +48,12 @@ class DataframeSerializerSync:
columns: List[Union[str, int, float]] = None
index: List[Union[str, int, float]] = None
class IndexFormat(BaseModel):
# TODO
pass
class ColumnFormat(BaseModel):
# TODO
pass
class RecordsFormat(BaseModel):
# TODO
pass
__root__: Dict[str, Dict[Union[str, int, float], Union[str, int, float]]]
schema_dict = {
JSONOrient.split: SplitFormat.schema(),
JSONOrient.index: IndexFormat.schema(),
JSONOrient.columns: ColumnFormat.schema(),
JSONOrient.records: RecordsFormat.schema()
JSONOrient.columns: ColumnFormat.schema()
}
return schema_dict[JSONOrient.get(orient)]
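The placeholder IndexFormat/ColumnFormat models are replaced by a pydantic custom-root mapping, and the records orient is dropped from the schema dictionary. For context, a minimal sketch of the payload shapes these schemas describe, assuming pandas ("MD" and "GR" are illustrative column names only):

```python
# Minimal sketch, assuming pandas; "MD"/"GR" are illustrative column names only.
import pandas as pd

df = pd.DataFrame({"MD": [0.0, 0.5], "GR": [80.2, 81.7]})

# 'split' orient: columns, index and data carried separately (SplitFormat above).
df.to_json(orient="split")
# '{"columns":["MD","GR"],"index":[0,1],"data":[[0.0,80.2],[0.5,81.7]]}'

# 'columns' and 'index' orients: nested mappings, hence the
# Dict[str, Dict[Union[str, int, float], Union[str, int, float]]] root type.
df.to_json(orient="columns")
# '{"MD":{"0":0.0,"1":0.5},"GR":{"0":80.2,"1":81.7}}'
df.to_json(orient="index")
# '{"0":{"MD":0.0,"GR":80.2},"1":{"MD":0.5,"GR":81.7}}'
```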
@@ -17,12 +17,8 @@ from typing import Union
class JSONOrient(str, Enum):
# 'table' is not allowed because it is very verbose and comes with significant overhead
# 'values' is not allowed because it cannot carry the index nor the columns
split = "split"
index = "index"
columns = "columns"
records = "records"
@classmethod
def get(cls, orient: Union[str, "JSONOrient"]) -> "JSONOrient":
@@ -369,6 +369,7 @@ def check_environment(configuration):
AUTHORIZATION_HEADER_NAME = 'Authorization'
APP_KEY_HEADER_NAME = 'appKey'
APP_ID_HEADER_NAME = 'x-app-id'
CORRELATION_ID_HEADER_NAME = 'correlation-id'
REQUEST_ID_HEADER_NAME = 'Request-ID'
PARTITION_ID_HEADER_NAME = 'data-partition-id'
@@ -87,6 +87,13 @@ class TracingMiddleware(BaseHTTPMiddleware):
tracer.add_attribute_to_current_span(
attribute_key=conf.CORRELATION_ID_HEADER_NAME,
attribute_value=correlation_id)
ctx_partition_id = get_or_create_ctx().partition_id
partition_id = ctx_partition_id if ctx_partition_id is not None \
else request.headers.get(conf.PARTITION_ID_HEADER_NAME)
tracer.add_attribute_to_current_span(
attribute_key=conf.PARTITION_ID_HEADER_NAME,
attribute_value=partition_id)
request_content_type = request.headers.get("Content-type")
tracer.add_attribute_to_current_span(attribute_key="request.header Content-type",
@@ -96,6 +103,10 @@ class TracingMiddleware(BaseHTTPMiddleware):
tracer.add_attribute_to_current_span(attribute_key="request.header Content-length",
attribute_value=request_content_length)
app_id = request.headers.get(conf.APP_ID_HEADER_NAME)
tracer.add_attribute_to_current_span(attribute_key=conf.APP_ID_HEADER_NAME,
attribute_value=app_id)
@staticmethod
def _after_request(request: Request, response: Response, tracer):
@@ -47,7 +47,8 @@ router_bulk = APIRouter() # router dedicated to bulk APIs
BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"
OPERATION_IDS = {"record_data": "write_record_data",
"chunk_data": "post_chunk_data"}
def _check_df_columns_type(df: pd.DataFrame):
if any((type(t) is not str for t in df.columns)):
@@ -151,7 +152,7 @@ class DataFrameRender:
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None):
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
if params.describe:
return {
"numberOfRows": await DataFrameRender.get_size(df),
@@ -165,7 +166,9 @@
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
if MimeTypes.JSON.type in accept:
return Response(pdf.to_json(index=True, date_format='iso'), media_type=MimeTypes.JSON.type)
return Response(
pdf.to_json(index=True, date_format='iso', orient=orient.value), media_type=MimeTypes.JSON.type
)
if MimeTypes.CSV.type in accept:
return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
@@ -193,7 +196,7 @@ async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record):
)
@OpenApiHandler.set(operation_id="post_data", request_body=REQUEST_DATA_BODY_SCHEMA)
@OpenApiHandler.set(operation_id=OPERATION_IDS["record_data"], request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
'/{record_id}/data',
summary='Writes data as a whole bulk, creates a new version.',
@@ -204,7 +207,7 @@ any previous bulk. Previous bulk versions are accessible via the get bulk data v
Supports JSON and Parquet formats ('Content-Type' must be set accordingly).
For JSON, the orient must be set accordingly. Supports HTTP chunked transfer encoding.
""" + REQUIRED_ROLES_WRITE,
operation_id="write_record_data",
operation_id=OPERATION_IDS["record_data"],
responses={
404: {},
200: {}
@@ -228,7 +231,7 @@ async def post_data(record_id: str,
return await set_bulk_field_and_send_record(ctx=ctx, bulk_id=bulk_id, record=record)
@OpenApiHandler.set(operation_id="post_chunk_data", request_body=REQUEST_DATA_BODY_SCHEMA)
@OpenApiHandler.set(operation_id=OPERATION_IDS["chunk_data"], request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
"/{record_id}/sessions/{session_id}/data",
summary="Send a data chunk. Session must be complete/commit once all chunks are sent.",
@@ -237,7 +240,7 @@ async def post_data(record_id: str,
"Support JSON and Parquet format ('Content_Type' must be set accordingly). "
"In case of JSON the orient must be set accordingly. Support http chunked encoding."
+ REQUIRED_ROLES_WRITE,
operation_id="post_chunk_data",
operation_id=OPERATION_IDS["chunk_data"],
responses={400: {"error": "Record not found"}}
)
async def post_chunk_data(record_id: str,
@@ -280,6 +283,7 @@ async def get_data_version(
record_id: str, version: int,
request: Request,
ctrl_p: GetDataParams = Depends(),
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
@@ -302,7 +306,7 @@
raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
df = await DataFrameRender.process_params(df, ctrl_p)
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'), orient=orient)
except BulkError as ex:
ex.raise_as_http()
@@ -329,10 +333,11 @@ async def get_data(
record_id: str,
request: Request,
ctrl_p: GetDataParams = Depends(),
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
):
return await get_data_version(record_id, None, request, ctrl_p, ctx, dask_blob_storage)
return await get_data_version(record_id, None, request, ctrl_p, orient, ctx, dask_blob_storage)
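The json_orient_parameter dependency used by both endpoints is not part of this diff. A minimal sketch of how such a FastAPI query-parameter dependency could look, assuming the query parameter is named orient and defaults to 'split' (both assumptions, not confirmed by this commit):

```python
# Hypothetical sketch only; the real json_orient_parameter lives elsewhere in the
# repository, and the parameter name and default used here are assumptions.
from fastapi import HTTPException, Query

from app.bulk_persistence import JSONOrient


def json_orient_parameter(
    orient: str = Query(JSONOrient.split.value, description="orient used for JSON responses"),
) -> JSONOrient:
    try:
        return JSONOrient.get(orient)
    except ValueError:
        # assumes JSONOrient.get rejects unknown values; surface a 422 instead of a 500
        raise HTTPException(status_code=422, detail=f"unsupported orient '{orient}'")
```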
@router_bulk.patch(
@@ -48,5 +48,5 @@ class DMSV3RouterUtils:
try:
delfi_id = ConverterUtils.decode_id(entity_id)
return DMSV3RouterUtils.is_delfi_id(delfi_id), delfi_id
except ValueError as e:
except (ValueError, LookupError):
return False, None
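Broadening the except clause to (ValueError, LookupError) presumably covers structural failures inside ConverterUtils.decode_id in addition to malformed base64. A small illustration of the exception families involved (the decode_id internals are not shown in this diff):

```python
# Sketch of the exception families the broader clause now covers.
import base64

try:
    base64.b64decode("not-base64!!", validate=True)
except ValueError:
    # binascii.Error is a ValueError subclass, so malformed base64 was already handled
    pass

# KeyError and IndexError are LookupError subclasses, so lookups on a decoded id
# with an unexpected shape are now handled as well.
assert issubclass(KeyError, LookupError) and issubclass(IndexError, LookupError)
```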
@@ -142,10 +142,29 @@ async def shutdown_event():
await get_http_client_session().close()
def update_operation_ids():
# Ensure all operation_id values are unique
from fastapi.routing import APIRoute
operation_ids = set()
for route in wdms_app.routes:
if isinstance(route, APIRoute):
if route.operation_id in operation_ids:
# duplicate detected: fall back to the route's auto-generated unique_id and re-key any registered OpenAPI handler
new_operation_id = route.unique_id
if route.operation_id in OpenApiHandler._handlers:
OpenApiHandler._handlers[new_operation_id] = OpenApiHandler._handlers[route.operation_id]
route.operation_id = new_operation_id
else:
operation_ids.add(route.operation_id)
DDMS_V2_PATH = '/ddms/v2'
DDMS_V3_PATH = '/ddms/v3'
ALPHA_APIS_PREFIX = '/alpha'
basic_dependencies = [
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)
]
wdms_app.include_router(probes.router)
wdms_app.include_router(about.router, prefix=DDMS_V2_PATH)
@@ -161,96 +180,95 @@ ddms_v2_routes_groups = [
(dipset_ddms_v2, "Dipset"),
(dip_ddms_v2, "Dips"),
]
for ddms_v2_routes_group in ddms_v2_routes_groups:
wdms_app.include_router(ddms_v2_routes_group[0].router,
for v2_api, tag in ddms_v2_routes_groups:
wdms_app.include_router(v2_api.router,
prefix=DDMS_V2_PATH,
tags=[ddms_v2_routes_group[1]],
dependencies=[
Depends(require_opendes_authorized_user, use_cache=False),
Depends(require_data_partition_id, use_cache=False)
])
tags=[tag],
dependencies=basic_dependencies)
ddms_v3_routes_groups = [
(wellbore_ddms_v3, "Wellbore"),
(well_ddms_v3, "Well"),
(welllog_ddms_v3, "WellLog"),
(wellbore_trajectory_ddms_v3, "Trajectory"),
(wellbore_trajectory_ddms_v3, "Trajectory v3"),
(markerset_ddms_v3, "Marker"),
]
for ddms_v3_routes_group in ddms_v3_routes_groups:
wdms_app.include_router(ddms_v3_routes_group[0].router,
for v3_api, tag in ddms_v3_routes_groups:
wdms_app.include_router(v3_api.router,
prefix=DDMS_V3_PATH,
tags=[ddms_v3_routes_group[1]],
dependencies=[
Depends(require_opendes_authorized_user, use_cache=False),
Depends(require_data_partition_id, use_cache=False)
])
wdms_app.include_router(search.router, prefix='/ddms', tags=['search'], dependencies=[
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)
])
wdms_app.include_router(fast_search.router, prefix='/ddms', tags=['fast-search'], dependencies=[
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)])
wdms_app.include_router(search_v3.router, prefix='/ddms/v3', tags=['search'], dependencies=[
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)
])
wdms_app.include_router(fast_search_v3.router, prefix='/ddms/v3', tags=['fast-search'], dependencies=[
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)
])
wdms_app.include_router(log_recognition.router, prefix='/log-recognition', tags=['log-recognition'], dependencies=[
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)])
dependencies = [Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)]
tags = ['ALPHA feature: bulk data chunking']
# welllog bulk v3 APIs
wdms_app.include_router(
sessions.router,
prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
tags=tags, dependencies=dependencies)
wdms_app.include_router(
bulk_utils.router_bulk,
prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
tags=tags, dependencies=dependencies)
# wellbore trajectory bulk v3 APIs
wdms_app.include_router(
sessions.router,
prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
tags=tags, dependencies=dependencies)
wdms_app.include_router(
bulk_utils.router_bulk,
prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
tags=tags, dependencies=dependencies)
tags=[tag],
dependencies=basic_dependencies)
wdms_app.include_router(search.router, prefix='/ddms', tags=['search'], dependencies=basic_dependencies)
wdms_app.include_router(fast_search.router, prefix='/ddms', tags=['fast-search'], dependencies=basic_dependencies)
wdms_app.include_router(search_v3.router, prefix='/ddms/v3', tags=['search v3'], dependencies=basic_dependencies)
wdms_app.include_router(fast_search_v3.router, prefix='/ddms/v3', tags=['fast-search v3'],
dependencies=basic_dependencies)
wdms_app.include_router(log_recognition.router, prefix='/log-recognition',
tags=['log-recognition'],
dependencies=basic_dependencies)
alpha_tags = ['ALPHA feature: bulk data chunking']
for bulk_prefix, bulk_tags, is_visible in [(ALPHA_APIS_PREFIX + DDMS_V3_PATH, alpha_tags, False),
(DDMS_V3_PATH, [], True)
]:
# welllog bulk v3 APIs
wdms_app.include_router(
sessions.router,
prefix=bulk_prefix + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["WellLog"],
dependencies=basic_dependencies,
include_in_schema=is_visible)
wdms_app.include_router(
bulk_utils.router_bulk,
prefix=bulk_prefix + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["WellLog"],
dependencies=basic_dependencies,
include_in_schema=is_visible)
# wellbore trajectory bulk v3 APIs
wdms_app.include_router(
sessions.router,
prefix=bulk_prefix + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["Trajectory v3"],
dependencies=basic_dependencies,
include_in_schema=is_visible)
wdms_app.include_router(
bulk_utils.router_bulk,
prefix=bulk_prefix + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["Trajectory v3"],
dependencies=basic_dependencies,
include_in_schema=is_visible)
# log bulk v2 APIs
wdms_app.include_router(
sessions.router,
prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
tags=tags, dependencies=dependencies)
tags=alpha_tags,
dependencies=basic_dependencies)
wdms_app.include_router(
bulk_utils.router_bulk,
prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
tags=tags, dependencies=dependencies)
tags=alpha_tags,
dependencies=basic_dependencies)
# The multiple instantiations of the bulk_utils router create duplicate operation_id values
update_operation_ids()
# ------------- add alpha feature: ONLY MOUNTED IN DEV AND DA ENVs
def enable_alpha_feature():
""" must be called to enable and activate alpha feature"""
logger.get_logger().warning("Enabling alpha feature: chunking")
# logger.get_logger().warning("Enabling alpha feature")
# include alpha routers down below #
pass
# order is last executed first
@@ -7,6 +7,7 @@ httpx
numpy
pandas
pyarrow
openapi-spec-validator
# Note: Python 3.8+ already includes Mock 4.0+.
mock>=4.0
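openapi-spec-validator is added to the test requirements, presumably so the generated OpenAPI document can be validated (for example after update_operation_ids de-duplicates operation ids). A minimal sketch of such a check, assuming a pytest test module (the test name is hypothetical):

```python
# Hypothetical spec-validation test; validate_spec raises if the document is invalid.
from openapi_spec_validator import validate_spec

from app.wdms_app import wdms_app


def test_generated_openapi_spec_is_valid():
    validate_spec(wdms_app.openapi())
```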
@@ -152,10 +152,8 @@ class ParquetSerializer:
class JsonSerializer:
mime_type = 'application/json'
# TODO There's an inconsistency in service, cannot specify orient in json and default is 'columns'
# (which different from legacy which is 'split')
def read(self, json_content):
return pd.read_json(json_content, orient='columns')
return pd.read_json(json_content, orient='split')
def dump(self, df):
return df.to_json(orient='split')
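Reading with the same 'split' orient that dump uses keeps a dump/read round trip lossless, which is what the removed TODO was pointing at; reading a 'split' payload as 'columns' would misinterpret its structure. A minimal illustration, assuming pandas:

```python
# Round-trip sketch: the read orient must match the orient used for dumping.
import pandas as pd
from pandas.testing import assert_frame_equal

df = pd.DataFrame({"GR": [80.2, 81.7]}, index=[10, 20])

assert_frame_equal(df, pd.read_json(df.to_json(orient="split"), orient="split"))
```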
@@ -3,6 +3,7 @@ from io import BytesIO
from fastapi import HTTPException
from app.bulk_persistence import JSONOrient
from app.model.model_chunking import GetDataParams
from app.routers.bulk_utils import DataFrameRender, get_df_from_request
import pandas as pd
@@ -75,10 +76,11 @@ async def test_df_render_accept_parquet(default_get_params, basic_dataframe, acc
@pytest.mark.asyncio
async def test_df_render_accept_json(default_get_params, basic_dataframe):
response = await DataFrameRender.df_render(basic_dataframe, default_get_params, "application/json")
@pytest.mark.parametrize("orient", [JSONOrient.split, JSONOrient.columns])
async def test_df_render_accept_json(default_get_params, basic_dataframe, orient):
response = await DataFrameRender.df_render(basic_dataframe, default_get_params, "application/json", orient)
assert 'application/json' == response.headers.get('Content-Type')
actual = pd.read_json(response.body, orient='columns')
actual = pd.read_json(response.body, orient=orient)
assert_frame_equal(basic_dataframe, actual)
@@ -33,17 +33,11 @@ dataframe_dict = {
'split': {'index': Reference_df.index.tolist(),
'columns': Reference_df.columns.tolist(),
'data': Reference_df.values.tolist()},
'index': {
str(row_val): {
str(col_val): Reference_df[col_val].tolist()[count] for col_val in Reference_df.columns.tolist()
} for count, row_val in enumerate(Reference_df.index.tolist())
},
'columns': {
str(col_val): {
str(row_val): Reference_df[col_val].tolist()[count] for count, row_val in enumerate(Reference_df.index.tolist())
} for col_val in Reference_df.columns.tolist()
},
'records': [{c: v for c, v in zip(Reference_df.columns, row_values)} for row_values in Reference_df.values]
}
}
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from pytest_httpx import HTTPXMock
from app.clients import make_storage_record_client, make_search_client
from app.utils import Context, get_or_create_ctx
from tests.unit.test_utils import ctx_fixture
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_storage(ctx_fixture: Context, httpx_mock: HTTPXMock):
storage_url = "http://example.com" # well formed url required
expected_correlation_id = 'some-correlation-id'
ctx = ctx_fixture.with_correlation_id(expected_correlation_id).with_auth("foobar")
Context.set_current(ctx)
# safety: make sure no methods on tracer have been called yet
assert ctx.tracer.method_calls == []
async with make_storage_record_client(storage_url) as storage_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
# force to use endpoint which does not return a response to skip model validation
response = await storage_client.delete_record(id="123", data_partition_id="test")
assert response is not None
# make sure correlation-id is traced when doing a request to storage
ctx.tracer.add_attribute_to_current_span.assert_any_call(
attribute_key='correlation-id',
attribute_value=expected_correlation_id
)
@pytest.mark.asyncio
async def test_fwd_correlation_id_to_outgoing_request_to_search(ctx_fixture: Context, httpx_mock: HTTPXMock):
storage_url = "http://example.com" # well formed url required
expected_correlation_id = 'some-correlation-id'
ctx = ctx_fixture.with_correlation_id(expected_correlation_id).with_auth("foobar")
Context.set_current(ctx)
# safety: make sure no methods on tracer have been called yet
assert ctx.tracer.method_calls == []
async with make_search_client(storage_url) as search_client:
httpx_mock.add_response(match_headers={'correlation-id': expected_correlation_id})
# force to use endpoint which does not return a response to skip model validation
response = await search_client.delete_index(kind="kind", data_partition_id="test")
assert response is not None
# make sure correlation-id is traced when doing a request to search
ctx.tracer.add_attribute_to_current_span.assert_any_call(
attribute_key='correlation-id',
attribute_value=expected_correlation_id
)
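Since TracingMiddleware now also records the data-partition-id on the request span, an analogous assertion to the correlation-id checks above could look like the sketch below (hypothetical helper, not part of this commit):

```python
# Hypothetical companion assertion mirroring the correlation-id tests above;
# 'data-partition-id' matches conf.PARTITION_ID_HEADER_NAME.
def assert_partition_id_traced(ctx, expected_partition_id: str):
    ctx.tracer.add_attribute_to_current_span.assert_any_call(
        attribute_key='data-partition-id',
        attribute_value=expected_partition_id,
    )
```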
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from opencensus.trace import base_exporter
from fastapi.testclient import TestClient
import pytest
from app.wdms_app import wdms_app, DDMS_V2_PATH
from app.utils import get_or_create_ctx
from tests.unit.test_utils import NopeLogger
# Initialize traces exporter in app with a custom one to allow validating our traces
class ExporterInTest(base_exporter.Exporter):
def __init__(self) -> None:
self.exported = []
def export(self, span_datas):
self.exported += span_datas
def find(self, correlation_id):
for sd in self.exported:
if sd.attributes.get('correlation-id') == correlation_id:
return sd
@pytest.fixture()
def ctx_fixture():
""" Create context with a real tracer in it """
ctx = get_or_create_ctx().set_current_with_value(logger=NopeLogger())
yield ctx
@pytest.fixture
def client(ctx_fixture):
yield TestClient(wdms_app)
wdms_app.dependency_overrides = {}
def build_url(path: str):
return DDMS_V2_PATH + path
def test_about_call_creates_correlation_id_if_absent(client: TestClient):
# Initialize traces exporter in app, like it is in app's startup_event
wdms_app.trace_exporter = ExporterInTest()
# no header -> works fine
response = client.get(build_url("/about"))
assert response.status_code == 200
# one call was exported, with correlation-id
assert len(wdms_app.trace_exporter.exported) == 1 # one call => one export
spandata = wdms_app.trace_exporter.exported[0]
assert 'correlation-id' in spandata.attributes.keys()
assert spandata.attributes['correlation-id'] is not None
def test_about_call_traces_existing_correlation_id(client: TestClient):