Commit 717620aa authored by Cyril Monmouton

Add tracing for methods that read or write bulk data

parent fac02b87
@@ -157,6 +157,7 @@ class DaskBulkStorage:
         return self._load(self._get_blob_path(record_id, bulk_id))

     @capture_timings('load_bulk', handlers=worker_capture_timing_handlers)
+    @with_trace('load_bulk')
     async def load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
         """Return a dask Dataframe of a record at the specified version."""
         try:
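with_trace comes from app.helper.traces, whose implementation is not part of this diff. Below is a minimal sketch of what such a decorator for coroutines could look like; only the name with_trace comes from the source, and the real helper presumably reports spans to a tracing backend rather than to a logger. Note the stacking order on load_bulk above: with_trace is applied to the coroutine first, so capture_timings measures the traced call.

import functools
import logging
import time

logger = logging.getLogger(__name__)

def with_trace(span_name: str):
    """Record each call of the decorated coroutine as a named span (sketch)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000.0
                # A real tracer would close a span here; logging stands in for it.
                logger.debug("span %r finished in %.2f ms", span_name, elapsed_ms)
        return wrapper
    return decorator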
@@ -39,7 +39,7 @@ from app.routers.common_parameters import (
     json_orient_parameter)
 from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSessionStateValue,
                                   WithSessionStorages, get_session_dependencies)
+from app.helper.traces import with_trace

 router_bulk = APIRouter()  # router dedicated to bulk APIs
@@ -48,6 +48,7 @@ BULK_URN_PREFIX_VERSION = "wdms-1"
 BULK_URI_FIELD = "bulkURI"

+@with_trace("get_df_from_request")
 async def get_df_from_request(request: Request, orient: Optional[str] = None) -> pd.DataFrame:
     """ extract dataframe from request """
@@ -122,6 +123,7 @@ class DataFrameRender:
         return list(selected)

     @staticmethod
+    @with_trace('process_params')
     async def process_params(df, params: GetDataParams):
         if params.curves:
             selection = list(map(str.strip, params.curves.split(',')))
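For clarity, the curves parsing visible above turns a comma-separated query value into trimmed column names:

curves = "GR, RHOB ,DT"
selection = list(map(str.strip, curves.split(',')))
assert selection == ["GR", "RHOB", "DT"]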
@@ -141,6 +143,7 @@ class DataFrameRender:
         return df

     @staticmethod
+    @with_trace('df_render')
     async def df_render(df, params: GetDataParams, accept: str = None):
         if params.describe:
             return {
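Decorator order matters for these static methods: @with_trace sits below @staticmethod, so it wraps the plain coroutine function and the staticmethod wrapper is applied last. Reversed, with_trace would receive a staticmethod object, which is not callable before Python 3.10. A minimal illustration reusing the with_trace sketch above (the class and body are placeholders):

class RenderSketch:
    @staticmethod
    @with_trace('process_params')  # correct order: decorates the raw coroutine
    async def process_params(df, params):
        return df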
@@ -205,6 +208,7 @@ async def post_data(record_id: str,
                     ctx: Context = Depends(get_ctx),
                     dask_blob_storage: DaskBulkStorage = Depends(with_dask_blob_storage),
                     ):
+    @with_trace("save_blob")
     async def save_blob():
         df = await get_df_from_request(request, orient)
         return await dask_blob_storage.save_blob(df, record_id)
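The same decorator works on a coroutine defined inside a request handler: each call to post_data builds a freshly wrapped save_blob closure, so a span is recorded per request. A self-contained sketch of the pattern (names other than save_blob are placeholders):

async def post_data_sketch(record_id: str) -> str:
    @with_trace("save_blob")
    async def save_blob() -> str:
        # Closure over record_id; traced like any module-level coroutine.
        return record_id

    return await save_blob()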
@@ -274,17 +278,21 @@ async def get_data_version(
     bulk_urn = get_bulk_uri_osdu(record)
     if bulk_urn is not None:
         bulk_id, prefix = BulkId.bulk_urn_decode(bulk_urn)
-    else:  # fallback on ddms_v2 Persistence for wks:log schema
+    else:
+        # fallback on ddms_v2 Persistence for wks:log schema
         bulk_id, prefix = LogBulkHelper.get_bulk_id(record, None)

-    if bulk_id is None:
-        raise BulkNotFound(record_id=record_id, bulk_id=None)
     try:
+        if bulk_id is None:
+            raise BulkNotFound(record_id=record_id, bulk_id=None)
         if prefix == BULK_URN_PREFIX_VERSION:
             df = await dask_blob_storage.load_bulk(record_id, bulk_id)
         elif prefix is None:
             df = await get_dataframe(ctx, bulk_id)
         else:
             raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)

         df = await DataFrameRender.process_params(df, ctrl_p)
         return await DataFrameRender.df_render(df, ctrl_p, request.headers.get('Accept'))
     except BulkError as ex:
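The dispatch above keys on the prefix decoded from the bulk URN: the new "wdms-1" prefix routes to the Dask storage, no prefix falls back to the legacy ddms_v2 path, and anything else is treated as not found. BulkId.bulk_urn_decode itself is not in this diff; a hypothetical sketch of the convention it implies (the URN layout is an assumption):

from typing import Optional, Tuple

def bulk_urn_decode_sketch(urn: str) -> Tuple[str, Optional[str]]:
    # Assumed layout: "urn:<prefix>:<bulk_id>", e.g. "urn:wdms-1:1234"
    # decodes to ("1234", "wdms-1"); a bare legacy id has no prefix.
    if urn.startswith("urn:"):
        _, prefix, bulk_id = urn.split(":", 2)
        return bulk_id, prefix
    return urn, None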