diff --git a/app/bulk_persistence/__init__.py b/app/bulk_persistence/__init__.py
index 6ef5dafef28ee5b3b005e50b4da4935dcbb0752a..a2eeabcb23671584504e6275a65936fa9449bc29 100644
--- a/app/bulk_persistence/__init__.py
+++ b/app/bulk_persistence/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .bulk_id import BulkId
+from .bulk_uri import BulkURI
 from .dataframe_persistence import create_and_store_dataframe, get_dataframe
 from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync
 from .json_orient import JSONOrient
diff --git a/app/bulk_persistence/bulk_id.py b/app/bulk_persistence/bulk_id.py
index 1dd223f1beaa2b1ca2651d20b137ed1268357a8d..442af905f2577caf3543d602d9bf8ee2c6e20cc4 100644
--- a/app/bulk_persistence/bulk_id.py
+++ b/app/bulk_persistence/bulk_id.py
@@ -13,27 +13,7 @@
 # limitations under the License.
 
 import uuid
-from typing import Tuple, Optional
 
 
-class BulkId:
-    @staticmethod
-    def new_bulk_id() -> str:
-        return str(uuid.uuid4())
-
-    @classmethod
-    def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str:
-        if prefix:
-            return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}'
-        return uuid.UUID(bulk_id).urn
-
-
-    # Returns a tuple ( <uuid> : str, <prefix> : str)
-    @classmethod
-    def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]:
-        if urn is None:
-            raise ValueError('attempted to decode empty urn')
-        parts = urn.split(":")
-        if len(parts) < 4:
-            return str(uuid.UUID(urn)), None
-        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
+def new_bulk_id() -> str:
+    return str(uuid.uuid4())
diff --git a/app/bulk_persistence/bulk_storage_version.py b/app/bulk_persistence/bulk_storage_version.py
new file mode 100644
index 0000000000000000000000000000000000000000..39b072bfbd767e7da3a49c6f520c21739cb72bde
--- /dev/null
+++ b/app/bulk_persistence/bulk_storage_version.py
@@ -0,0 +1,21 @@
+from typing import Optional, NamedTuple
+
+
+class BulkStorageVersion(NamedTuple):
+    """ This is the version of the bulk storage engine """
+
+    version: str
+    """ unique version identifier """
+
+    uri_prefix: Optional[str]
+    """ associated uri prefix """
+
+
+BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None)
+""" first bulk management implementation, writing the bulk directly to blob storage as a single blob """
+
+BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1")
+""" version 1, using Dask to handle bulk manipulation and storage """
+
+BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None)
+""" represents an invalid/undefined storage version """
diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py
new file mode 100644
index 0000000000000000000000000000000000000000..493e6515fb61d5e270cbbf9034a15504c8a71f7f
--- /dev/null
+++ b/app/bulk_persistence/bulk_uri.py
@@ -0,0 +1,123 @@
+from typing import Optional, Tuple
+import uuid
+
+from .bulk_storage_version import (
+    BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)
+
+
+class BulkURI:
+    """
+    Bulk URI: contains the bulk identifier (bulk_id) and the storage engine version, which identifies how
+    the bulk is stored.
+
+    Usage:
+        - ctor from a URI string value:
+            `bulk_uri = BulkURI.decode(uri_str)`
+
+        - explicit ctor given a bulk_id and a storage version:
+            `bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)`
+
+        - explicit ctor using a class method:
+            `bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)`
+
+        - encode to a URI string value:
+            `uri_str: str = bulk_uri.encode()`
+
+        - check which storage engine version it is:
+            `bulk_uri.storage_version == BulkStorageVersion_V0`
+            `bulk_uri.is_bulk_storage_V0()`
+    """
+
+    def __init__(self, bulk_id: str, version: BulkStorageVersion):
+        """
+        Make a new BulkURI, or an invalid one if any argument is missing or invalid.
+        Prefer the `decode` or `from_bulk_storage_V0/V1` class methods for construction.
+        :param bulk_id: expected to be a valid uuid
+        :param version: storage version
+        :throw: ValueError
+        """
+        if not bulk_id or not version or version == BulkStorageVersion_Invalid:
+            bulk_id = ''
+            version = BulkStorageVersion_Invalid
+        else:
+            # ensure valid uuid
+            uuid.UUID(bulk_id)
+
+        self._bulk_id = bulk_id
+        self._storage_version = version
+
+    @classmethod
+    def invalid(cls):
+        """ make an invalid instance """
+        return cls('', BulkStorageVersion_Invalid)
+
+    @classmethod
+    def decode(cls, uri: str) -> 'BulkURI':
+        """
+        construct a BulkURI from an encoded URI
+        :throw: ValueError
+        """
+        if not uri:
+            return BulkURI.invalid()
+        bulk_id, prefix = cls._decode_uri(uri)
+        if not prefix:
+            version = BulkStorageVersion_V0
+        elif prefix == BulkStorageVersion_V1.uri_prefix:
+            version = BulkStorageVersion_V1
+        else:
+            raise ValueError('Unsupported prefix in bulk URI: ' + prefix)
+        return cls(bulk_id=bulk_id, version=version)
+
+    def is_bulk_storage_V0(self) -> bool:
+        """ convenience check that returns True if version == BulkStorageVersion_V0 """
+        return self._storage_version.version == BulkStorageVersion_V0.version
+
+    @classmethod
+    def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI':
+        """ construct a BulkURI for storage engine V0 given a bulk id """
+        return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0)
+
+    @classmethod
+    def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI':
+        """ construct a BulkURI for storage engine V1 given a bulk id """
+        return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1)
+
+    @property
+    def bulk_id(self) -> str:
+        return self._bulk_id
+
+    @property
+    def storage_version(self) -> BulkStorageVersion:
+        return self._storage_version
+
+    def encode(self) -> str:
+        """
+        Encode to a URI string.
+        If the prefix is not empty, the uri format is `urn:$prefix:uuid:$bulk_id`.
+        If the prefix is empty or None, the uri format is `urn:uuid:$bulk_id`.
+        :throw: ValueError
+        """
+        if self._storage_version.uri_prefix:
+            return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}'
+        return uuid.UUID(self._bulk_id).urn
+
+    @classmethod
+    def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]:
+        """
+        Decode a urn into uuid and optional prefix. Returns a tuple [uuid, prefix].
+        If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix]
+        If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None]
+        :throw: ValueError if urn empty or invalid UUID
+        """
+        if uri is None:
+            raise ValueError('attempted to decode empty urn')
+        parts = uri.split(":")
+        if len(parts) < 4:
+            return str(uuid.UUID(uri)), None
+        return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2])
+
+    def is_valid(self) -> bool:
+        """ return True if this URI holds a bulk_id and a defined storage version """
+        if self._bulk_id and self._storage_version.version:
+            return True
+        return False
diff --git a/app/bulk_persistence/dask/dask_bulk_storage.py b/app/bulk_persistence/dask/dask_bulk_storage.py
index 4bb8971b2c6de671cecab7d66ca2febbc252fa17..f84a57dfbac4c04b47c48670509e0974f3dfe8cc 100644
--- a/app/bulk_persistence/dask/dask_bulk_storage.py
+++ b/app/bulk_persistence/dask/dask_bulk_storage.py
@@ -27,18 +27,19 @@ from dask.distributed import Client as DaskDistributedClient
 from pyarrow.lib import ArrowException
 import pyarrow.parquet as pa
 
-from app.bulk_persistence import BulkId
-from app.bulk_persistence.dask.session_file_meta import SessionFileMeta
+from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
+
 from app.helper.logger import get_logger
 from app.helper.traces import with_trace
 from app.persistence.sessions_storage import Session
 from app.utils import DaskClient, capture_timings, get_ctx
-from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters
 
 from .errors import BulkNotFound, BulkNotProcessable, internal_bulk_exceptions
 from .traces import wrap_trace_process
 from .utils import by_pairs, do_merge, worker_capture_timing_handlers
 from .dask_worker_plugin import DaskWorkerPlugin
+from .session_file_meta import SessionFileMeta
+from ..bulk_id import new_bulk_id
 
 
 def pandas_to_parquet(pdf, path, opt):
@@ -215,7 +216,7 @@ class DaskBulkStorage:
         """
         df_columns = set(df.columns)
         pyarrow_reserved_columns_found = list(filter(lambda v: re.match(r'__index_level_\d+__', v), df_columns))
-
+
         if pyarrow_reserved_columns_found or '__null_dask_index__' in df_columns:
             raise BulkNotProcessable("Invalid column name")
 
@@ -224,7 +225,7 @@ class DaskBulkStorage:
     @with_trace('save_blob')
     async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None):
         """Write the data frame to the blob storage."""
-        bulk_id = bulk_id or BulkId.new_bulk_id()
+        bulk_id = bulk_id or new_bulk_id()
 
         if isinstance(ddf, pd.DataFrame):
             self._check_incoming_chunk(ddf)
diff --git a/app/bulk_persistence/dataframe_persistence.py b/app/bulk_persistence/dataframe_persistence.py
index 2a248f416ad3443358a4167730056889e2b218bb..8eb343fbc4c2620714f3d49b4fe9ef732801ecc7 100644
--- a/app/bulk_persistence/dataframe_persistence.py
+++ b/app/bulk_persistence/dataframe_persistence.py
@@ -25,7 +25,7 @@ from .blob_storage import (
     create_and_write_blob,
     read_blob,
 )
-from .bulk_id import BulkId
+from .bulk_id import new_bulk_id
 from .mime_types import MimeTypes
 from .tenant_provider import resolve_tenant
 from ..helper.traces import with_trace
@@ -33,10 +33,10 @@ from ..helper.traces import with_trace
 
 async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
     """Store bulk on a blob storage"""
-    new_bulk_id = BulkId.new_bulk_id()
+    bulk_id = new_bulk_id()
     tenant = await resolve_tenant(ctx.partition_id)
     async with create_and_write_blob(
-        df, file_exporter=BlobFileExporters.PARQUET, blob_id=new_bulk_id
+        df, file_exporter=BlobFileExporters.PARQUET, blob_id=bulk_id
     ) as bulkblob:
         storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
         await storage.upload(
diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py
index 0605657037c7aced58ceb7aa81c643a4e0669eea..266af91998638bc3f2d657aada1fd4346addf08a 100644
--- a/app/model/log_bulk.py
+++ b/app/model/log_bulk.py
@@ -12,13 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Optional, Tuple
+from typing import Optional, Union
 
 from jsonpath_ng import parse as parse_jsonpath
 from jsonpath_ng.jsonpath import Parent as JsonParent
 from odes_storage.models import Record
 
-from app.bulk_persistence import BulkId
+from app.bulk_persistence.bulk_uri import BulkURI
 
 
 class LogBulkHelper:
@@ -33,33 +33,26 @@ class LogBulkHelper:
         return record.data
 
     @classmethod
-    def _set_bulk_id_in_wks(cls, record: Record, bulk_id, prefix: str) -> None:
-        """ for now it used externalIds, to _get_bulk_id_from_wksbe updated once schema is fixed with log.data.bulkId """
-        bulk_urn = BulkId.bulk_urn_encode(bulk_id, prefix=prefix)
-        cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_urn
-
-    @classmethod
-    def _get_bulk_id_from_wks(cls, record: Record) -> Optional[str]:
+    def _get_bulk_uri_from_wks(cls, record: Record) -> Optional[str]:
         bulk_uri = (
             cls._get_record_data_dict(record)
             .get("log", {})
            .get("bulkURI", None)
        )
-        return BulkId.bulk_urn_decode(bulk_uri) if bulk_uri else (None, None)
+        return bulk_uri
 
     @classmethod
-    def update_bulk_id(
-        cls, record: Record, bulk_id, custom_bulk_id_path: Optional[str] = None, prefix: Optional[str] = None
-    ):
+    def update_bulk_uri(cls, record: Record, bulk_uri: BulkURI, custom_bulk_id_path: Optional[str] = None):
         """
         Update bulk id within a log record. Note that the custom path cannot be applied when using a strict structured model
         It creates the field if not exist
         :param record: record to update.
-        :param bulk_id: bulk reference (id, uri ...) to set
+        :param bulk_uri: the BulkURI to set on the record
         :param custom_bulk_id_path: !! incompatible with log model
         """
+        uri_value = bulk_uri.encode()
         if custom_bulk_id_path is None:  # what about empty string ?
-            cls._set_bulk_id_in_wks(record, bulk_id, prefix)
+            cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = uri_value
         else:
             record_dict = {"data": record.data}
 
@@ -67,28 +60,23 @@ class LogBulkHelper:
             field_name = custom_bulk_id_path.split(".")[-1]
             json_exp = parse_jsonpath(custom_bulk_id_path).child(JsonParent())
-            json_exp.find(record_dict)[0].value[
-                field_name
-            ] = BulkId.bulk_urn_encode(bulk_id, prefix=prefix)
+            json_exp.find(record_dict)[0].value[field_name] = uri_value
 
             # if only support existing field, it can be done with a simple update call
             # parse_jsonpath(custom_bulk_id_path).update(record, bulk_ref)
             record.data = record_dict["data"]
 
     @classmethod
-    def get_bulk_id(
-        cls, record: Record, custom_bulk_id_path: Optional[str] = None
-    ) -> Tuple[Optional[str], Optional[str]]:
+    def get_bulk_uri(cls, record: Record, custom_bulk_id_path: Optional[str] = None) -> BulkURI:
         """
         :param record:
         :param custom_bulk_id_path: !! incompatible with log model
-        :return: bulk id if any else None
+        :return: BulkURI; may be invalid if the record holds no bulk reference
         """
         if custom_bulk_id_path is None:  # what about empty string ?
-            return cls._get_bulk_id_from_wks(record)
+            return BulkURI.decode(cls._get_bulk_uri_from_wks(record))
 
         record_dict = {"data": record.data}
         matches = parse_jsonpath(custom_bulk_id_path).find(record_dict)
         if len(matches) > 0:
-            return BulkId.bulk_urn_decode(matches[0].value)
-        return None, None
-
+            return BulkURI.decode(matches[0].value)
+        return BulkURI.invalid()
diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py
index d6e173978aa50c4681709ad92c6335e8fa58dbc4..d21a391ac5c911310ea449e6b95684b3efa57cbb 100644
--- a/app/routers/bulk/bulk_routes.py
+++ b/app/routers/bulk/bulk_routes.py
@@ -33,8 +33,7 @@ from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSes
 from app.routers.record_utils import fetch_record
 from app.routers.bulk.utils import (with_dask_blob_storage, get_check_input_df_func, get_df_from_request,
                                     set_bulk_field_and_send_record, DataFrameRender, _check_df_columns_type_legacy)
-from app.routers.bulk.bulk_uri_dependencies import (get_bulk_id_access, BulkIdAccess,
-                                                    BULK_URN_PREFIX_VERSION)
+from app.routers.bulk.bulk_uri_dependencies import get_bulk_id_access, BulkIdAccess
 from app.helper.traces import with_trace
 
 
@@ -143,13 +142,21 @@ async def get_data_version(
         bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access)
 ):
     record = await fetch_record(ctx, record_id, version)
-    bulk_id, prefix = bulk_uri_access.get_bulk_uri(record=record)  # TODO PATH logv2
+    try:
+        bulk_uri = bulk_uri_access.get_bulk_uri(record=record)  # TODO PATH logv2
+    except ValueError:
+        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                            detail='Record contains an invalid bulk URI')
     stat = None
     try:
-        if bulk_id is None:
+        if not bulk_uri.is_valid():
             raise BulkNotFound(record_id=record_id, bulk_id=None)
 
-        if prefix == BULK_URN_PREFIX_VERSION:
+        bulk_id = bulk_uri.bulk_id
+        if bulk_uri.is_bulk_storage_V0():
+            df = await get_dataframe(ctx, bulk_id)
+            _check_df_columns_type_legacy(df)
+        else:
             columns = None
             if data_param.curves:
                 stat = dask_blob_storage.read_stat(record_id, bulk_id)
@@ -160,11 +167,6 @@ async def get_data_version(
 
             # loading the dataframe with filter on columns is faster than filtering columns on df
             df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns)
-        elif prefix is None:
-            df = await get_dataframe(ctx, bulk_id)
-            _check_df_columns_type_legacy(df)
-        else:
-            raise BulkNotFound(record_id=record_id, bulk_id=bulk_id)
 
         df = await DataFrameRender.process_params(df, data_param)
         return await DataFrameRender.df_render(df, data_param, request.headers.get('Accept'), orient=orient, stat=stat)
@@ -228,24 +230,32 @@ async def complete_session(
     _internal = i_session.internal  # <= contains details details, may be irrelevant or not needed
     record = await fetch_record(ctx, record_id, i_session.session.fromVersion)
 
-    previous_bulk_uri = None
+    previous_bulk_id = None
     if i_session.session.mode == SessionUpdateMode.Update:
-        previous_bulk_uri, prefix = bulk_uri_access.get_bulk_uri(record)  # TODO PATH for logv2
-        if previous_bulk_uri is not None and prefix != BULK_URN_PREFIX_VERSION:
+        try:
+            previous_bulk_uri = bulk_uri_access.get_bulk_uri(record)  # TODO PATH logv2
+        except ValueError:
+            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                                detail=f'Record with version {i_session.session.fromVersion} from which '
+                                       f'update contains an invalid bulk URI')
+
+        if previous_bulk_uri.is_bulk_storage_V0():
             try:
-                df = await get_dataframe(ctx, previous_bulk_uri)
+                df = await get_dataframe(ctx, previous_bulk_uri.bulk_id)
                 # convert old bulk to new one
-                previous_bulk_uri = await dask_blob_storage.save_blob(df, record_id=record_id)
+                previous_bulk_id = await dask_blob_storage.save_blob(df, record_id=record_id)
             except ResourceNotFoundException:
-                BulkNotFound(record_id=record_id, bulk_id=previous_bulk_uri).raise_as_http()
+                BulkNotFound(record_id=record_id, bulk_id=previous_bulk_id).raise_as_http()
+        else:
+            previous_bulk_id = previous_bulk_uri.bulk_id
 
 
-    new_bulk_uri = await dask_blob_storage.session_commit(i_session.session, previous_bulk_uri)
+    new_bulk_id = await dask_blob_storage.session_commit(i_session.session, previous_bulk_id)
     # ==============>
     # ==============> UPDATE META DATA HERE (baseDepth, ...) <==============
     # ==============>
-    await set_bulk_field_and_send_record(ctx, new_bulk_uri, record, bulk_uri_access)
+    await set_bulk_field_and_send_record(ctx, new_bulk_id, record, bulk_uri_access)
 
 
     i_session = commit_guard.session
     i_session.session.meta = i_session.session.meta or {}
diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py
index 9dd49059149547b69e96c20465cb1f61586e3c09..ce9dad211841d33bdc5825df8d90eaedb4cddc49 100644
--- a/app/routers/bulk/bulk_uri_dependencies.py
+++ b/app/routers/bulk/bulk_uri_dependencies.py
@@ -1,52 +1,47 @@
 import abc
 from abc import ABC
-from typing import Optional, Tuple
+from typing import Optional
 
 from fastapi import Request
 
-from app.bulk_persistence import BulkId
+from app.bulk_persistence.bulk_uri import BulkURI
 from app.model.log_bulk import LogBulkHelper
 
-BULK_URN_PREFIX_VERSION = "wdms-1"
 BULK_URI_FIELD = "bulkURI"
 
 
 class BulkIdAccess(ABC):
     @staticmethod
     @abc.abstractmethod
-    def get_bulk_uri(record) -> Tuple[str, Optional[str]]:
+    def get_bulk_uri(record) -> BulkURI:
         ...
 
     @staticmethod
     @abc.abstractmethod
-    def set_bulk_uri(record, bulk_id):
+    def set_bulk_uri(record, bulk_id: str):
         ...
 
 
 class OsduBulkIdAccess(BulkIdAccess):
     @staticmethod
-    def get_bulk_uri(record) -> Tuple[str, Optional[str]]:
-        bulk_urn = record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None)
-        if bulk_urn:
-            return BulkId.bulk_urn_decode(bulk_urn)
-        return None, None
+    def get_bulk_uri(record) -> BulkURI:
+        return BulkURI.decode(record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None))
 
     @staticmethod
-    def set_bulk_uri(record, bulk_id):
-        bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION)
-        record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_urn
+    def set_bulk_uri(record, bulk_id: str):
+        bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id=bulk_id)
+        record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_uri.encode()
 
 
 class LogBulkIdAccess(BulkIdAccess):
     @staticmethod
-    def get_bulk_uri(record, custom_bulk_id_path: Optional[str] = None) -> Tuple[str, Optional[str]]:
-        return LogBulkHelper.get_bulk_id(record=record, custom_bulk_id_path=custom_bulk_id_path)
+    def get_bulk_uri(record, custom_bulk_id_path: Optional[str] = None) -> BulkURI:
+        return LogBulkHelper.get_bulk_uri(record=record, custom_bulk_id_path=custom_bulk_id_path)
 
     @staticmethod
-    def set_bulk_uri(record, bulk_id: str, custom_bulk_id_path: Optional[str] = None):
-        LogBulkHelper.update_bulk_id(
-            record=record, bulk_id=bulk_id, prefix=BULK_URN_PREFIX_VERSION, custom_bulk_id_path=custom_bulk_id_path
-        )
+    def set_bulk_uri(record, bulk_id: str):
+        LogBulkHelper.update_bulk_uri(record=record,
+                                      bulk_uri=BulkURI.from_bulk_storage_V1(bulk_id=bulk_id))
 
 
 async def set_log_bulk_id_access(request: Request):
diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py
index 9dcece12ff9e184a8cb3e10bc854721a1d211675..8b3851e7035bffe2b3d0fc4de338f7000ce3ed26 100644
--- a/app/routers/ddms_v2/log_ddms_v2.py
+++ b/app/routers/ddms_v2/log_ddms_v2.py
@@ -38,6 +38,7 @@ from odes_storage.models import (
 from pydantic import BaseModel, Field
 
 from app.bulk_persistence import DataframeSerializerAsync, DataframeSerializerSync, JSONOrient, MimeTypes, get_dataframe
+from app.bulk_persistence.bulk_uri import BulkURI
 from app.clients.storage_service_client import get_storage_record_service
 from app.model.log_bulk import LogBulkHelper
 from app.model.model_curated import log
@@ -196,7 +197,8 @@ async def _write_log_data(
         fetch_record(ctx, logid),
     )
     # update the record
-    LogBulkHelper.update_bulk_id(log_record, bulk_id, bulk_path)
+    bulk_uri = BulkURI.from_bulk_storage_V0(bulk_id=bulk_id)
+    LogBulkHelper.update_bulk_uri(log_record, bulk_uri, bulk_path)
 
     # push new version on the storage
     return await update_records(ctx, [log_record])
@@ -408,12 +410,12 @@ async def get_log_data_statistics(logid: str,
     # we may use an optimistic cache here
     log_record = await fetch_record(ctx, logid)
 
     # use dict to support the custom path
-    bulk_id, _prefix = LogBulkHelper.get_bulk_id(log_record, bulk_id_path)
-    if bulk_id is None:
-        content = '{}'  # no bulk
-    else:
-        df = await get_dataframe(ctx, bulk_id)
+    bulk_uri = LogBulkHelper.get_bulk_uri(log_record, bulk_id_path)
+    if bulk_uri.is_valid():
+        df = await get_dataframe(ctx, bulk_uri.bulk_id)
         content = df.describe(include="all").to_json()
+    else:
+        content = '{}'  # no bulk
 
     return Response(content=content, media_type=MimeTypes.JSON.type)
diff --git a/app/routers/ddms_v2/persistence.py b/app/routers/ddms_v2/persistence.py
index 1b81eadd313cf75a83beb7280ac0cdcbf0eb7ec5..109237d20f88c8bcfa6320a20323c9d2c8a1c8f1 100644
--- a/app/routers/ddms_v2/persistence.py
+++ b/app/routers/ddms_v2/persistence.py
@@ -31,12 +31,12 @@ class Persistence:
         record: Record,
         bulk_id_path: str,
     ) -> pd.DataFrame:
-        bulk_id, _prefix = LogBulkHelper.get_bulk_id(record, bulk_id_path)
+        bulk_uri = LogBulkHelper.get_bulk_uri(record, bulk_id_path)
         # TODO use prefix to know how to read the bulk
-        if bulk_id is None:
+        if not bulk_uri.is_valid():
             return pd.DataFrame()
 
-        return await get_dataframe(ctx, bulk_id)
+        return await get_dataframe(ctx, bulk_uri.bulk_id)
 
     @classmethod
     async def write_bulk(cls, ctx: Context, dataframe) -> str:
diff --git a/app/routers/dipset/persistence.py b/app/routers/dipset/persistence.py
index 0679946052ea66bb0aad9fb7b3bdca99762bdafd..6cb39e10b57208f9ff2331ef9cf37aefa39e75ea 100644
--- a/app/routers/dipset/persistence.py
+++ b/app/routers/dipset/persistence.py
@@ -35,7 +35,7 @@ from app.model.model_curated import (
 )
 from app.model.model_utils import from_record, to_record
 from app.routers.dipset.dip_model import Dip
-from app.bulk_persistence import get_dataframe, create_and_store_dataframe, BulkId
+from app.bulk_persistence import get_dataframe, create_and_store_dataframe, BulkURI
 
 async def create_missing_logs(ctx, my_dipset: dipset):
     """
@@ -246,9 +246,9 @@ def df_to_dips(dataframe: pd.DataFrame) -> List[Dip]:
 
 
 #TODO refactor duplicate with trajectory
-async def write_bulk(ctx, dataframe: pd.DataFrame) -> str:
+async def write_bulk(ctx, dataframe: pd.DataFrame) -> BulkURI:
     bulk_id = await create_and_store_dataframe(ctx, dataframe)
-    return BulkId.bulk_urn_encode(bulk_id)
+    return BulkURI.from_bulk_storage_V0(bulk_id)
 
 
 async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str]) -> dipset:
@@ -260,7 +260,8 @@ async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str]
     dataframe.sort_values(by=["reference", "azimuth"], inplace=True, ignore_index=True)
 
     # Write data in storage and update dipset bulk URI
-    my_dipset.data.bulkURI = await write_bulk(ctx, dataframe)
+    bulk_uri = await write_bulk(ctx, dataframe)
+    my_dipset.data.bulkURI = bulk_uri.encode()
 
     # Create or update logs
     await create_missing_logs(ctx, my_dipset)
@@ -294,9 +295,9 @@ async def read_dipset_data(ctx, ds: Union[dipset, str]) -> Tuple[dipset, pd.Data
         return my_dipset, pd.DataFrame()
 
     # Fetch data
-    bulk_uri, _prefix = BulkId.bulk_urn_decode(my_dipset.data.bulkURI)
+    bulk_uri = BulkURI.decode(my_dipset.data.bulkURI)
     # TODO use prefix to know how to read the bulk
-    df = await get_dataframe(ctx, bulk_uri)
+    df = await get_dataframe(ctx, bulk_uri.bulk_id)
 
     return my_dipset, df
 
diff --git a/app/routers/trajectory/persistence.py b/app/routers/trajectory/persistence.py
index 75e3187511b94d097b8bbbd982d78674d9c5c5a3..48145c99d36ee1c2c65fc9341951ca7d69ea473a 100644
--- a/app/routers/trajectory/persistence.py
+++ b/app/routers/trajectory/persistence.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import pandas as pd
-from app.bulk_persistence import BulkId, NoBulkException, UnknownChannelsException, InvalidBulkException
+from app.bulk_persistence import BulkURI, NoBulkException, UnknownChannelsException, InvalidBulkException
 from app.model.model_curated import trajectory as Trajectory
 from app.bulk_persistence import get_dataframe, create_and_store_dataframe
 
@@ -46,9 +46,9 @@ class Persistence:
             raise NoBulkException
 
         try:
-            bulkid, _prefix = BulkId.bulk_urn_decode(record.data.bulkURI)
+            bulk_uri = BulkURI.decode(record.data.bulkURI)
             # TODO use prefix to know how to read the bulk
-            df = await get_dataframe(ctx, bulkid)
+            df = await get_dataframe(ctx, bulk_uri.bulk_id)
         except Exception as ex:
             raise InvalidBulkException(ex) from ex
 
@@ -60,8 +60,7 @@ class Persistence:
         except KeyError as key_error:  # unknown channels
             raise UnknownChannelsException(key_error)
 
-    @classmethod
-    async def write_bulk(cls, ctx, dataframe: pd.DataFrame) -> str:
+    async def write_bulk(cls, ctx, dataframe: pd.DataFrame) -> BulkURI:
         bulk_id = await create_and_store_dataframe(ctx, dataframe)
-        return BulkId.bulk_urn_encode(bulk_id)
+        return BulkURI.from_bulk_storage_V0(bulk_id)
 
diff --git a/app/routers/trajectory/trajectory_ddms_v2.py b/app/routers/trajectory/trajectory_ddms_v2.py
index dcdebc3617dd604f69b508f6a78e0dc879f2a625..9cca3acd8a8d307c2335d91ec761c8dc3da1c395 100644
--- a/app/routers/trajectory/trajectory_ddms_v2.py
+++ b/app/routers/trajectory/trajectory_ddms_v2.py
@@ -236,7 +236,8 @@ async def post_traj_data(
     trajectory_record = await fetch_trajectory_record(ctx, trajectoryid)
     record = from_record(Trajectory, trajectory_record)
 
-    record.data.bulkURI = await persistence.write_bulk(ctx, df)
+    bulk_uri = await persistence.write_bulk(ctx, df)
+    record.data.bulkURI = bulk_uri.encode()
 
     # update record's channels
     if not record.data.channels:
diff --git a/tests/unit/bulk_persistence/bulk_id_test.py b/tests/unit/bulk_persistence/bulk_id_test.py
index 12a219830de574bada5056ecc778def6d4e9aeca..5bc385fc6a49ef3a4d1994483b4255f650ed2923 100644
--- a/tests/unit/bulk_persistence/bulk_id_test.py
+++ b/tests/unit/bulk_persistence/bulk_id_test.py
@@ -13,24 +13,9 @@
 # limitations under the License.
 import pytest
-from app.bulk_persistence import BulkId
+from app.bulk_persistence.bulk_id import new_bulk_id
 
 import uuid
 
 
 def test_bulk_id_is_an_uuid():
-    uuid.UUID(BulkId.new_bulk_id())
-
-# urn decode test
-def test_decode_urn_no_prefix():
-    uuid, prefix = BulkId.bulk_urn_decode("urn:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8")
-    assert uuid == "489768d2-eee1-4a8f-ae95-7b0c30b0dcd8"
-    assert prefix is None
-
-def test_decode_urn_with_prefix():
-    uuid, prefix = BulkId.bulk_urn_decode("urn:myprefix:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8")
-    assert uuid == "489768d2-eee1-4a8f-ae95-7b0c30b0dcd8"
-    assert prefix == 'myprefix'
-
-def test_decode_urn_none():
-    with pytest.raises(ValueError):
-        uuid, prefix = BulkId.bulk_urn_decode(None)
+    uuid.UUID(new_bulk_id())
diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..6a3d562ba9e5e2a3c0ff747a192b025279337462
--- /dev/null
+++ b/tests/unit/bulk_persistence/bulk_uri_test.py
@@ -0,0 +1,79 @@
+# Copyright 2021 Schlumberger
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from app.bulk_persistence.bulk_uri import BulkURI
+from app.bulk_persistence.bulk_storage_version import (
+    BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid)
+
+
+# urn decode test
+def test_from_uri_without_prefix():
+    uri_str = 'urn:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'
+
+    bulk_uri = BulkURI.decode(uri_str)
+    assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'
+    assert bulk_uri.is_bulk_storage_V0()
+    assert bulk_uri.storage_version == BulkStorageVersion_V0
+    assert bulk_uri.storage_version.uri_prefix is None
+    assert bulk_uri.is_valid()
+
+    # should encode back to the same uri
+    assert bulk_uri.encode() == uri_str
+
+
+def test_decode_urn_with_prefix():
+    uri_str = f'urn:{BulkStorageVersion_V1.uri_prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'
+
+    bulk_uri = BulkURI.decode(uri_str)
+    assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'
+    assert not bulk_uri.is_bulk_storage_V0()
+    assert bulk_uri.storage_version == BulkStorageVersion_V1
+    assert bulk_uri.storage_version.uri_prefix == BulkStorageVersion_V1.uri_prefix
+    assert bulk_uri.is_valid()
+
+    # should encode back to the same uri
+    assert bulk_uri.encode() == uri_str
+
+
+@pytest.mark.parametrize("bulk_id, version", [
+    ('489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', None),
+    ('', BulkStorageVersion_Invalid),
+    ('489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', BulkStorageVersion_Invalid),
+    ('', None),
+    ('', BulkStorageVersion_V1),
+])
+def test_invalid_uri(bulk_id, version):
+    invalid_uri = BulkURI(bulk_id, version)
+    assert not invalid_uri.is_valid()
+    assert not invalid_uri.bulk_id
+    assert invalid_uri.storage_version == BulkStorageVersion_Invalid
+
+    # explicit encode raises
+    with pytest.raises(ValueError):
+        invalid_uri.encode()
+
+
+def test_decode_urn_invalid_input_should_throw():
+    # badly formed urn format
+    with pytest.raises(ValueError):
+        BulkURI.decode('invalid_urn_uri')
+
+    # bulk_id not a valid UUID
+    with pytest.raises(ValueError):
+        BulkURI.decode('urn:uuid:invalid_uuid')
+
+    # unknown prefix
+    with pytest.raises(ValueError):
+        BulkURI.decode('urn:UNKNOWN_PREFIX:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8')
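+
+
+def test_encode_decode_round_trip_v1():
+    # Illustrative addition (sketch, not part of the original change set): assumes
+    # encode() and decode() remain inverse operations for a V1 URI built from a bulk id.
+    bulk_id = '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'
+    bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id)
+
+    round_tripped = BulkURI.decode(bulk_uri.encode())
+    assert round_tripped.bulk_id == bulk_id
+    assert round_tripped.storage_version == BulkStorageVersion_V1
+    assert round_tripped.is_valid()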
diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py
index cab5cde35c85e0effc119da53953dc8bd8d010dc..b5483639f16e6390fba91b4721997f308a861dd2 100644
--- a/tests/unit/model/log_bulk_test.py
+++ b/tests/unit/model/log_bulk_test.py
@@ -11,9 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from app.model.log_bulk import LogBulkHelper
-from app.bulk_persistence import BulkId
+from app.bulk_persistence.bulk_uri import BulkURI
 from tests.unit.test_utils import basic_record
 import uuid
 import pytest
@@ -21,42 +20,42 @@ import pytest
 
 @pytest.fixture
 def record_with_bulkURI(basic_record):
-    basic_record.data = {'custombulkid':'toto', 'log': {'bulkURI': str(uuid.uuid4())}}
+    basic_record.data = {'custombulkid':'toto', 'log': {'bulkURI': uuid.uuid4().urn}}
     return basic_record
 
 
-def test_bulk_id_is_an_uuid():
-    uuid.UUID(BulkId.new_bulk_id())
-
-
 def test_update_bulk_id(record_with_bulkURI):
-    b_id = str(uuid.uuid4())
-    LogBulkHelper.update_bulk_id(record_with_bulkURI, b_id)
-    assert record_with_bulkURI.data['log']['bulkURI'] == uuid.UUID(b_id).urn
+    b_id = uuid.uuid4()
+    b_uri = BulkURI.from_bulk_storage_V0(str(b_id))
+    LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri)
+    assert record_with_bulkURI.data['log']['bulkURI'] == b_id.urn
 
 
 def test_update_bulk_id_with_path(record_with_bulkURI):
-    b_id = str(uuid.uuid4())
-    LogBulkHelper.update_bulk_id(record_with_bulkURI, b_id, "data.custombulkid")
-    assert record_with_bulkURI.data['custombulkid'] == uuid.UUID(b_id).urn
+    b_id = uuid.uuid4()
+    b_uri = BulkURI.from_bulk_storage_V0(str(b_id))
+    LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri, "data.custombulkid")
+    assert record_with_bulkURI.data['custombulkid'] == b_id.urn
 
 
 def test_update_bulk_id_on_not_valid_data_should_throw(basic_record):
     basic_record.data = 'not a dict data'
     with pytest.raises(Exception):
-        LogBulkHelper.update_bulk_id(basic_record, str(uuid.uuid4()))
+        LogBulkHelper.update_bulk_uri(basic_record, BulkURI.from_bulk_storage_V0(str(uuid.uuid4())))
 
 
 def test_get_update_bulk_id(record_with_bulkURI):
-    assert LogBulkHelper.get_bulk_id(record_with_bulkURI)[0] == record_with_bulkURI.data['log']['bulkURI']
+    assert LogBulkHelper.get_bulk_uri(record_with_bulkURI).encode() == record_with_bulkURI.data['log']['bulkURI']
 
 
 def test_update_bulk_id_on_empty_record(basic_record):
-    b_id = str(uuid.uuid4())
-    LogBulkHelper.update_bulk_id(basic_record, b_id)
-    assert basic_record.data['log']['bulkURI'] == uuid.UUID(b_id).urn
+    b_id = uuid.uuid4()
+    b_uri = BulkURI.from_bulk_storage_V0(str(b_id))
+    LogBulkHelper.update_bulk_uri(basic_record, b_uri)
+    assert basic_record.data['log']['bulkURI'] == b_id.urn
 
 
 def test_get_bulk_id_on_empty_record(basic_record):
-    assert LogBulkHelper.get_bulk_id(basic_record) == (None, None)
+    bulk_uri = LogBulkHelper.get_bulk_uri(basic_record)
+    assert not bulk_uri.is_valid()
 
diff --git a/tests/unit/routers/ddms_v2/log_ddms_v2_test.py b/tests/unit/routers/ddms_v2/log_ddms_v2_test.py
index 2b0ee0f8c0720d97e7410ff40a595398dd29580f..dc908c29e0575431e22da5790e828c5de6acafb0 100644
--- a/tests/unit/routers/ddms_v2/log_ddms_v2_test.py
+++ b/tests/unit/routers/ddms_v2/log_ddms_v2_test.py
@@ -95,8 +95,8 @@ class TestHelper:
         return record
 
     @staticmethod
-    def get_bulk_id_from_record(record) -> str:
-        return LogBulkHelper.get_bulk_id(record)
+    def get_bulk_id_from_record(record):
+        return LogBulkHelper.get_bulk_uri(record)
 
 
 @pytest.fixture