Commit 4667afa0 authored by Cyril Monmouton

Feature/handle int columns bulk v2

parent b82d588d
@@ -41,7 +41,7 @@ The following software have components provided under the terms of this license:
- pytest-dependency (from https://github.com/RKrahl/pytest-dependency)
- python-dateutil (from https://dateutil.readthedocs.org)
- python-multipart (from http://github.com/andrew-d/python-multipart)
- requests (from http://python-requests.org)
- requests (from https://requests.readthedocs.io)
- rfc3986 (from https://rfc3986.readthedocs.org)
- rsa (from https://stuvel.eu/rsa)
- s3transfer (from https://github.com/boto/s3transfer)
......
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -13,20 +13,13 @@
# limitations under the License.
import asyncio
from typing import List, Set, Optional
import re
from contextlib import suppress
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import Response
import dask.dataframe as dd
import pandas as pd
from app.bulk_persistence import DataframeSerializerAsync, JSONOrient, get_dataframe
from app.bulk_persistence import JSONOrient, get_dataframe
from app.bulk_persistence.bulk_id import BulkId
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from app.bulk_persistence.dask.errors import BulkError, BulkNotFound
from app.clients.storage_service_client import get_storage_record_service
from app.bulk_persistence.mime_types import MimeTypes
from app.model.model_chunking import GetDataParams
from app.model.log_bulk import LogBulkHelper
@@ -40,167 +33,19 @@ from app.routers.common_parameters import (
from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
WithSessionStorages, get_session_dependencies)
from app.routers.record_utils import fetch_record
from app.routers.bulk.utils import (
with_dask_blob_storage, get_check_input_df_func, get_df_from_request, get_bulk_uri_osdu,
set_bulk_field_and_send_record, BULK_URN_PREFIX_VERSION, DataFrameRender)
from app.helper.traces import with_trace
from natsort import natsorted
router_bulk = APIRouter() # router dedicated to bulk APIs
router = APIRouter() # router dedicated to bulk APIs
BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"
OPERATION_IDS = {"record_data": "write_record_data",
"chunk_data": "post_chunk_data"}
def _check_df_columns_type(df: pd.DataFrame):
if any((type(t) is not str for t in df.columns)):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='All column names must be strings')
@with_trace("get_df_from_request")
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" extract dataframe from request """
ct = request.headers.get('Content-Type', '')
if MimeTypes.PARQUET.match(ct):
content = await request.body() # request.stream()
try:
return await DataframeSerializerAsync().read_parquet(content)
except OSError as err:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f'{err}') # TODO
if MimeTypes.JSON.match(ct):
content = await request.body() # request.stream()
try:
return await DataframeSerializerAsync().read_json(content, orient, convert_axes=False)
except ValueError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='invalid body') # TODO
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=f'Invalid content-type, "{ct}" is not supported')
async def with_dask_blob_storage() -> DaskBulkStorage:
ctx = Context.current()
return await ctx.app_injector.get(DaskBulkStorage)
class DataFrameRender:
@staticmethod
async def compute(df):
if isinstance(df, pd.DataFrame):
return df
driver = await with_dask_blob_storage()
return await driver.client.compute(df)
@staticmethod
async def get_size(df):
if isinstance(df, pd.DataFrame):
return len(df.index)
driver = await with_dask_blob_storage()
return await driver.client.submit(lambda: len(df.index))
re_array_selection = re.compile(r'^(?P<name>.+)\[(?P<start>[^:]+):?(?P<stop>.*)\]$')
@staticmethod
def _col_matching(sel, col):
if sel == col: # exact match
return True
m_col = DataFrameRender.re_array_selection.match(col)
if not m_col: # if the column doesn't have an array pattern (col[*])
return False
# compare selection with curve name without array suffix [*]
if sel == m_col['name']: # if selection is 'c', c[*] should match
return True
# range selection use cases c[0:2] should match c[0], c[1] and c[2]
m_sel = DataFrameRender.re_array_selection.match(sel)
if m_sel and m_sel['stop']:
with suppress(ValueError): # suppress int conversion exceptions
if int(m_sel['start']) <= int(m_col['start']) <= int(m_sel['stop']):
return True
return False
@staticmethod
def get_matching_column(selection: List[str], cols: Set[str]) -> List[str]:
selected = set()
for sel in selection:
selected.update(filter(lambda col: DataFrameRender._col_matching(sel, col),
cols.difference(selected)))
return list(selected)
@staticmethod
@with_trace('process_params')
async def process_params(df, params: GetDataParams):
if isinstance(df, pd.DataFrame):
df = dd.from_pandas(df, npartitions=1)
if params.curves:
selection = list(map(str.strip, params.curves.split(',')))
columns = DataFrameRender.get_matching_column(selection, set(df))
df = df[columns]
if params.offset:
head_index = df.head(params.offset, npartitions=-1, compute=False).index
index = await DataFrameRender.compute(head_index) # TODO could be slow!
df = df.loc[~df.index.isin(index)]
if params.limit and params.limit > 0:
df = df.head(params.limit, npartitions=-1, compute=False)
df = df[natsorted(df.columns)]
return df
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
if params.describe:
return {
"numberOfRows": await DataFrameRender.get_size(df),
"columns": [c for c in df.columns]
}
pdf = await DataFrameRender.compute(df)
pdf.index.name = None # TODO
if not accept or MimeTypes.PARQUET.type in accept:
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
if MimeTypes.JSON.type in accept:
return Response(
pdf.to_json(index=True, date_format='iso', orient=orient.value), media_type=MimeTypes.JSON.type
)
if MimeTypes.CSV.type in accept:
return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
# in any other case => Parquet anyway?
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
def get_bulk_uri_osdu(record):
return record.data.get('ExtensionProperties', {}).get('wdms', {}).get(BULK_URI_FIELD, None)
def set_bulk_uri(record, bulk_urn):
return record.data.update({'ExtensionProperties': {'wdms': {BULK_URI_FIELD: bulk_urn}}})
async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record):
bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION)
set_bulk_uri(record, bulk_urn)
# push new version on the storage
storage_client = await get_storage_record_service(ctx)
return await storage_client.create_or_update_records(
record=[record], data_partition_id=ctx.partition_id
)
@OpenApiHandler.set(operation_id=OPERATION_IDS["record_data"], request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
@router.post(
'/{record_id}/data',
summary='Writes data as a whole bulk, creates a new version.',
description="""
@@ -220,11 +65,12 @@ async def post_data(record_id: str,
orient: JSONOrient = Depends(json_orient_parameter),
ctx: Context = Depends(get_ctx),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
check_input_df_func=Depends(get_check_input_df_func),
):
@with_trace("save_blob")
async def save_blob():
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
check_input_df_func(df)
return await dask_blob_storage.save_blob(df, record_id)
record, bulk_id = await asyncio.gather(
@@ -235,7 +81,7 @@ async def post_data(record_id: str,
@OpenApiHandler.set(operation_id=OPERATION_IDS["chunk_data"], request_body=REQUEST_DATA_BODY_SCHEMA)
@router_bulk.post(
@router.post(
"/{record_id}/sessions/{session_id}/data",
summary="Send a data chunk. Session must be complete/commit once all chunks are sent.",
description="Send a data chunk. Session must be complete/commit once all chunks are sent. "
@@ -252,6 +98,7 @@ async def post_chunk_data(record_id: str,
orient: JSONOrient = Depends(json_orient_parameter),
with_session: WithSessionStorages = Depends(get_session_dependencies),
dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
check_input_df_func=Depends(get_check_input_df_func),
):
i_session = await with_session.get_session(record_id, session_id)
if i_session.session.state != SessionState.Open:
@@ -260,11 +107,11 @@ async def post_chunk_data(record_id: str,
detail=f"Session cannot accept data, state={i_session.session.state}")
df = await get_df_from_request(request, orient)
_check_df_columns_type(df)
check_input_df_func(df)
await dask_blob_storage.session_add_chunk(i_session.session, df)
@router_bulk.get(
@router.get(
'/{record_id}/versions/{version}/data',
summary='Returns data of the specified version.',
description='Returns the data of a specific version according to the specified query parameters.'
@@ -314,7 +161,7 @@ async def get_data_version(
ex.raise_as_http()
@router_bulk.get(
@router.get(
"/{record_id}/data",
summary='Returns the data according to the specified query parameters.',
description='Returns the data according to the specified query parameters.'
@@ -343,7 +190,7 @@ async def get_data(
return await get_data_version(record_id, None, request, ctrl_p, orient, ctx, dask_blob_storage)
@router_bulk.patch(
@router.patch(
"/{record_id}/sessions/{session_id}",
summary='Update a session, either commit or abandon.',
response_model=Session
......
from fastapi import HTTPException, Request, status
from fastapi.routing import APIRoute
from typing import List, Set, Optional
import re
from contextlib import suppress
from fastapi.responses import Response
import dask.dataframe as dd
import pandas as pd
from natsort import natsorted
from app.clients.storage_service_client import get_storage_record_service
from app.bulk_persistence import DataframeSerializerAsync
from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage
from app.bulk_persistence.mime_types import MimeTypes
from app.bulk_persistence import BulkId, JSONOrient
from app.utils import get_ctx, OpenApiHandler, Context
from app.helper.traces import with_trace
from app.model.model_chunking import GetDataParams
BULK_URN_PREFIX_VERSION = "wdms-1"
BULK_URI_FIELD = "bulkURI"
def update_operation_ids(wdms_app):
""" Ensure all operation_id are uniques """
operation_ids = set()
for route in wdms_app.routes:
if isinstance(route, APIRoute):
if route.operation_id in operation_ids:
# duplicate detected
new_operation_id = route.unique_id
if route.operation_id in OpenApiHandler._handlers:
OpenApiHandler._handlers[new_operation_id] = OpenApiHandler._handlers[route.operation_id]
route.operation_id = new_operation_id
else:
operation_ids.add(route.operation_id)
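# Note (illustrative): duplicates typically come from mounting the same bulk router several times
# (e.g. under both the ALPHA prefix and the plain v3 prefix in app/wdms_app.py); each mount registers
# the same operation_id, so without this renaming the generated OpenAPI document would be ambiguous.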
async def set_v3_input_dataframe_check(request: Request):
"""
Inject the input-check function into the request state (cf. https://www.starlette.io/requests/#other-state).
Intended for the v3 bulk APIs, which require strict column-name validation.
"""
request.state.check_input_df_func = _check_df_columns_type
async def set_legacy_input_dataframe_check(request: Request):
"""
Inject the input-check function into the request state (cf. https://www.starlette.io/requests/#other-state).
For legacy routes, the check function is chosen according to the content type:
- parquet: no backward compatibility required, so the same strict check as the v3 bulk APIs is used
- json: backward compatibility required, so the check function casts column names to strings
"""
content_type = request.headers.get('Content-Type')
if content_type == 'application/x-parquet':
request.state.check_input_df_func = _check_df_columns_type
else:
request.state.check_input_df_func = _check_df_columns_type_legacy
def get_check_input_df_func(request: Request):
"""
Retrieve the injected input-check function from the request state (cf. https://www.starlette.io/requests/#other-state).
The function is injected when the bulk router is mounted into the FastAPI app, via the router's
'dependencies' in module app/wdms_app.
NOTE: the attribute name 'check_input_df_func' must be IDENTICAL to the one set by the functions above.
"""
# request.state raises AttributeError for missing attributes, so use getattr with a default
check_func = getattr(request.state, 'check_input_df_func', None)
if not check_func:
return lambda x: True
return check_func
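# Illustrative wiring (a hedged sketch, not part of this module; the '/example' route is hypothetical):
#
#   router = APIRouter(dependencies=[Depends(set_v3_input_dataframe_check)])
#
#   @router.post('/example')
#   async def write_example(check_input_df_func=Depends(get_check_input_df_func)):
#       df = pd.DataFrame({'MD': [0.0, 0.5]})
#       check_input_df_func(df)  # raises HTTP 422 if any column name is not a string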
def _check_df_columns_type_legacy(df: pd.DataFrame):
""" If given dataframe contains columns name which is not a string, cast it """
if any((type(t) is not str for t in df.columns)):
get_ctx().logger.warning("_check_df_columns_type_legacy() - df columns type casted")
df.columns = map(str, df.columns)
return True
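# For instance (illustrative): a dataframe whose columns are the integers 0 and 1 passes this legacy
# check, but its columns are renamed to the strings '0' and '1' as a side effect, so downstream code
# can keep assuming string column names.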
def _check_df_columns_type(df: pd.DataFrame):
""" Ensure given dataframe contains columns name as string only as described by WellLog schemas """
if any((type(t) is not str for t in df.columns)):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='All column names must be strings')
return True
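# Illustrative behaviour: _check_df_columns_type(pd.DataFrame(columns=['GR', 'MD'])) returns True,
# while a dataframe with an integer column name (e.g. pd.DataFrame(columns=[0])) is rejected with
# an HTTP 422.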
@with_trace("get_df_from_request")
async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
""" Extract dataframe from request """
ct = request.headers.get('Content-Type', '')
if MimeTypes.PARQUET.match(ct):
content = await request.body() # request.stream()
try:
return await DataframeSerializerAsync().read_parquet(content)
except OSError as err:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f'{err}') # TODO
if MimeTypes.JSON.match(ct):
content = await request.body() # request.stream()
try:
return await DataframeSerializerAsync().read_json(content, orient, convert_axes=False)
except ValueError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='invalid body') # TODO
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=f'Invalid content-type, "{ct}" is not supported')
async def with_dask_blob_storage() -> DaskBulkStorage:
return await get_ctx().app_injector.get(DaskBulkStorage)
class DataFrameRender:
@staticmethod
async def compute(df):
if isinstance(df, pd.DataFrame):
return df
driver = await with_dask_blob_storage()
return await driver.client.compute(df)
@staticmethod
async def get_size(df):
if isinstance(df, pd.DataFrame):
return len(df.index)
driver = await with_dask_blob_storage()
return await driver.client.submit(lambda: len(df.index))
re_array_selection = re.compile(r'^(?P<name>.+)\[(?P<start>[^:]+):?(?P<stop>.*)\]$')
@staticmethod
def _col_matching(sel, col):
if sel == col: # exact match
return True
m_col = DataFrameRender.re_array_selection.match(col)
if not m_col: # if the column doesn't have an array pattern (col[*])
return False
# compare selection with curve name without array suffix [*]
if sel == m_col['name']: # if selection is 'c', c[*] should match
return True
# range selection use cases c[0:2] should match c[0], c[1] and c[2]
m_sel = DataFrameRender.re_array_selection.match(sel)
if m_sel and m_sel['stop']:
with suppress(ValueError): # suppress int conversion exceptions
if int(m_sel['start']) <= int(m_col['start']) <= int(m_sel['stop']):
return True
return False
@staticmethod
def get_matching_column(selection: List[str], cols: Set[str]) -> List[str]:
selected = set()
for sel in selection:
selected.update(filter(lambda col: DataFrameRender._col_matching(sel, col),
cols.difference(selected)))
return list(selected)
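# Matching examples (illustrative, assuming cols = {'GR', 'X[0]', 'X[1]', 'X[2]'}):
#   get_matching_column(['GR'], cols)     -> {'GR'}                  exact match
#   get_matching_column(['X'], cols)      -> {'X[0]', 'X[1]', 'X[2]'}  the bare curve name matches every array item
#   get_matching_column(['X[0:1]'], cols) -> {'X[0]', 'X[1]'}        both range bounds are inclusive
# (the function returns a list; element order is not guaranteed)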
@staticmethod
@with_trace('process_params')
async def process_params(df, params: GetDataParams):
if isinstance(df, pd.DataFrame):
df = dd.from_pandas(df, npartitions=1)
if params.curves:
selection = list(map(str.strip, params.curves.split(',')))
columns = DataFrameRender.get_matching_column(selection, set(df))
df = df[columns]
if params.offset:
head_index = df.head(params.offset, npartitions=-1, compute=False).index
index = await DataFrameRender.compute(head_index) # TODO could be slow!
df = df.loc[~df.index.isin(index)]
if params.limit and params.limit > 0:
df = df.head(params.limit, npartitions=-1, compute=False)
df = df[natsorted(df.columns)]
return df
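# Sketch of the parameter handling above (query names are illustrative):
#   GET .../data?curves=GR,X[0:1]&offset=10&limit=100
# keeps only the matching columns, skips the first 10 rows, returns at most the next 100 rows,
# and sorts columns naturally (GR, X[2], X[10] rather than the lexicographic GR, X[10], X[2]).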
@staticmethod
@with_trace('df_render')
async def df_render(df, params: GetDataParams, accept: str = None, orient: Optional[JSONOrient] = None):
if params.describe:
return {
"numberOfRows": await DataFrameRender.get_size(df),
"columns": [c for c in df.columns]
}
pdf = await DataFrameRender.compute(df)
pdf.index.name = None # TODO
if not accept or MimeTypes.PARQUET.type in accept:
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
if MimeTypes.JSON.type in accept:
return Response(
pdf.to_json(index=True, date_format='iso', orient=orient.value), media_type=MimeTypes.JSON.type
)
if MimeTypes.CSV.type in accept:
return Response(pdf.to_csv(), media_type=MimeTypes.CSV.type)
# in any other case => Parquet anyway?
return Response(pdf.to_parquet(engine="pyarrow"), media_type=MimeTypes.PARQUET.type)
def get_bulk_uri_osdu(record):
return record.data.get('ExtensionProperties', {}).get('wdms', {}).get(BULK_URI_FIELD, None)
def set_bulk_uri(record, bulk_urn):
return record.data.update({'ExtensionProperties': {'wdms': {BULK_URI_FIELD: bulk_urn}}})
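# Illustrative record layout: after set_bulk_uri(), the encoded bulk URN sits under
#   record.data['ExtensionProperties']['wdms']['bulkURI']
# which is where get_bulk_uri_osdu() reads it back from. Note that the whole 'ExtensionProperties'
# entry is replaced, not merged.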
async def set_bulk_field_and_send_record(ctx: Context, bulk_id, record):
bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION)
set_bulk_uri(record, bulk_urn)
# push new version on the storage
storage_client = await get_storage_record_service(ctx)
return await storage_client.create_or_update_records(
record=[record], data_partition_id=ctx.partition_id
)
@@ -30,7 +30,7 @@ from app.injector.app_injector import AppInjector
from app.injector.main_injector import MainInjector
from app.middleware import CreateBasicContextMiddleware, TracingMiddleware
from app.middleware.basic_context_middleware import require_data_partition_id
from app.routers import probes, about, sessions, bulk_utils
from app.routers import probes, about, sessions
from app.routers.ddms_v2 import (
ddms_v2,
wellbore_ddms_v2,
@@ -45,6 +45,7 @@ from app.routers.ddms_v3 import (
welllog_ddms_v3,
wellbore_trajectory_ddms_v3,
markerset_ddms_v3)
from app.routers.bulk import bulk_routes
from app.routers.trajectory import trajectory_ddms_v2
from app.routers.dipset import dipset_ddms_v2, dip_ddms_v2
from app.routers.logrecognition import log_recognition
@@ -56,6 +57,7 @@ from app.utils import (
get_wdms_temp_dir,
get_pool_executor,
POOL_EXECUTOR_MAX_WORKER)
from app.routers.bulk.utils import update_operation_ids, set_v3_input_dataframe_check, set_legacy_input_dataframe_check
base_app = FastAPI()
@@ -142,28 +144,12 @@ async def shutdown_event():
await get_http_client_session().close()
def update_operation_ids():
# Ensure all operation_ids are unique
from fastapi.routing import APIRoute
operation_ids = set()
for route in wdms_app.routes:
if isinstance(route, APIRoute):
if route.operation_id in operation_ids:
# duplicate detected
new_operation_id = route.unique_id
if route.operation_id in OpenApiHandler._handlers:
OpenApiHandler._handlers[new_operation_id] = OpenApiHandler._handlers[route.operation_id]
route.operation_id = new_operation_id
else:
operation_ids.add(route.operation_id)
DDMS_V2_PATH = '/ddms/v2'
DDMS_V3_PATH = '/ddms/v3'
ALPHA_APIS_PREFIX = '/alpha'
basic_dependencies = [
Depends(require_data_partition_id, use_cache=False),
Depends(require_opendes_authorized_user, use_cache=False)
Depends(require_opendes_authorized_user, use_cache=False),
]
wdms_app.include_router(probes.router)
@@ -212,6 +198,7 @@ wdms_app.include_router(log_recognition.router, prefix='/log-recognition',
dependencies=basic_dependencies)
alpha_tags = ['ALPHA feature: bulk data chunking']
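# v3 bulk routes add the strict input check on top of the basic auth/partition dependencies:
# set_v3_input_dataframe_check injects it into request.state, where get_check_input_df_func picks it up.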
v3_bulk_dependencies = [*basic_dependencies, Depends(set_v3_input_dataframe_check)]
for bulk_prefix, bulk_tags, is_visible in [(ALPHA_APIS_PREFIX + DDMS_V3_PATH, alpha_tags, False),
(DDMS_V3_PATH, [], True)
@@ -225,10 +212,10 @@ for bulk_prefix, bulk_tags, is_visible in [(ALPHA_APIS_PREFIX + DDMS_V3_PATH, al
include_in_schema=is_visible)
wdms_app.include_router(
bulk_utils.router_bulk,
bulk_routes.router,
prefix=bulk_prefix + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["WellLog"],
dependencies=basic_dependencies,
dependencies=v3_bulk_dependencies,
include_in_schema=is_visible)
# wellbore trajectory bulk v3 APIs
@@ -240,10 +227,10 @@ for bulk_prefix, bulk_tags, is_visible in [(ALPHA_APIS_PREFIX + DDMS_V3_PATH, al
include_in_schema=is_visible)
wdms_app.include_router(
bulk_utils.router_bulk,
bulk_routes.router,
prefix=bulk_prefix + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
tags=bulk_tags if bulk_tags else ["Trajectory v3"],
dependencies=basic_dependencies,
dependencies=v3_bulk_dependencies,
include_in_schema=is_visible)
# log bulk v2 APIs
@@ -253,14 +240,13 @@ wdms_app.include_router(
tags=alpha_tags,
dependencies=basic_dependencies)