From 5a1fbc69487fdf7f1b25ca811894b48f7b74983f Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 14:00:03 +0100 Subject: [PATCH 01/16] rename set/update bulk_id to set/update bulk_uri --- app/model/log_bulk.py | 16 +++++++++------- app/routers/bulk/bulk_uri_dependencies.py | 2 +- app/routers/ddms_v2/log_ddms_v2.py | 2 +- tests/unit/model/log_bulk_test.py | 8 ++++---- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index 06056570..fc906f76 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -33,10 +33,11 @@ class LogBulkHelper: return record.data @classmethod - def _set_bulk_id_in_wks(cls, record: Record, bulk_id, prefix: str) -> None: - """ for now it used externalIds, to _get_bulk_id_from_wksbe updated once schema is fixed with log.data.bulkId """ - bulk_urn = BulkId.bulk_urn_encode(bulk_id, prefix=prefix) - cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_urn + def _set_bulk_uri_in_wks(cls, record: Record, bulk_uri: str) -> None: + """ + for now it used externalIds, to _get_bulk_id_from_wks be updated once schema is fixed with log.data.bulkId + """ + cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_uri @classmethod def _get_bulk_id_from_wks(cls, record: Record) -> Optional[str]: @@ -48,7 +49,7 @@ class LogBulkHelper: return BulkId.bulk_urn_decode(bulk_uri) if bulk_uri else (None, None) @classmethod - def update_bulk_id( + def update_bulk_uri( cls, record: Record, bulk_id, custom_bulk_id_path: Optional[str] = None, prefix: Optional[str] = None ): """ @@ -58,8 +59,9 @@ class LogBulkHelper: :param bulk_id: bulk reference (id, uri ...) to set :param custom_bulk_id_path: !! incompatible with log model """ + uri_value = BulkId.bulk_urn_encode(bulk_id, prefix=prefix) if custom_bulk_id_path is None: # what about empty string ? - cls._set_bulk_id_in_wks(record, bulk_id, prefix) + cls._set_bulk_uri_in_wks(record, uri_value) else: record_dict = {"data": record.data} @@ -69,7 +71,7 @@ class LogBulkHelper: json_exp.find(record_dict)[0].value[ field_name - ] = BulkId.bulk_urn_encode(bulk_id, prefix=prefix) + ] = uri_value # if only support existing field, it can be done with a simple update call # parse_jsonpath(custom_bulk_id_path).update(record, bulk_ref) record.data = record_dict["data"] diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index 9dd49059..e14ea79b 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -44,7 +44,7 @@ class LogBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str, custom_bulk_id_path: Optional[str] = None): - LogBulkHelper.update_bulk_id( + LogBulkHelper.update_bulk_uri( record=record, bulk_id=bulk_id, prefix=BULK_URN_PREFIX_VERSION, custom_bulk_id_path=custom_bulk_id_path ) diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index 9dcece12..8f48a1de 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -196,7 +196,7 @@ async def _write_log_data( fetch_record(ctx, logid), ) # update the record - LogBulkHelper.update_bulk_id(log_record, bulk_id, bulk_path) + LogBulkHelper.update_bulk_uri(log_record, bulk_id, bulk_path) # push new version on the storage return await update_records(ctx, [log_record]) diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index cab5cde3..03503141 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -31,13 +31,13 @@ def test_bulk_id_is_an_uuid(): def test_update_bulk_id(record_with_bulkURI): b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_id(record_with_bulkURI, b_id) + LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_id) assert record_with_bulkURI.data['log']['bulkURI'] == uuid.UUID(b_id).urn def test_update_bulk_id_with_path(record_with_bulkURI): b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_id(record_with_bulkURI, b_id, "data.custombulkid") + LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_id, "data.custombulkid") assert record_with_bulkURI.data['custombulkid'] == uuid.UUID(b_id).urn @@ -45,7 +45,7 @@ def test_update_bulk_id_on_not_valid_data_should_throw(basic_record): basic_record.data = 'not a dict data' with pytest.raises(Exception): - LogBulkHelper.update_bulk_id(basic_record, str(uuid.uuid4())) + LogBulkHelper.update_bulk_uri(basic_record, str(uuid.uuid4())) def test_get_update_bulk_id(record_with_bulkURI): @@ -54,7 +54,7 @@ def test_get_update_bulk_id(record_with_bulkURI): def test_update_bulk_id_on_empty_record(basic_record): b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_id(basic_record, b_id) + LogBulkHelper.update_bulk_uri(basic_record, b_id) assert basic_record.data['log']['bulkURI'] == uuid.UUID(b_id).urn -- GitLab From a5d6db3ac2137a8d329fbb749ca10d26175dabed Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 14:01:09 +0100 Subject: [PATCH 02/16] add type hint + remove parameter never used (custom_bulk_id_path) --- app/routers/bulk/bulk_uri_dependencies.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index e14ea79b..dbe687c0 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -19,7 +19,7 @@ class BulkIdAccess(ABC): @staticmethod @abc.abstractmethod - def set_bulk_uri(record, bulk_id): + def set_bulk_uri(record, bulk_id: str): ... @@ -32,7 +32,7 @@ class OsduBulkIdAccess(BulkIdAccess): return None, None @staticmethod - def set_bulk_uri(record, bulk_id): + def set_bulk_uri(record, bulk_id: str): bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION) record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_urn @@ -43,10 +43,9 @@ class LogBulkIdAccess(BulkIdAccess): return LogBulkHelper.get_bulk_id(record=record, custom_bulk_id_path=custom_bulk_id_path) @staticmethod - def set_bulk_uri(record, bulk_id: str, custom_bulk_id_path: Optional[str] = None): + def set_bulk_uri(record, bulk_id: str): LogBulkHelper.update_bulk_uri( - record=record, bulk_id=bulk_id, prefix=BULK_URN_PREFIX_VERSION, custom_bulk_id_path=custom_bulk_id_path - ) + record=record, bulk_id=bulk_id, prefix=BULK_URN_PREFIX_VERSION) async def set_log_bulk_id_access(request: Request): -- GitLab From 4781f64d0f06e4c624f6c130a5c43762d87c463f Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 14:01:56 +0100 Subject: [PATCH 03/16] introduce bulk uri class to fullt encapsulate bulk uri management --- app/bulk_persistence/bulk_uri.py | 107 +++++++++++++++++++ tests/unit/bulk_persistence/bulk_uri_test.py | 80 ++++++++++++++ 2 files changed, 187 insertions(+) create mode 100644 app/bulk_persistence/bulk_uri.py create mode 100644 tests/unit/bulk_persistence/bulk_uri_test.py diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py new file mode 100644 index 00000000..55da7e55 --- /dev/null +++ b/app/bulk_persistence/bulk_uri.py @@ -0,0 +1,107 @@ +from typing import Optional, NamedTuple, Tuple +import uuid + +BULK_URN_PREFIX_VERSION = "wdms-1" + + +class BulkUriVersion(NamedTuple): + """ """ + version: str + prefix: Optional[str] + + +BulkUriVersion_Invalid = BulkUriVersion(version='', prefix=None) + +BulkUriVersion_V0 = BulkUriVersion(version='0', prefix=None) +""" first bulk management implementation with direct management to blob storage with a single blob """ + +BulkUriVersion_V1 = BulkUriVersion(version='1', prefix=BULK_URN_PREFIX_VERSION) +""" version 1, using Dask to handle bulk manipulation and storage """ + + +class BulkURI: + """ """ + def __init__(self, uri: Optional[str] = None): + """ + make an new one or invalid + :throw: ValueError + """ + if uri: + bulk_id, prefix = self._decode_uri(uri) + if not prefix: + version = BulkUriVersion_V0 + elif prefix == BULK_URN_PREFIX_VERSION: + version = BulkUriVersion_V1 + else: + raise ValueError('Unsupported prefix in bulk URI: ' + prefix) + else: + bulk_id, version = '', BulkUriVersion_Invalid + + self._bulk_id = bulk_id + self._uri_version = version + + @classmethod + def invalid(cls): + """ make an invalid instance """ + return cls() + + @classmethod + def new_with_version(cls, version: BulkUriVersion): + inst = cls() + inst._bulk_id = str(uuid.uuid4()) + inst._uri_version = version + return inst + + @classmethod + def decode(cls, uri: str) -> 'BulkURI': + return cls(uri) + + def is_V0(self) -> bool: + """ convenient check that returns True is version == BulkUriVersion_V0 """ + return self._uri_version.version == BulkUriVersion_V0.version + + @property + def bulk_id(self) -> str: + return self._bulk_id + + @property + def uri_version(self) -> BulkUriVersion: + return self._uri_version + + def __bool__(self) -> bool: + """ check invalid """ + if self._bulk_id and self._uri_version.version: + return True + return False + + def __str__(self) -> str: + """ as URN string """ + return self.encode() if self.is_valid() else '' + + def encode(self) -> str: + """ + encode to uri as string + If the prefix is not empty returns, uri format = `urn:$prefix:uuid:$bulk_id` + If the prefix is empty or None, uri format = `urn:uuid:$bulk_id` + """ + if self._uri_version.prefix: + return f'urn:{self._uri_version.prefix}:uuid:{self._bulk_id}' + return uuid.UUID(self._bulk_id).urn + + @classmethod + def _decode_uri(cls, uri: str) -> Tuple[str, Optional[str]]: + """ + Decode urn into uuid and optional prefix. Returns tuple [uuid, prefix]. + If urn is `urn:$prefix:uuid:$bulk_id`, will return [$bulk_id, $prefix] + If urn is `urn:uuid:$bulk_id`, will return [$bulk_id, None] + :throw: ValueError if urn empty or invalid UUID + """ + if uri is None: + raise ValueError('attempted to decode empty urn') + parts = uri.split(":") + if len(parts) < 4: + return str(uuid.UUID(uri)), None + return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2]) + + def is_valid(self) -> bool: + return self.__bool__() diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py new file mode 100644 index 00000000..eceb786e --- /dev/null +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -0,0 +1,80 @@ +# Copyright 2021 Schlumberger +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +from app.bulk_persistence.bulk_uri import ( + BulkURI, BulkUriVersion_V1, BulkUriVersion_V0, BULK_URN_PREFIX_VERSION, BulkUriVersion_Invalid) +import uuid + + +def test_bulk_id_is_an_uuid(): + uuid.UUID(BulkURI.new_with_version(BulkUriVersion_V1).bulk_id) + + +# urn decode test +def test_from_uri_without_prefix(): + uri_str = 'urn:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + + bulk_uri = BulkURI.decode(uri_str) + assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + assert bulk_uri.is_V0() + assert bulk_uri.uri_version == BulkUriVersion_V0 + assert bulk_uri.uri_version.prefix is None + assert bulk_uri.is_valid() + + # should encode back to the same uri + assert bulk_uri.encode() == uri_str + assert str(bulk_uri.encode()) == uri_str + + +def test_decode_urn_with_prefix(): + uri_str = f'urn:{BULK_URN_PREFIX_VERSION}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + + bulk_uri = BulkURI.decode(uri_str) + assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + assert not bulk_uri.is_V0() + assert bulk_uri.uri_version == BulkUriVersion_V1 + assert bulk_uri.uri_version.prefix == BULK_URN_PREFIX_VERSION + assert bulk_uri.is_valid() + + # should encode back to the same uri + assert bulk_uri.encode() == uri_str + assert str(bulk_uri.encode()) == uri_str + + +def test_invalid_uri(): + invalid_uri = BulkURI() + assert not invalid_uri.is_valid() + assert not invalid_uri + assert not invalid_uri.bulk_id + assert invalid_uri.uri_version == BulkUriVersion_Invalid + assert str(invalid_uri) == '' + + # explicit encode raises + with pytest.raises(ValueError): + invalid_uri.encode() + + +def test_decode_urn_invalid_cases(): + # bad formed urn format + with pytest.raises(ValueError): + BulkURI('invalid_urn_uri') + + # bulk_id not a valid UUID + with pytest.raises(ValueError): + BulkURI('urn:uuid:invalid_uuid') + + # unknown prefix + with pytest.raises(ValueError): + BulkURI('urn:UNKNOWN_PREFIX:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8') -- GitLab From bebc924747065dc52179d94a172669cbc77d6fda Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 15:22:15 +0100 Subject: [PATCH 04/16] enforce usage of set URI and class BulkURI on update --- app/bulk_persistence/bulk_uri.py | 15 +++++---- app/model/log_bulk.py | 10 +++--- app/routers/bulk/bulk_uri_dependencies.py | 9 +++--- app/routers/ddms_v2/log_ddms_v2.py | 4 ++- tests/unit/bulk_persistence/bulk_uri_test.py | 18 +++++++++-- tests/unit/model/log_bulk_test.py | 34 ++++++++++---------- 6 files changed, 53 insertions(+), 37 deletions(-) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 55da7e55..4adec9b0 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -21,9 +21,12 @@ BulkUriVersion_V1 = BulkUriVersion(version='1', prefix=BULK_URN_PREFIX_VERSION) class BulkURI: """ """ - def __init__(self, uri: Optional[str] = None): + def __init__(self, uri: Optional[str] = None, *, + bulk_id: Optional[str] = None, + version: Optional[BulkUriVersion] = None): """ make an new one or invalid + Either pass uri alone or bulk_id, version :throw: ValueError """ if uri: @@ -34,8 +37,9 @@ class BulkURI: version = BulkUriVersion_V1 else: raise ValueError('Unsupported prefix in bulk URI: ' + prefix) - else: - bulk_id, version = '', BulkUriVersion_Invalid + elif not bulk_id or not version or version == BulkUriVersion_Invalid: + bulk_id = '' + version = BulkUriVersion_Invalid self._bulk_id = bulk_id self._uri_version = version @@ -47,10 +51,7 @@ class BulkURI: @classmethod def new_with_version(cls, version: BulkUriVersion): - inst = cls() - inst._bulk_id = str(uuid.uuid4()) - inst._uri_version = version - return inst + return cls(bulk_id=str(uuid.uuid4()), version=version) @classmethod def decode(cls, uri: str) -> 'BulkURI': diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index fc906f76..ae32c690 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Tuple +from typing import Optional, Tuple, Union from jsonpath_ng import parse as parse_jsonpath from jsonpath_ng.jsonpath import Parent as JsonParent from odes_storage.models import Record +from app.bulk_persistence.bulk_uri import BulkURI from app.bulk_persistence import BulkId - class LogBulkHelper: # TODO find a better name, LogRecordHelper. but I don't like 'helper', its just a synonymous of bag of several thing # breaking single responsibility principle @@ -50,16 +50,16 @@ class LogBulkHelper: @classmethod def update_bulk_uri( - cls, record: Record, bulk_id, custom_bulk_id_path: Optional[str] = None, prefix: Optional[str] = None + cls, record: Record, bulk_uri: Union[str, BulkURI], custom_bulk_id_path: Optional[str] = None ): """ Update bulk id within a log record. Note that the custom path cannot be applied when using a strict structured model It creates the field if not exist :param record: record to update. - :param bulk_id: bulk reference (id, uri ...) to set + :param bulk_uri: either already encode uri as string or BulkURI :param custom_bulk_id_path: !! incompatible with log model """ - uri_value = BulkId.bulk_urn_encode(bulk_id, prefix=prefix) + uri_value = str(bulk_uri) if custom_bulk_id_path is None: # what about empty string ? cls._set_bulk_uri_in_wks(record, uri_value) else: diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index dbe687c0..ca573e35 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -4,6 +4,7 @@ from typing import Optional, Tuple from fastapi import Request from app.bulk_persistence import BulkId +from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V1 from app.model.log_bulk import LogBulkHelper @@ -33,8 +34,8 @@ class OsduBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): - bulk_urn = BulkId.bulk_urn_encode(bulk_id, BULK_URN_PREFIX_VERSION) - record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_urn + bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V1) + record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_uri.encode() class LogBulkIdAccess(BulkIdAccess): @@ -44,8 +45,8 @@ class LogBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): - LogBulkHelper.update_bulk_uri( - record=record, bulk_id=bulk_id, prefix=BULK_URN_PREFIX_VERSION) + LogBulkHelper.update_bulk_uri(record=record, + bulk_uri=BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V1)) async def set_log_bulk_id_access(request: Request): diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index 8f48a1de..bc36aca9 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -38,6 +38,7 @@ from odes_storage.models import ( from pydantic import BaseModel, Field from app.bulk_persistence import DataframeSerializerAsync, DataframeSerializerSync, JSONOrient, MimeTypes, get_dataframe +from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V0 from app.clients.storage_service_client import get_storage_record_service from app.model.log_bulk import LogBulkHelper from app.model.model_curated import log @@ -196,7 +197,8 @@ async def _write_log_data( fetch_record(ctx, logid), ) # update the record - LogBulkHelper.update_bulk_uri(log_record, bulk_id, bulk_path) + bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V0) + LogBulkHelper.update_bulk_uri(log_record, bulk_uri, bulk_path) # push new version on the storage return await update_records(ctx, [log_record]) diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index eceb786e..df2a0b04 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -22,6 +22,10 @@ def test_bulk_id_is_an_uuid(): uuid.UUID(BulkURI.new_with_version(BulkUriVersion_V1).bulk_id) +def test_bulk_uri_explicit(): + pass + + # urn decode test def test_from_uri_without_prefix(): uri_str = 'urn:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' @@ -53,8 +57,16 @@ def test_decode_urn_with_prefix(): assert str(bulk_uri.encode()) == uri_str -def test_invalid_uri(): - invalid_uri = BulkURI() +@pytest.mark.parametrize("args, kwargs", [ + ([], {}), # no param, default ctor + ([''], {}), + ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'}), + ([''], {'version': BulkUriVersion_Invalid}), + ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkUriVersion_Invalid}), + ([''], {'bulk_id': '', 'version': BulkUriVersion_V1}), +]) +def test_invalid_uri(args, kwargs): + invalid_uri = BulkURI(*args, **kwargs) assert not invalid_uri.is_valid() assert not invalid_uri assert not invalid_uri.bulk_id @@ -66,7 +78,7 @@ def test_invalid_uri(): invalid_uri.encode() -def test_decode_urn_invalid_cases(): +def test_decode_urn_invalid_input_should_throw(): # bad formed urn format with pytest.raises(ValueError): BulkURI('invalid_urn_uri') diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index 03503141..dee78c53 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -11,9 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from app.model.log_bulk import LogBulkHelper -from app.bulk_persistence import BulkId +from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V0 from tests.unit.test_utils import basic_record import uuid import pytest @@ -21,24 +20,25 @@ import pytest @pytest.fixture def record_with_bulkURI(basic_record): - basic_record.data = {'custombulkid':'toto', 'log': {'bulkURI': str(uuid.uuid4())}} + basic_record.data = {'custombulkid':'toto', 'log': {'bulkURI': uuid.uuid4().urn}} return basic_record -def test_bulk_id_is_an_uuid(): - uuid.UUID(BulkId.new_bulk_id()) - - def test_update_bulk_id(record_with_bulkURI): - b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_id) - assert record_with_bulkURI.data['log']['bulkURI'] == uuid.UUID(b_id).urn + uri = uuid.uuid4().urn + LogBulkHelper.update_bulk_uri(record_with_bulkURI, uri) + assert record_with_bulkURI.data['log']['bulkURI'] == uri + + b_id = uuid.uuid4() + b_uri = BulkURI(bulk_id=str(b_id), version=BulkUriVersion_V0) + LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri) + assert record_with_bulkURI.data['log']['bulkURI'] == b_id.urn def test_update_bulk_id_with_path(record_with_bulkURI): - b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_id, "data.custombulkid") - assert record_with_bulkURI.data['custombulkid'] == uuid.UUID(b_id).urn + uri = uuid.uuid4().urn + LogBulkHelper.update_bulk_uri(record_with_bulkURI, uri, "data.custombulkid") + assert record_with_bulkURI.data['custombulkid'] == uri def test_update_bulk_id_on_not_valid_data_should_throw(basic_record): @@ -49,13 +49,13 @@ def test_update_bulk_id_on_not_valid_data_should_throw(basic_record): def test_get_update_bulk_id(record_with_bulkURI): - assert LogBulkHelper.get_bulk_id(record_with_bulkURI)[0] == record_with_bulkURI.data['log']['bulkURI'] + assert LogBulkHelper.get_bulk_id(record_with_bulkURI)[0] == str(uuid.UUID(record_with_bulkURI.data['log']['bulkURI'])) def test_update_bulk_id_on_empty_record(basic_record): - b_id = str(uuid.uuid4()) - LogBulkHelper.update_bulk_uri(basic_record, b_id) - assert basic_record.data['log']['bulkURI'] == uuid.UUID(b_id).urn + uri = uuid.uuid4().urn + LogBulkHelper.update_bulk_uri(basic_record, uri) + assert basic_record.data['log']['bulkURI'] == uri def test_get_bulk_id_on_empty_record(basic_record): -- GitLab From 3a543484e870775a98f7ead9b801afe3c4cc1d89 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 17:07:55 +0100 Subject: [PATCH 05/16] full usage of BulkURI --- app/bulk_persistence/__init__.py | 1 + app/bulk_persistence/bulk_id.py | 18 ------- app/bulk_persistence/bulk_uri.py | 9 +++- app/model/log_bulk.py | 18 +++---- app/routers/bulk/bulk_routes.py | 48 +++++++++++-------- app/routers/bulk/bulk_uri_dependencies.py | 17 +++---- app/routers/ddms_v2/log_ddms_v2.py | 6 +-- app/routers/ddms_v2/persistence.py | 6 +-- app/routers/dipset/persistence.py | 12 ++--- app/routers/trajectory/persistence.py | 11 ++--- app/routers/trajectory/trajectory_ddms_v2.py | 2 +- tests/unit/bulk_persistence/bulk_id_test.py | 15 ------ tests/unit/model/log_bulk_test.py | 6 ++- .../unit/routers/ddms_v2/log_ddms_v2_test.py | 4 +- 14 files changed, 77 insertions(+), 96 deletions(-) diff --git a/app/bulk_persistence/__init__.py b/app/bulk_persistence/__init__.py index 6ef5dafe..aa5f97c0 100644 --- a/app/bulk_persistence/__init__.py +++ b/app/bulk_persistence/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. from .bulk_id import BulkId +from .bulk_uri import BulkURI from .dataframe_persistence import create_and_store_dataframe, get_dataframe from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync from .json_orient import JSONOrient diff --git a/app/bulk_persistence/bulk_id.py b/app/bulk_persistence/bulk_id.py index 1dd223f1..bc7b3586 100644 --- a/app/bulk_persistence/bulk_id.py +++ b/app/bulk_persistence/bulk_id.py @@ -13,27 +13,9 @@ # limitations under the License. import uuid -from typing import Tuple, Optional class BulkId: @staticmethod def new_bulk_id() -> str: return str(uuid.uuid4()) - - @classmethod - def bulk_urn_encode(cls, bulk_id: str, prefix: str = None) -> str: - if prefix: - return f'urn:{prefix}:uuid:{uuid.UUID(bulk_id)}' - return uuid.UUID(bulk_id).urn - - - # Returns a tuple ( : str, : str) - @classmethod - def bulk_urn_decode(cls, urn: str) -> Tuple[str, Optional[str]]: - if urn is None: - raise ValueError('attempted to decode empty urn') - parts = urn.split(":") - if len(parts) < 4: - return str(uuid.UUID(urn)), None - return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2]) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 4adec9b0..b4e6e93d 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -20,7 +20,10 @@ BulkUriVersion_V1 = BulkUriVersion(version='1', prefix=BULK_URN_PREFIX_VERSION) class BulkURI: - """ """ + """ + Bulk URI, contains the bulk identifier (bulk_id) and URI version which identifies the storage engine version how + the bulk is stored. + """ def __init__(self, uri: Optional[str] = None, *, bulk_id: Optional[str] = None, version: Optional[BulkUriVersion] = None): @@ -61,6 +64,10 @@ class BulkURI: """ convenient check that returns True is version == BulkUriVersion_V0 """ return self._uri_version.version == BulkUriVersion_V0.version + @classmethod + def make_V0_URI(cls, bulk_id: str) -> 'BulkURI': + return cls(bulk_id=bulk_id, version=BulkUriVersion_V0) + @property def bulk_id(self) -> str: return self._bulk_id diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index ae32c690..257f1853 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -40,13 +40,13 @@ class LogBulkHelper: cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_uri @classmethod - def _get_bulk_id_from_wks(cls, record: Record) -> Optional[str]: + def _get_bulk_uri_from_wks(cls, record: Record) -> Optional[str]: bulk_uri = ( cls._get_record_data_dict(record) .get("log", {}) .get("bulkURI", None) ) - return BulkId.bulk_urn_decode(bulk_uri) if bulk_uri else (None, None) + return bulk_uri @classmethod def update_bulk_uri( @@ -77,20 +77,20 @@ class LogBulkHelper: record.data = record_dict["data"] @classmethod - def get_bulk_id( + def get_bulk_uri( cls, record: Record, custom_bulk_id_path: Optional[str] = None - ) -> Tuple[Optional[str], Optional[str]]: + ) -> BulkURI: """ :param record: :param custom_bulk_id_path: !! incompatible with log model - :return: bulk id if any else None + :return: BulkURI, could be invalid if none """ if custom_bulk_id_path is None: # what about empty string ? - return cls._get_bulk_id_from_wks(record) + return BulkURI(cls._get_bulk_uri_from_wks(record)) record_dict = {"data": record.data} matches = parse_jsonpath(custom_bulk_id_path).find(record_dict) if len(matches) > 0: - return BulkId.bulk_urn_decode(matches[0].value) - return None, None - + return BulkURI(matches[0].value) + return BulkURI() + diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py index d6e17397..f62f980c 100644 --- a/app/routers/bulk/bulk_routes.py +++ b/app/routers/bulk/bulk_routes.py @@ -17,7 +17,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status from app.bulk_persistence import JSONOrient, get_dataframe from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage -from app.bulk_persistence.dask.errors import BulkError, BulkNotFound +from app.bulk_persistence.dask.errors import BulkError, BulkNotFound, BulkNotProcessable from app.bulk_persistence.mime_types import MimeTypes from app.model.model_chunking import GetDataParams @@ -33,8 +33,7 @@ from app.routers.sessions import (SessionInternal, UpdateSessionState, UpdateSes from app.routers.record_utils import fetch_record from app.routers.bulk.utils import (with_dask_blob_storage, get_check_input_df_func, get_df_from_request, set_bulk_field_and_send_record, DataFrameRender, _check_df_columns_type_legacy) -from app.routers.bulk.bulk_uri_dependencies import (get_bulk_id_access, BulkIdAccess, - BULK_URN_PREFIX_VERSION) +from app.routers.bulk.bulk_uri_dependencies import get_bulk_id_access, BulkIdAccess from app.helper.traces import with_trace @@ -143,13 +142,21 @@ async def get_data_version( bulk_uri_access: BulkIdAccess = Depends(get_bulk_id_access) ): record = await fetch_record(ctx, record_id, version) - bulk_id, prefix = bulk_uri_access.get_bulk_uri(record=record) # TODO PATH logv2 + try: + bulk_uri = bulk_uri_access.get_bulk_uri(record=record) # TODO PATH logv2 + except ValueError: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail='Record contains an invalid bulk URI') stat = None try: - if bulk_id is None: + if not bulk_uri: raise BulkNotFound(record_id=record_id, bulk_id=None) - if prefix == BULK_URN_PREFIX_VERSION: + bulk_id = bulk_uri.bulk_id + if bulk_uri.is_V0(): + df = await get_dataframe(ctx, bulk_id) + _check_df_columns_type_legacy(df) + else: columns = None if data_param.curves: stat = dask_blob_storage.read_stat(record_id, bulk_id) @@ -160,11 +167,6 @@ async def get_data_version( # loading the dataframe with filter on columns is faster than filtering columns on df df = await dask_blob_storage.load_bulk(record_id, bulk_id, columns=columns) - elif prefix is None: - df = await get_dataframe(ctx, bulk_id) - _check_df_columns_type_legacy(df) - else: - raise BulkNotFound(record_id=record_id, bulk_id=bulk_id) df = await DataFrameRender.process_params(df, data_param) return await DataFrameRender.df_render(df, data_param, request.headers.get('Accept'), orient=orient, stat=stat) @@ -228,24 +230,32 @@ async def complete_session( _internal = i_session.internal # <= contains details details, may be irrelevant or not needed record = await fetch_record(ctx, record_id, i_session.session.fromVersion) - previous_bulk_uri = None + previous_bulk_id = None if i_session.session.mode == SessionUpdateMode.Update: - previous_bulk_uri, prefix = bulk_uri_access.get_bulk_uri(record) # TODO PATH for logv2 - if previous_bulk_uri is not None and prefix != BULK_URN_PREFIX_VERSION: + try: + previous_bulk_uri = bulk_uri_access.get_bulk_uri(record) # TODO PATH logv2 + except ValueError: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f'Record with version {i_session.session.fromVersion} from which ' + f'update contains an invalid bulk URI') + + if previous_bulk_uri.is_V0(): try: - df = await get_dataframe(ctx, previous_bulk_uri) + df = await get_dataframe(ctx, previous_bulk_uri.bulk_id) # convert old bulk to new one - previous_bulk_uri = await dask_blob_storage.save_blob(df, record_id=record_id) + previous_bulk_id = await dask_blob_storage.save_blob(df, record_id=record_id) except ResourceNotFoundException: - BulkNotFound(record_id=record_id, bulk_id=previous_bulk_uri).raise_as_http() + BulkNotFound(record_id=record_id, bulk_id=previous_bulk_id).raise_as_http() + else: + previous_bulk_id = previous_bulk_uri.bulk_id - new_bulk_uri = await dask_blob_storage.session_commit(i_session.session, previous_bulk_uri) + new_bulk_id = await dask_blob_storage.session_commit(i_session.session, previous_bulk_id) # ==============> # ==============> UPDATE META DATA HERE (baseDepth, ...) <============== # ==============> - await set_bulk_field_and_send_record(ctx, new_bulk_uri, record, bulk_uri_access) + await set_bulk_field_and_send_record(ctx, new_bulk_id, record, bulk_uri_access) i_session = commit_guard.session i_session.session.meta = i_session.session.meta or {} diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index ca573e35..9fb5dae8 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -1,21 +1,19 @@ import abc from abc import ABC -from typing import Optional, Tuple +from typing import Optional from fastapi import Request -from app.bulk_persistence import BulkId from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V1 from app.model.log_bulk import LogBulkHelper -BULK_URN_PREFIX_VERSION = "wdms-1" BULK_URI_FIELD = "bulkURI" class BulkIdAccess(ABC): @staticmethod @abc.abstractmethod - def get_bulk_uri(record) -> Tuple[str, Optional[str]]: + def get_bulk_uri(record) -> BulkURI: ... @staticmethod @@ -26,11 +24,8 @@ class BulkIdAccess(ABC): class OsduBulkIdAccess(BulkIdAccess): @staticmethod - def get_bulk_uri(record) -> Tuple[str, Optional[str]]: - bulk_urn = record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None) - if bulk_urn: - return BulkId.bulk_urn_decode(bulk_urn) - return None, None + def get_bulk_uri(record) -> BulkURI: + return BulkURI(record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None)) @staticmethod def set_bulk_uri(record, bulk_id: str): @@ -40,8 +35,8 @@ class OsduBulkIdAccess(BulkIdAccess): class LogBulkIdAccess(BulkIdAccess): @staticmethod - def get_bulk_uri(record, custom_bulk_id_path: Optional[str] = None) -> Tuple[str, Optional[str]]: - return LogBulkHelper.get_bulk_id(record=record, custom_bulk_id_path=custom_bulk_id_path) + def get_bulk_uri(record, custom_bulk_id_path: Optional[str] = None) -> BulkURI: + return LogBulkHelper.get_bulk_uri(record=record, custom_bulk_id_path=custom_bulk_id_path) @staticmethod def set_bulk_uri(record, bulk_id: str): diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index bc36aca9..d53b049a 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -410,11 +410,11 @@ async def get_log_data_statistics(logid: str, # we may use an optimistic cache here log_record = await fetch_record(ctx, logid) # use dict to support the custom path - bulk_id, _prefix = LogBulkHelper.get_bulk_id(log_record, bulk_id_path) - if bulk_id is None: + bulk_uri = LogBulkHelper.get_bulk_uri(log_record, bulk_id_path) + if not bulk_uri: content = '{}' # no bulk else: - df = await get_dataframe(ctx, bulk_id) + df = await get_dataframe(ctx, bulk_uri.bulk_id) content = df.describe(include="all").to_json() return Response(content=content, media_type=MimeTypes.JSON.type) diff --git a/app/routers/ddms_v2/persistence.py b/app/routers/ddms_v2/persistence.py index 1b81eadd..f3bf36e1 100644 --- a/app/routers/ddms_v2/persistence.py +++ b/app/routers/ddms_v2/persistence.py @@ -31,12 +31,12 @@ class Persistence: record: Record, bulk_id_path: str, ) -> pd.DataFrame: - bulk_id, _prefix = LogBulkHelper.get_bulk_id(record, bulk_id_path) + bulk_uri = LogBulkHelper.get_bulk_uri(record, bulk_id_path) # TODO use prefix to know how to read the bulk - if bulk_id is None: + if not bulk_uri: return pd.DataFrame() - return await get_dataframe(ctx, bulk_id) + return await get_dataframe(ctx, bulk_uri.bulk_id) @classmethod async def write_bulk(cls, ctx: Context, dataframe) -> str: diff --git a/app/routers/dipset/persistence.py b/app/routers/dipset/persistence.py index 98662f4a..cbc79aae 100644 --- a/app/routers/dipset/persistence.py +++ b/app/routers/dipset/persistence.py @@ -35,7 +35,7 @@ from app.model.model_curated import ( ) from app.model.model_utils import from_record, to_record from app.routers.dipset.dip_model import Dip -from app.bulk_persistence import get_dataframe, create_and_store_dataframe, BulkId +from app.bulk_persistence import get_dataframe, create_and_store_dataframe, BulkURI async def create_missing_logs(ctx, my_dipset: dipset): """ @@ -246,9 +246,9 @@ def df_to_dips(dataframe: pd.DataFrame) -> List[Dip]: #TODO refactor duplicate with trajectory -async def write_bulk(ctx, dataframe: pd.DataFrame) -> str: +async def write_bulk(ctx, dataframe: pd.DataFrame) -> BulkURI: bulk_id = await create_and_store_dataframe(ctx, dataframe) - return BulkId.bulk_urn_encode(bulk_id) + return BulkURI.make_V0_URI(bulk_id) async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str]) -> dipset: @@ -260,7 +260,7 @@ async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str] dataframe.sort_values(by=["reference", "azimuth"], inplace=True, ignore_index=True) # Write data in storage and update dipset bulk URI - my_dipset.data.bulkURI = await write_bulk(ctx, dataframe) + my_dipset.data.bulkURI = str(await write_bulk(ctx, dataframe)) # Create or update logs await create_missing_logs(ctx, my_dipset) @@ -294,9 +294,9 @@ async def read_dipset_data(ctx, ds: Union[dipset, str]) -> Tuple[dipset, pd.Data return my_dipset, pd.DataFrame() # Fetch data - bulk_uri, _prefix = BulkId.bulk_urn_decode(my_dipset.data.bulkURI) + bulk_uri = BulkURI(my_dipset.data.bulkURI) # TODO use prefix to know how to read the bulk - df = await get_dataframe(ctx, bulk_uri) + df = await get_dataframe(ctx, bulk_uri.bulk_id) return my_dipset, df diff --git a/app/routers/trajectory/persistence.py b/app/routers/trajectory/persistence.py index 8f02fead..40ffc202 100644 --- a/app/routers/trajectory/persistence.py +++ b/app/routers/trajectory/persistence.py @@ -13,7 +13,7 @@ # limitations under the License. import pandas as pd -from app.bulk_persistence import BulkId, NoBulkException, UnknownChannelsException, InvalidBulkException +from app.bulk_persistence import BulkURI, NoBulkException, UnknownChannelsException, InvalidBulkException from app.model.model_curated import trajectory as Trajectory from app.bulk_persistence import get_dataframe, create_and_store_dataframe @@ -46,9 +46,9 @@ class Persistence: raise NoBulkException try: - bulkid, _prefix = BulkId.bulk_urn_decode(record.data.bulkURI) + bulk_uri = BulkURI(record.data.bulkURI) # TODO use prefix to know how to read the bulk - df = await get_dataframe(ctx, bulkid) + df = await get_dataframe(ctx, bulk_uri.bulk_id) except Exception as ex: raise InvalidBulkException(ex) @@ -60,8 +60,7 @@ class Persistence: except KeyError as key_error: # unknown channels raise UnknownChannelsException(key_error) - @classmethod - async def write_bulk(cls, ctx, dataframe: pd.DataFrame) -> str: + async def write_bulk(cls, ctx, dataframe: pd.DataFrame) -> BulkURI: bulk_id = await create_and_store_dataframe(ctx, dataframe) - return BulkId.bulk_urn_encode(bulk_id) + return BulkURI.make_V0_URI(bulk_id) diff --git a/app/routers/trajectory/trajectory_ddms_v2.py b/app/routers/trajectory/trajectory_ddms_v2.py index 355d0d95..4989c092 100644 --- a/app/routers/trajectory/trajectory_ddms_v2.py +++ b/app/routers/trajectory/trajectory_ddms_v2.py @@ -236,7 +236,7 @@ async def post_traj_data( trajectory_record = await fetch_trajectory_record(ctx, trajectoryid) record = from_record(Trajectory, trajectory_record) - record.data.bulkURI = await persistence.write_bulk(ctx, df) + record.data.bulkURI = str(await persistence.write_bulk(ctx, df)) # update record's channels if not record.data.channels: diff --git a/tests/unit/bulk_persistence/bulk_id_test.py b/tests/unit/bulk_persistence/bulk_id_test.py index 12a21983..f84766a7 100644 --- a/tests/unit/bulk_persistence/bulk_id_test.py +++ b/tests/unit/bulk_persistence/bulk_id_test.py @@ -19,18 +19,3 @@ import uuid def test_bulk_id_is_an_uuid(): uuid.UUID(BulkId.new_bulk_id()) - -# urn decode test -def test_decode_urn_no_prefix(): - uuid, prefix = BulkId.bulk_urn_decode("urn:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8") - assert uuid == "489768d2-eee1-4a8f-ae95-7b0c30b0dcd8" - assert prefix is None - -def test_decode_urn_with_prefix(): - uuid, prefix = BulkId.bulk_urn_decode("urn:myprefix:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8") - assert uuid == "489768d2-eee1-4a8f-ae95-7b0c30b0dcd8" - assert prefix == 'myprefix' - -def test_decode_urn_none(): - with pytest.raises(ValueError): - uuid, prefix = BulkId.bulk_urn_decode(None) diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index dee78c53..46881cf2 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -49,7 +49,7 @@ def test_update_bulk_id_on_not_valid_data_should_throw(basic_record): def test_get_update_bulk_id(record_with_bulkURI): - assert LogBulkHelper.get_bulk_id(record_with_bulkURI)[0] == str(uuid.UUID(record_with_bulkURI.data['log']['bulkURI'])) + assert str(LogBulkHelper.get_bulk_uri(record_with_bulkURI)) == record_with_bulkURI.data['log']['bulkURI'] def test_update_bulk_id_on_empty_record(basic_record): @@ -59,4 +59,6 @@ def test_update_bulk_id_on_empty_record(basic_record): def test_get_bulk_id_on_empty_record(basic_record): - assert LogBulkHelper.get_bulk_id(basic_record) == (None, None) + bulk_uri = LogBulkHelper.get_bulk_uri(basic_record) + assert not bulk_uri + assert not bulk_uri.is_valid() diff --git a/tests/unit/routers/ddms_v2/log_ddms_v2_test.py b/tests/unit/routers/ddms_v2/log_ddms_v2_test.py index 2b0ee0f8..dc908c29 100644 --- a/tests/unit/routers/ddms_v2/log_ddms_v2_test.py +++ b/tests/unit/routers/ddms_v2/log_ddms_v2_test.py @@ -95,8 +95,8 @@ class TestHelper: return record @staticmethod - def get_bulk_id_from_record(record) -> str: - return LogBulkHelper.get_bulk_id(record) + def get_bulk_id_from_record(record): + return LogBulkHelper.get_bulk_uri(record) @pytest.fixture -- GitLab From a3220c5e4e953f3208a0d49176ab126b5009fd37 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 17:21:20 +0100 Subject: [PATCH 06/16] renaming --- app/bulk_persistence/bulk_uri.py | 31 ++++++++++---------- app/routers/bulk/bulk_uri_dependencies.py | 6 ++-- app/routers/ddms_v2/log_ddms_v2.py | 4 +-- tests/unit/bulk_persistence/bulk_uri_test.py | 20 ++++++------- tests/unit/model/log_bulk_test.py | 4 +-- 5 files changed, 32 insertions(+), 33 deletions(-) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index b4e6e93d..119abddf 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -1,21 +1,20 @@ from typing import Optional, NamedTuple, Tuple import uuid -BULK_URN_PREFIX_VERSION = "wdms-1" - -class BulkUriVersion(NamedTuple): +class BulkStorageVersion(NamedTuple): """ """ version: str prefix: Optional[str] -BulkUriVersion_Invalid = BulkUriVersion(version='', prefix=None) +BulkStorageVersion_Invalid = BulkStorageVersion(version='', prefix=None) +""" represent an invalid/undefined storage version """ -BulkUriVersion_V0 = BulkUriVersion(version='0', prefix=None) +BulkStorageVersion_V0 = BulkStorageVersion(version='0', prefix=None) """ first bulk management implementation with direct management to blob storage with a single blob """ -BulkUriVersion_V1 = BulkUriVersion(version='1', prefix=BULK_URN_PREFIX_VERSION) +BulkStorageVersion_V1 = BulkStorageVersion(version='1', prefix="wdms-1") """ version 1, using Dask to handle bulk manipulation and storage """ @@ -26,7 +25,7 @@ class BulkURI: """ def __init__(self, uri: Optional[str] = None, *, bulk_id: Optional[str] = None, - version: Optional[BulkUriVersion] = None): + version: Optional[BulkStorageVersion] = None): """ make an new one or invalid Either pass uri alone or bulk_id, version @@ -35,14 +34,14 @@ class BulkURI: if uri: bulk_id, prefix = self._decode_uri(uri) if not prefix: - version = BulkUriVersion_V0 - elif prefix == BULK_URN_PREFIX_VERSION: - version = BulkUriVersion_V1 + version = BulkStorageVersion_V0 + elif prefix == BulkStorageVersion_V1.prefix: + version = BulkStorageVersion_V1 else: raise ValueError('Unsupported prefix in bulk URI: ' + prefix) - elif not bulk_id or not version or version == BulkUriVersion_Invalid: + elif not bulk_id or not version or version == BulkStorageVersion_Invalid: bulk_id = '' - version = BulkUriVersion_Invalid + version = BulkStorageVersion_Invalid self._bulk_id = bulk_id self._uri_version = version @@ -53,7 +52,7 @@ class BulkURI: return cls() @classmethod - def new_with_version(cls, version: BulkUriVersion): + def new_with_version(cls, version: BulkStorageVersion): return cls(bulk_id=str(uuid.uuid4()), version=version) @classmethod @@ -62,18 +61,18 @@ class BulkURI: def is_V0(self) -> bool: """ convenient check that returns True is version == BulkUriVersion_V0 """ - return self._uri_version.version == BulkUriVersion_V0.version + return self._uri_version.version == BulkStorageVersion_V0.version @classmethod def make_V0_URI(cls, bulk_id: str) -> 'BulkURI': - return cls(bulk_id=bulk_id, version=BulkUriVersion_V0) + return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0) @property def bulk_id(self) -> str: return self._bulk_id @property - def uri_version(self) -> BulkUriVersion: + def uri_version(self) -> BulkStorageVersion: return self._uri_version def __bool__(self) -> bool: diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index 9fb5dae8..87b7afa9 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -3,7 +3,7 @@ from abc import ABC from typing import Optional from fastapi import Request -from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V1 +from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V1 from app.model.log_bulk import LogBulkHelper @@ -29,7 +29,7 @@ class OsduBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): - bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V1) + bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V1) record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_uri.encode() @@ -41,7 +41,7 @@ class LogBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): LogBulkHelper.update_bulk_uri(record=record, - bulk_uri=BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V1)) + bulk_uri=BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V1)) async def set_log_bulk_id_access(request: Request): diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index d53b049a..3911d915 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -38,7 +38,7 @@ from odes_storage.models import ( from pydantic import BaseModel, Field from app.bulk_persistence import DataframeSerializerAsync, DataframeSerializerSync, JSONOrient, MimeTypes, get_dataframe -from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V0 +from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V0 from app.clients.storage_service_client import get_storage_record_service from app.model.log_bulk import LogBulkHelper from app.model.model_curated import log @@ -197,7 +197,7 @@ async def _write_log_data( fetch_record(ctx, logid), ) # update the record - bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkUriVersion_V0) + bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V0) LogBulkHelper.update_bulk_uri(log_record, bulk_uri, bulk_path) # push new version on the storage diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index df2a0b04..ace918ae 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -14,12 +14,12 @@ import pytest from app.bulk_persistence.bulk_uri import ( - BulkURI, BulkUriVersion_V1, BulkUriVersion_V0, BULK_URN_PREFIX_VERSION, BulkUriVersion_Invalid) + BulkURI, BulkStorageVersion_V1, BulkStorageVersion_V0, BulkStorageVersion_Invalid) import uuid def test_bulk_id_is_an_uuid(): - uuid.UUID(BulkURI.new_with_version(BulkUriVersion_V1).bulk_id) + uuid.UUID(BulkURI.new_with_version(BulkStorageVersion_V1).bulk_id) def test_bulk_uri_explicit(): @@ -33,7 +33,7 @@ def test_from_uri_without_prefix(): bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert bulk_uri.is_V0() - assert bulk_uri.uri_version == BulkUriVersion_V0 + assert bulk_uri.uri_version == BulkStorageVersion_V0 assert bulk_uri.uri_version.prefix is None assert bulk_uri.is_valid() @@ -43,13 +43,13 @@ def test_from_uri_without_prefix(): def test_decode_urn_with_prefix(): - uri_str = f'urn:{BULK_URN_PREFIX_VERSION}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + uri_str = f'urn:{BulkStorageVersion_V1.prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert not bulk_uri.is_V0() - assert bulk_uri.uri_version == BulkUriVersion_V1 - assert bulk_uri.uri_version.prefix == BULK_URN_PREFIX_VERSION + assert bulk_uri.uri_version == BulkStorageVersion_V1 + assert bulk_uri.uri_version.prefix == BulkStorageVersion_V1.prefix assert bulk_uri.is_valid() # should encode back to the same uri @@ -61,16 +61,16 @@ def test_decode_urn_with_prefix(): ([], {}), # no param, default ctor ([''], {}), ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'}), - ([''], {'version': BulkUriVersion_Invalid}), - ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkUriVersion_Invalid}), - ([''], {'bulk_id': '', 'version': BulkUriVersion_V1}), + ([''], {'version': BulkStorageVersion_Invalid}), + ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkStorageVersion_Invalid}), + ([''], {'bulk_id': '', 'version': BulkStorageVersion_V1}), ]) def test_invalid_uri(args, kwargs): invalid_uri = BulkURI(*args, **kwargs) assert not invalid_uri.is_valid() assert not invalid_uri assert not invalid_uri.bulk_id - assert invalid_uri.uri_version == BulkUriVersion_Invalid + assert invalid_uri.uri_version == BulkStorageVersion_Invalid assert str(invalid_uri) == '' # explicit encode raises diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index 46881cf2..8c6cedcd 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from app.model.log_bulk import LogBulkHelper -from app.bulk_persistence.bulk_uri import BulkURI, BulkUriVersion_V0 +from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V0 from tests.unit.test_utils import basic_record import uuid import pytest @@ -30,7 +30,7 @@ def test_update_bulk_id(record_with_bulkURI): assert record_with_bulkURI.data['log']['bulkURI'] == uri b_id = uuid.uuid4() - b_uri = BulkURI(bulk_id=str(b_id), version=BulkUriVersion_V0) + b_uri = BulkURI(bulk_id=str(b_id), version=BulkStorageVersion_V0) LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri) assert record_with_bulkURI.data['log']['bulkURI'] == b_id.urn -- GitLab From 9f8445245bb2b402a12ac0954819167fcc5b60aa Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Fri, 5 Nov 2021 17:47:23 +0100 Subject: [PATCH 07/16] renaming + explicit ctor --- app/bulk_persistence/bulk_uri.py | 51 +++++++++++--------- app/routers/bulk/bulk_routes.py | 4 +- app/routers/bulk/bulk_uri_dependencies.py | 6 +-- app/routers/ddms_v2/log_ddms_v2.py | 4 +- app/routers/dipset/persistence.py | 2 +- app/routers/trajectory/persistence.py | 2 +- tests/unit/bulk_persistence/bulk_uri_test.py | 34 +++++-------- tests/unit/model/log_bulk_test.py | 4 +- 8 files changed, 51 insertions(+), 56 deletions(-) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 119abddf..05ea3750 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -3,19 +3,24 @@ import uuid class BulkStorageVersion(NamedTuple): - """ """ + """ This is the version of the bulk storage engine """ + version: str - prefix: Optional[str] + """ unique version identifier """ + + uri_prefix: Optional[str] + """ associated uri prefix """ -BulkStorageVersion_Invalid = BulkStorageVersion(version='', prefix=None) -""" represent an invalid/undefined storage version """ +class BulkStorageVersions: + V0 = BulkStorageVersion(version='0', uri_prefix=None) + """ first bulk management implementation with direct management to blob storage with a single blob """ -BulkStorageVersion_V0 = BulkStorageVersion(version='0', prefix=None) -""" first bulk management implementation with direct management to blob storage with a single blob """ + V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1") + """ version 1, using Dask to handle bulk manipulation and storage """ -BulkStorageVersion_V1 = BulkStorageVersion(version='1', prefix="wdms-1") -""" version 1, using Dask to handle bulk manipulation and storage """ + invalid = BulkStorageVersion(version='', uri_prefix=None) + """ represent an invalid/undefined storage version """ class BulkURI: @@ -34,14 +39,14 @@ class BulkURI: if uri: bulk_id, prefix = self._decode_uri(uri) if not prefix: - version = BulkStorageVersion_V0 - elif prefix == BulkStorageVersion_V1.prefix: - version = BulkStorageVersion_V1 + version = BulkStorageVersions.V0 + elif prefix == BulkStorageVersions.V1.uri_prefix: + version = BulkStorageVersions.V1 else: raise ValueError('Unsupported prefix in bulk URI: ' + prefix) - elif not bulk_id or not version or version == BulkStorageVersion_Invalid: + elif not bulk_id or not version or version == BulkStorageVersions.invalid: bulk_id = '' - version = BulkStorageVersion_Invalid + version = BulkStorageVersions.invalid self._bulk_id = bulk_id self._uri_version = version @@ -51,21 +56,21 @@ class BulkURI: """ make an invalid instance """ return cls() - @classmethod - def new_with_version(cls, version: BulkStorageVersion): - return cls(bulk_id=str(uuid.uuid4()), version=version) - @classmethod def decode(cls, uri: str) -> 'BulkURI': return cls(uri) - def is_V0(self) -> bool: + def is_bulk_storage_V0(self) -> bool: """ convenient check that returns True is version == BulkUriVersion_V0 """ - return self._uri_version.version == BulkStorageVersion_V0.version + return self._uri_version.version == BulkStorageVersions.V0.version + + @classmethod + def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI': + return cls(bulk_id=bulk_id, version=BulkStorageVersions.V0) @classmethod - def make_V0_URI(cls, bulk_id: str) -> 'BulkURI': - return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0) + def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI': + return cls(bulk_id=bulk_id, version=BulkStorageVersions.V1) @property def bulk_id(self) -> str: @@ -91,8 +96,8 @@ class BulkURI: If the prefix is not empty returns, uri format = `urn:$prefix:uuid:$bulk_id` If the prefix is empty or None, uri format = `urn:uuid:$bulk_id` """ - if self._uri_version.prefix: - return f'urn:{self._uri_version.prefix}:uuid:{self._bulk_id}' + if self._uri_version.uri_prefix: + return f'urn:{self._uri_version.uri_prefix}:uuid:{self._bulk_id}' return uuid.UUID(self._bulk_id).urn @classmethod diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py index f62f980c..43bd918c 100644 --- a/app/routers/bulk/bulk_routes.py +++ b/app/routers/bulk/bulk_routes.py @@ -153,7 +153,7 @@ async def get_data_version( if not bulk_uri: raise BulkNotFound(record_id=record_id, bulk_id=None) bulk_id = bulk_uri.bulk_id - if bulk_uri.is_V0(): + if bulk_uri.is_bulk_storage_V0(): df = await get_dataframe(ctx, bulk_id) _check_df_columns_type_legacy(df) else: @@ -241,7 +241,7 @@ async def complete_session( detail=f'Record with version {i_session.session.fromVersion} from which ' f'update contains an invalid bulk URI') - if previous_bulk_uri.is_V0(): + if previous_bulk_uri.is_bulk_storage_V0(): try: df = await get_dataframe(ctx, previous_bulk_uri.bulk_id) # convert old bulk to new one diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index 87b7afa9..73280e45 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -3,7 +3,7 @@ from abc import ABC from typing import Optional from fastapi import Request -from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V1 +from app.bulk_persistence.bulk_uri import BulkURI from app.model.log_bulk import LogBulkHelper @@ -29,7 +29,7 @@ class OsduBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): - bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V1) + bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id=bulk_id) record.data.setdefault("ExtensionProperties", {}).setdefault("wdms", {})[BULK_URI_FIELD] = bulk_uri.encode() @@ -41,7 +41,7 @@ class LogBulkIdAccess(BulkIdAccess): @staticmethod def set_bulk_uri(record, bulk_id: str): LogBulkHelper.update_bulk_uri(record=record, - bulk_uri=BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V1)) + bulk_uri=BulkURI.from_bulk_storage_V1(bulk_id=bulk_id)) async def set_log_bulk_id_access(request: Request): diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index 3911d915..a4c80e36 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -38,7 +38,7 @@ from odes_storage.models import ( from pydantic import BaseModel, Field from app.bulk_persistence import DataframeSerializerAsync, DataframeSerializerSync, JSONOrient, MimeTypes, get_dataframe -from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V0 +from app.bulk_persistence.bulk_uri import BulkURI from app.clients.storage_service_client import get_storage_record_service from app.model.log_bulk import LogBulkHelper from app.model.model_curated import log @@ -197,7 +197,7 @@ async def _write_log_data( fetch_record(ctx, logid), ) # update the record - bulk_uri = BulkURI(bulk_id=bulk_id, version=BulkStorageVersion_V0) + bulk_uri = BulkURI.from_bulk_storage_V0(bulk_id=bulk_id) LogBulkHelper.update_bulk_uri(log_record, bulk_uri, bulk_path) # push new version on the storage diff --git a/app/routers/dipset/persistence.py b/app/routers/dipset/persistence.py index cbc79aae..1b3207ef 100644 --- a/app/routers/dipset/persistence.py +++ b/app/routers/dipset/persistence.py @@ -248,7 +248,7 @@ def df_to_dips(dataframe: pd.DataFrame) -> List[Dip]: #TODO refactor duplicate with trajectory async def write_bulk(ctx, dataframe: pd.DataFrame) -> BulkURI: bulk_id = await create_and_store_dataframe(ctx, dataframe) - return BulkURI.make_V0_URI(bulk_id) + return BulkURI.from_bulk_storage_V0(bulk_id) async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str]) -> dipset: diff --git a/app/routers/trajectory/persistence.py b/app/routers/trajectory/persistence.py index 40ffc202..2d491137 100644 --- a/app/routers/trajectory/persistence.py +++ b/app/routers/trajectory/persistence.py @@ -63,4 +63,4 @@ class Persistence: @classmethod async def write_bulk(cls, ctx, dataframe: pd.DataFrame) -> BulkURI: bulk_id = await create_and_store_dataframe(ctx, dataframe) - return BulkURI.make_V0_URI(bulk_id) + return BulkURI.from_bulk_storage_V0(bulk_id) diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index ace918ae..fdb51ab5 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -13,17 +13,7 @@ # limitations under the License. import pytest -from app.bulk_persistence.bulk_uri import ( - BulkURI, BulkStorageVersion_V1, BulkStorageVersion_V0, BulkStorageVersion_Invalid) -import uuid - - -def test_bulk_id_is_an_uuid(): - uuid.UUID(BulkURI.new_with_version(BulkStorageVersion_V1).bulk_id) - - -def test_bulk_uri_explicit(): - pass +from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersions # urn decode test @@ -32,9 +22,9 @@ def test_from_uri_without_prefix(): bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' - assert bulk_uri.is_V0() - assert bulk_uri.uri_version == BulkStorageVersion_V0 - assert bulk_uri.uri_version.prefix is None + assert bulk_uri.is_bulk_storage_V0() + assert bulk_uri.uri_version == BulkStorageVersions.V0 + assert bulk_uri.uri_version.uri_prefix is None assert bulk_uri.is_valid() # should encode back to the same uri @@ -43,13 +33,13 @@ def test_from_uri_without_prefix(): def test_decode_urn_with_prefix(): - uri_str = f'urn:{BulkStorageVersion_V1.prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + uri_str = f'urn:{BulkStorageVersions.V1.uri_prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' - assert not bulk_uri.is_V0() - assert bulk_uri.uri_version == BulkStorageVersion_V1 - assert bulk_uri.uri_version.prefix == BulkStorageVersion_V1.prefix + assert not bulk_uri.is_bulk_storage_V0() + assert bulk_uri.uri_version == BulkStorageVersions.V1 + assert bulk_uri.uri_version.uri_prefix == BulkStorageVersions.V1.uri_prefix assert bulk_uri.is_valid() # should encode back to the same uri @@ -61,16 +51,16 @@ def test_decode_urn_with_prefix(): ([], {}), # no param, default ctor ([''], {}), ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'}), - ([''], {'version': BulkStorageVersion_Invalid}), - ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkStorageVersion_Invalid}), - ([''], {'bulk_id': '', 'version': BulkStorageVersion_V1}), + ([''], {'version': BulkStorageVersions.invalid}), + ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkStorageVersions.invalid}), + ([''], {'bulk_id': '', 'version': BulkStorageVersions.V1}), ]) def test_invalid_uri(args, kwargs): invalid_uri = BulkURI(*args, **kwargs) assert not invalid_uri.is_valid() assert not invalid_uri assert not invalid_uri.bulk_id - assert invalid_uri.uri_version == BulkStorageVersion_Invalid + assert invalid_uri.uri_version == BulkStorageVersions.invalid assert str(invalid_uri) == '' # explicit encode raises diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index 8c6cedcd..3bcb6e01 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from app.model.log_bulk import LogBulkHelper -from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersion_V0 +from app.bulk_persistence.bulk_uri import BulkURI from tests.unit.test_utils import basic_record import uuid import pytest @@ -30,7 +30,7 @@ def test_update_bulk_id(record_with_bulkURI): assert record_with_bulkURI.data['log']['bulkURI'] == uri b_id = uuid.uuid4() - b_uri = BulkURI(bulk_id=str(b_id), version=BulkStorageVersion_V0) + b_uri = BulkURI.from_bulk_storage_V0(str(b_id)) LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri) assert record_with_bulkURI.data['log']['bulkURI'] == b_id.urn -- GitLab From 2515e0d0a94b5cb3e7eb59935d0083217bdba488 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 8 Nov 2021 10:53:10 +0100 Subject: [PATCH 08/16] clean up, attribut rename add comments --- app/bulk_persistence/bulk_uri.py | 41 +++++++++++++++----- app/model/log_bulk.py | 18 +++------ app/routers/bulk/bulk_routes.py | 2 +- tests/unit/bulk_persistence/bulk_uri_test.py | 14 +++---- 4 files changed, 45 insertions(+), 30 deletions(-) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 05ea3750..2596faa6 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -25,9 +25,27 @@ class BulkStorageVersions: class BulkURI: """ - Bulk URI, contains the bulk identifier (bulk_id) and URI version which identifies the storage engine version how + Bulk URI, contains the bulk identifier (bulk_id) and Storage engine version which identifies how the bulk is stored. + + Usage: + - ctor from URI string value: + `bulk_uri = BulkURI(uri_str)` + + - ctor explicit given a bulk_id and a storage version: + `bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersions.V1)` + + - ctor explict using class method: + `bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)` + + - encode to URI string value: + `uri_str: str = bulk_uri.encode()` + + - check which storage engine version is: + `bulk_uri.storage_version == BulkStorageVersions.V0` + `bulk_uri.is_bulk_storage_V0()` """ + def __init__(self, uri: Optional[str] = None, *, bulk_id: Optional[str] = None, version: Optional[BulkStorageVersion] = None): @@ -49,7 +67,7 @@ class BulkURI: version = BulkStorageVersions.invalid self._bulk_id = bulk_id - self._uri_version = version + self._storage_version = version @classmethod def invalid(cls): @@ -61,15 +79,17 @@ class BulkURI: return cls(uri) def is_bulk_storage_V0(self) -> bool: - """ convenient check that returns True is version == BulkUriVersion_V0 """ - return self._uri_version.version == BulkStorageVersions.V0.version + """ convenient check that returns True is version == BulkStorageVersions.V0 """ + return self._storage_version.version == BulkStorageVersions.V0.version @classmethod def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI': + """ construct a BulkURI for storage engine V0 given a bulk id """ return cls(bulk_id=bulk_id, version=BulkStorageVersions.V0) @classmethod def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI': + """ construct a BulkURI for storage engine V1 given a bulk id """ return cls(bulk_id=bulk_id, version=BulkStorageVersions.V1) @property @@ -77,17 +97,17 @@ class BulkURI: return self._bulk_id @property - def uri_version(self) -> BulkStorageVersion: - return self._uri_version + def storage_version(self) -> BulkStorageVersion: + return self._storage_version def __bool__(self) -> bool: """ check invalid """ - if self._bulk_id and self._uri_version.version: + if self._bulk_id and self._storage_version.version: return True return False def __str__(self) -> str: - """ as URN string """ + """ as URN string, does not throw if invalid and return an empty string instead """ return self.encode() if self.is_valid() else '' def encode(self) -> str: @@ -95,9 +115,10 @@ class BulkURI: encode to uri as string If the prefix is not empty returns, uri format = `urn:$prefix:uuid:$bulk_id` If the prefix is empty or None, uri format = `urn:uuid:$bulk_id` + :Throw: ValueError """ - if self._uri_version.uri_prefix: - return f'urn:{self._uri_version.uri_prefix}:uuid:{self._bulk_id}' + if self._storage_version.uri_prefix: + return f'urn:{self._storage_version.uri_prefix}:uuid:{self._bulk_id}' return uuid.UUID(self._bulk_id).urn @classmethod diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index 257f1853..2d97b120 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -21,6 +21,7 @@ from odes_storage.models import Record from app.bulk_persistence.bulk_uri import BulkURI from app.bulk_persistence import BulkId + class LogBulkHelper: # TODO find a better name, LogRecordHelper. but I don't like 'helper', its just a synonymous of bag of several thing # breaking single responsibility principle @@ -33,7 +34,7 @@ class LogBulkHelper: return record.data @classmethod - def _set_bulk_uri_in_wks(cls, record: Record, bulk_uri: str) -> None: + def _set_bulk_uri_in_wks(cls, record: Record, bulk_uri: str): """ for now it used externalIds, to _get_bulk_id_from_wks be updated once schema is fixed with log.data.bulkId """ @@ -49,9 +50,7 @@ class LogBulkHelper: return bulk_uri @classmethod - def update_bulk_uri( - cls, record: Record, bulk_uri: Union[str, BulkURI], custom_bulk_id_path: Optional[str] = None - ): + def update_bulk_uri(cls, record: Record, bulk_uri: Union[str, BulkURI], custom_bulk_id_path: Optional[str] = None): """ Update bulk id within a log record. Note that the custom path cannot be applied when using a strict structured model It creates the field if not exist @@ -69,17 +68,13 @@ class LogBulkHelper: field_name = custom_bulk_id_path.split(".")[-1] json_exp = parse_jsonpath(custom_bulk_id_path).child(JsonParent()) - json_exp.find(record_dict)[0].value[ - field_name - ] = uri_value + json_exp.find(record_dict)[0].value[field_name] = uri_value # if only support existing field, it can be done with a simple update call # parse_jsonpath(custom_bulk_id_path).update(record, bulk_ref) record.data = record_dict["data"] @classmethod - def get_bulk_uri( - cls, record: Record, custom_bulk_id_path: Optional[str] = None - ) -> BulkURI: + def get_bulk_uri(cls, record: Record, custom_bulk_id_path: Optional[str] = None) -> BulkURI: """ :param record: :param custom_bulk_id_path: !! incompatible with log model @@ -92,5 +87,4 @@ class LogBulkHelper: matches = parse_jsonpath(custom_bulk_id_path).find(record_dict) if len(matches) > 0: return BulkURI(matches[0].value) - return BulkURI() - + return BulkURI.invalid() diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py index 43bd918c..dfe7550e 100644 --- a/app/routers/bulk/bulk_routes.py +++ b/app/routers/bulk/bulk_routes.py @@ -17,7 +17,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status from app.bulk_persistence import JSONOrient, get_dataframe from app.bulk_persistence.dask.dask_bulk_storage import DaskBulkStorage -from app.bulk_persistence.dask.errors import BulkError, BulkNotFound, BulkNotProcessable +from app.bulk_persistence.dask.errors import BulkError, BulkNotFound from app.bulk_persistence.mime_types import MimeTypes from app.model.model_chunking import GetDataParams diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index fdb51ab5..417378d5 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -23,13 +23,13 @@ def test_from_uri_without_prefix(): bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert bulk_uri.is_bulk_storage_V0() - assert bulk_uri.uri_version == BulkStorageVersions.V0 - assert bulk_uri.uri_version.uri_prefix is None + assert bulk_uri.storage_version == BulkStorageVersions.V0 + assert bulk_uri.storage_version.uri_prefix is None assert bulk_uri.is_valid() # should encode back to the same uri assert bulk_uri.encode() == uri_str - assert str(bulk_uri.encode()) == uri_str + assert str(bulk_uri) == uri_str def test_decode_urn_with_prefix(): @@ -38,13 +38,13 @@ def test_decode_urn_with_prefix(): bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert not bulk_uri.is_bulk_storage_V0() - assert bulk_uri.uri_version == BulkStorageVersions.V1 - assert bulk_uri.uri_version.uri_prefix == BulkStorageVersions.V1.uri_prefix + assert bulk_uri.storage_version == BulkStorageVersions.V1 + assert bulk_uri.storage_version.uri_prefix == BulkStorageVersions.V1.uri_prefix assert bulk_uri.is_valid() # should encode back to the same uri assert bulk_uri.encode() == uri_str - assert str(bulk_uri.encode()) == uri_str + assert str(bulk_uri) == uri_str @pytest.mark.parametrize("args, kwargs", [ @@ -60,7 +60,7 @@ def test_invalid_uri(args, kwargs): assert not invalid_uri.is_valid() assert not invalid_uri assert not invalid_uri.bulk_id - assert invalid_uri.uri_version == BulkStorageVersions.invalid + assert invalid_uri.storage_version == BulkStorageVersions.invalid assert str(invalid_uri) == '' # explicit encode raises -- GitLab From fa3ee02abf564cd309a1bbdd0f726ca570dd3106 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 8 Nov 2021 12:04:22 +0100 Subject: [PATCH 09/16] update NOTICE for fossa scan --- NOTICE | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/NOTICE b/NOTICE index 27fecebd..910765be 100644 --- a/NOTICE +++ b/NOTICE @@ -60,6 +60,7 @@ BSD-2-Clause ======================================================================== The following software have components provided under the terms of this license: +- colorama (from https://github.com/tartley/colorama) - grpcio (from https://grpc.io) - locket (from http://github.com/mwilliamson/locket.py) - mock (from https://github.com/testing-cabal/mock) @@ -82,6 +83,7 @@ The following software have components provided under the terms of this license: - asgiref (from http://github.com/django/asgiref/) - click (from http://github.com/mitsuhiko/click) - cloudpickle (from https://github.com/cloudpipe/cloudpickle) +- colorama (from https://github.com/tartley/colorama) - cryptography (from https://github.com/pyca/cryptography) - dask (from http://github.com/dask/dask/) - decorator (from https://github.com/micheles/decorator) @@ -169,14 +171,6 @@ The following software have components provided under the terms of this license: - grpcio (from https://grpc.io) - pyparsing (from http://pyparsing.wikispaces.com/) - rfc3986 (from https://rfc3986.readthedocs.org) -- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html) - -======================================================================== -GPL-3.0-or-later -======================================================================== -The following software have components provided under the terms of this license: - -- strict-rfc3339 (from http://www.danielrichman.co.uk/libraries/strict-rfc3339.html) ======================================================================== ISC -- GitLab From 2e7da25274cfd14c926d76a30fbf687cd678cccf Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 8 Nov 2021 15:02:28 +0100 Subject: [PATCH 10/16] adressing MR feedback --- app/bulk_persistence/bulk_storage_version.py | 21 +++++ app/bulk_persistence/bulk_uri.py | 80 ++++++++------------ app/model/log_bulk.py | 9 +-- app/routers/bulk/bulk_routes.py | 2 +- app/routers/bulk/bulk_uri_dependencies.py | 2 +- app/routers/ddms_v2/log_ddms_v2.py | 6 +- app/routers/ddms_v2/persistence.py | 2 +- app/routers/dipset/persistence.py | 2 +- app/routers/trajectory/persistence.py | 2 +- tests/unit/bulk_persistence/bulk_uri_test.py | 40 +++++----- tests/unit/model/log_bulk_test.py | 1 - 11 files changed, 83 insertions(+), 84 deletions(-) create mode 100644 app/bulk_persistence/bulk_storage_version.py diff --git a/app/bulk_persistence/bulk_storage_version.py b/app/bulk_persistence/bulk_storage_version.py new file mode 100644 index 00000000..39b072bf --- /dev/null +++ b/app/bulk_persistence/bulk_storage_version.py @@ -0,0 +1,21 @@ +from typing import Optional, NamedTuple + + +class BulkStorageVersion(NamedTuple): + """ This is the version of the bulk storage engine """ + + version: str + """ unique version identifier """ + + uri_prefix: Optional[str] + """ associated uri prefix """ + + +BulkStorageVersion_V0 = BulkStorageVersion(version='0', uri_prefix=None) +""" first bulk management implementation with direct management to blob storage with a single blob """ + +BulkStorageVersion_V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1") +""" version 1, using Dask to handle bulk manipulation and storage """ + +BulkStorageVersion_Invalid = BulkStorageVersion(version='', uri_prefix=None) +""" represent an invalid/undefined storage version """ diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 2596faa6..65481dff 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -1,26 +1,8 @@ -from typing import Optional, NamedTuple, Tuple +from typing import Optional, Tuple import uuid - -class BulkStorageVersion(NamedTuple): - """ This is the version of the bulk storage engine """ - - version: str - """ unique version identifier """ - - uri_prefix: Optional[str] - """ associated uri prefix """ - - -class BulkStorageVersions: - V0 = BulkStorageVersion(version='0', uri_prefix=None) - """ first bulk management implementation with direct management to blob storage with a single blob """ - - V1 = BulkStorageVersion(version='1', uri_prefix="wdms-1") - """ version 1, using Dask to handle bulk manipulation and storage """ - - invalid = BulkStorageVersion(version='', uri_prefix=None) - """ represent an invalid/undefined storage version """ +from .bulk_storage_version import ( + BulkStorageVersion, BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid) class BulkURI: @@ -30,10 +12,10 @@ class BulkURI: Usage: - ctor from URI string value: - `bulk_uri = BulkURI(uri_str)` + `bulk_uri = BulkURI.decode(uri_str)` - ctor explicit given a bulk_id and a storage version: - `bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersions.V1)` + `bulk_uri = BulkURI(bulk_id=bulk_id_value, version=BulkStorageVersion_V1)` - ctor explict using class method: `bulk_uri = BulkURI.from_bulk_storage_V1(bulk_id_value)` @@ -42,29 +24,19 @@ class BulkURI: `uri_str: str = bulk_uri.encode()` - check which storage engine version is: - `bulk_uri.storage_version == BulkStorageVersions.V0` + `bulk_uri.storage_version == BulkStorageVersion_V0` `bulk_uri.is_bulk_storage_V0()` """ - def __init__(self, uri: Optional[str] = None, *, - bulk_id: Optional[str] = None, - version: Optional[BulkStorageVersion] = None): + def __init__(self, bulk_id: str, version: BulkStorageVersion): """ make an new one or invalid Either pass uri alone or bulk_id, version :throw: ValueError """ - if uri: - bulk_id, prefix = self._decode_uri(uri) - if not prefix: - version = BulkStorageVersions.V0 - elif prefix == BulkStorageVersions.V1.uri_prefix: - version = BulkStorageVersions.V1 - else: - raise ValueError('Unsupported prefix in bulk URI: ' + prefix) - elif not bulk_id or not version or version == BulkStorageVersions.invalid: + if not bulk_id or not version or version == BulkStorageVersion_Invalid: bulk_id = '' - version = BulkStorageVersions.invalid + version = BulkStorageVersion_Invalid self._bulk_id = bulk_id self._storage_version = version @@ -72,25 +44,38 @@ class BulkURI: @classmethod def invalid(cls): """ make an invalid instance """ - return cls() + return cls('', BulkStorageVersion_Invalid) @classmethod def decode(cls, uri: str) -> 'BulkURI': - return cls(uri) + """ + construct a BulkURI from an encoded URI + :throw: ValueError + """ + if not uri: + return BulkURI.invalid() + bulk_id, prefix = cls._decode_uri(uri) + if not prefix: + version = BulkStorageVersion_V0 + elif prefix == BulkStorageVersion_V1.uri_prefix: + version = BulkStorageVersion_V1 + else: + raise ValueError('Unsupported prefix in bulk URI: ' + prefix) + return cls(bulk_id=bulk_id, version=version) def is_bulk_storage_V0(self) -> bool: """ convenient check that returns True is version == BulkStorageVersions.V0 """ - return self._storage_version.version == BulkStorageVersions.V0.version + return self._storage_version.version == BulkStorageVersion_V0.version @classmethod def from_bulk_storage_V0(cls, bulk_id: str) -> 'BulkURI': """ construct a BulkURI for storage engine V0 given a bulk id """ - return cls(bulk_id=bulk_id, version=BulkStorageVersions.V0) + return cls(bulk_id=bulk_id, version=BulkStorageVersion_V0) @classmethod def from_bulk_storage_V1(cls, bulk_id: str) -> 'BulkURI': """ construct a BulkURI for storage engine V1 given a bulk id """ - return cls(bulk_id=bulk_id, version=BulkStorageVersions.V1) + return cls(bulk_id=bulk_id, version=BulkStorageVersion_V1) @property def bulk_id(self) -> str: @@ -100,12 +85,6 @@ class BulkURI: def storage_version(self) -> BulkStorageVersion: return self._storage_version - def __bool__(self) -> bool: - """ check invalid """ - if self._bulk_id and self._storage_version.version: - return True - return False - def __str__(self) -> str: """ as URN string, does not throw if invalid and return an empty string instead """ return self.encode() if self.is_valid() else '' @@ -137,4 +116,7 @@ class BulkURI: return str(uuid.UUID(f"{parts[0]}:{parts[-2]}:{parts[-1]}")), ":".join(parts[1:-2]) def is_valid(self) -> bool: - return self.__bool__() + """ check invalid """ + if self._bulk_id and self._storage_version.version: + return True + return False diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index 2d97b120..3a748fe7 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Tuple, Union +from typing import Optional, Union from jsonpath_ng import parse as parse_jsonpath from jsonpath_ng.jsonpath import Parent as JsonParent from odes_storage.models import Record from app.bulk_persistence.bulk_uri import BulkURI -from app.bulk_persistence import BulkId class LogBulkHelper: @@ -58,7 +57,7 @@ class LogBulkHelper: :param bulk_uri: either already encode uri as string or BulkURI :param custom_bulk_id_path: !! incompatible with log model """ - uri_value = str(bulk_uri) + uri_value = bulk_uri.encode() if isinstance(bulk_uri, BulkURI) else bulk_uri if custom_bulk_id_path is None: # what about empty string ? cls._set_bulk_uri_in_wks(record, uri_value) else: @@ -81,10 +80,10 @@ class LogBulkHelper: :return: BulkURI, could be invalid if none """ if custom_bulk_id_path is None: # what about empty string ? - return BulkURI(cls._get_bulk_uri_from_wks(record)) + return BulkURI.decode(cls._get_bulk_uri_from_wks(record)) record_dict = {"data": record.data} matches = parse_jsonpath(custom_bulk_id_path).find(record_dict) if len(matches) > 0: - return BulkURI(matches[0].value) + return BulkURI.decode(matches[0].value) return BulkURI.invalid() diff --git a/app/routers/bulk/bulk_routes.py b/app/routers/bulk/bulk_routes.py index dfe7550e..d21a391a 100644 --- a/app/routers/bulk/bulk_routes.py +++ b/app/routers/bulk/bulk_routes.py @@ -150,7 +150,7 @@ async def get_data_version( stat = None try: - if not bulk_uri: + if not bulk_uri.is_valid(): raise BulkNotFound(record_id=record_id, bulk_id=None) bulk_id = bulk_uri.bulk_id if bulk_uri.is_bulk_storage_V0(): diff --git a/app/routers/bulk/bulk_uri_dependencies.py b/app/routers/bulk/bulk_uri_dependencies.py index 73280e45..ce9dad21 100644 --- a/app/routers/bulk/bulk_uri_dependencies.py +++ b/app/routers/bulk/bulk_uri_dependencies.py @@ -25,7 +25,7 @@ class BulkIdAccess(ABC): class OsduBulkIdAccess(BulkIdAccess): @staticmethod def get_bulk_uri(record) -> BulkURI: - return BulkURI(record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None)) + return BulkURI.decode(record.data.get("ExtensionProperties", {}).get("wdms", {}).get(BULK_URI_FIELD, None)) @staticmethod def set_bulk_uri(record, bulk_id: str): diff --git a/app/routers/ddms_v2/log_ddms_v2.py b/app/routers/ddms_v2/log_ddms_v2.py index a4c80e36..8b3851e7 100644 --- a/app/routers/ddms_v2/log_ddms_v2.py +++ b/app/routers/ddms_v2/log_ddms_v2.py @@ -411,11 +411,11 @@ async def get_log_data_statistics(logid: str, log_record = await fetch_record(ctx, logid) # use dict to support the custom path bulk_uri = LogBulkHelper.get_bulk_uri(log_record, bulk_id_path) - if not bulk_uri: - content = '{}' # no bulk - else: + if bulk_uri.is_valid(): df = await get_dataframe(ctx, bulk_uri.bulk_id) content = df.describe(include="all").to_json() + else: + content = '{}' # no bulk return Response(content=content, media_type=MimeTypes.JSON.type) diff --git a/app/routers/ddms_v2/persistence.py b/app/routers/ddms_v2/persistence.py index f3bf36e1..109237d2 100644 --- a/app/routers/ddms_v2/persistence.py +++ b/app/routers/ddms_v2/persistence.py @@ -33,7 +33,7 @@ class Persistence: ) -> pd.DataFrame: bulk_uri = LogBulkHelper.get_bulk_uri(record, bulk_id_path) # TODO use prefix to know how to read the bulk - if not bulk_uri: + if not bulk_uri.is_valid(): return pd.DataFrame() return await get_dataframe(ctx, bulk_uri.bulk_id) diff --git a/app/routers/dipset/persistence.py b/app/routers/dipset/persistence.py index 1b3207ef..aa24d5f3 100644 --- a/app/routers/dipset/persistence.py +++ b/app/routers/dipset/persistence.py @@ -294,7 +294,7 @@ async def read_dipset_data(ctx, ds: Union[dipset, str]) -> Tuple[dipset, pd.Data return my_dipset, pd.DataFrame() # Fetch data - bulk_uri = BulkURI(my_dipset.data.bulkURI) + bulk_uri = BulkURI.decode(my_dipset.data.bulkURI) # TODO use prefix to know how to read the bulk df = await get_dataframe(ctx, bulk_uri.bulk_id) diff --git a/app/routers/trajectory/persistence.py b/app/routers/trajectory/persistence.py index 2d491137..39da497f 100644 --- a/app/routers/trajectory/persistence.py +++ b/app/routers/trajectory/persistence.py @@ -46,7 +46,7 @@ class Persistence: raise NoBulkException try: - bulk_uri = BulkURI(record.data.bulkURI) + bulk_uri = BulkURI.decode(record.data.bulkURI) # TODO use prefix to know how to read the bulk df = await get_dataframe(ctx, bulk_uri.bulk_id) except Exception as ex: diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index 417378d5..4f83858d 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -13,7 +13,9 @@ # limitations under the License. import pytest -from app.bulk_persistence.bulk_uri import BulkURI, BulkStorageVersions +from app.bulk_persistence.bulk_uri import BulkURI +from app.bulk_persistence.bulk_storage_version import ( + BulkStorageVersion_V0, BulkStorageVersion_V1, BulkStorageVersion_Invalid) # urn decode test @@ -23,44 +25,40 @@ def test_from_uri_without_prefix(): bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert bulk_uri.is_bulk_storage_V0() - assert bulk_uri.storage_version == BulkStorageVersions.V0 + assert bulk_uri.storage_version == BulkStorageVersion_V0 assert bulk_uri.storage_version.uri_prefix is None assert bulk_uri.is_valid() # should encode back to the same uri assert bulk_uri.encode() == uri_str - assert str(bulk_uri) == uri_str def test_decode_urn_with_prefix(): - uri_str = f'urn:{BulkStorageVersions.V1.uri_prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' + uri_str = f'urn:{BulkStorageVersion_V1.uri_prefix}:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' bulk_uri = BulkURI.decode(uri_str) assert bulk_uri.bulk_id == '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8' assert not bulk_uri.is_bulk_storage_V0() - assert bulk_uri.storage_version == BulkStorageVersions.V1 - assert bulk_uri.storage_version.uri_prefix == BulkStorageVersions.V1.uri_prefix + assert bulk_uri.storage_version == BulkStorageVersion_V1 + assert bulk_uri.storage_version.uri_prefix == BulkStorageVersion_V1.uri_prefix assert bulk_uri.is_valid() # should encode back to the same uri assert bulk_uri.encode() == uri_str - assert str(bulk_uri) == uri_str -@pytest.mark.parametrize("args, kwargs", [ - ([], {}), # no param, default ctor - ([''], {}), - ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8'}), - ([''], {'version': BulkStorageVersions.invalid}), - ([''], {'bulk_id': '489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', 'version': BulkStorageVersions.invalid}), - ([''], {'bulk_id': '', 'version': BulkStorageVersions.V1}), +@pytest.mark.parametrize("bulk_id, version", [ + ('489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', None), + ('', BulkStorageVersion_Invalid), + ('489768d2-eee1-4a8f-ae95-7b0c30b0dcd8', BulkStorageVersion_Invalid), + ('', None), + ('', BulkStorageVersion_V1), ]) -def test_invalid_uri(args, kwargs): - invalid_uri = BulkURI(*args, **kwargs) +def test_invalid_uri(bulk_id, version): + invalid_uri = BulkURI(bulk_id, version) assert not invalid_uri.is_valid() - assert not invalid_uri assert not invalid_uri.bulk_id - assert invalid_uri.storage_version == BulkStorageVersions.invalid + assert invalid_uri.storage_version == BulkStorageVersion_Invalid assert str(invalid_uri) == '' # explicit encode raises @@ -71,12 +69,12 @@ def test_invalid_uri(args, kwargs): def test_decode_urn_invalid_input_should_throw(): # bad formed urn format with pytest.raises(ValueError): - BulkURI('invalid_urn_uri') + BulkURI.decode('invalid_urn_uri') # bulk_id not a valid UUID with pytest.raises(ValueError): - BulkURI('urn:uuid:invalid_uuid') + BulkURI.decode('urn:uuid:invalid_uuid') # unknown prefix with pytest.raises(ValueError): - BulkURI('urn:UNKNOWN_PREFIX:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8') + BulkURI.decode('urn:UNKNOWN_PREFIX:uuid:489768d2-eee1-4a8f-ae95-7b0c30b0dcd8') diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index 3bcb6e01..fa8170ef 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -60,5 +60,4 @@ def test_update_bulk_id_on_empty_record(basic_record): def test_get_bulk_id_on_empty_record(basic_record): bulk_uri = LogBulkHelper.get_bulk_uri(basic_record) - assert not bulk_uri assert not bulk_uri.is_valid() -- GitLab From 2ca2eb56d96681cd929502319260c77a17eac059 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 8 Nov 2021 17:51:26 +0100 Subject: [PATCH 11/16] - remove __str__ to enforce explicit encode - update_bulk_uri for log wks model takes BulkURI only --- app/bulk_persistence/bulk_uri.py | 4 ---- app/model/log_bulk.py | 13 +++--------- app/routers/dipset/persistence.py | 3 ++- app/routers/trajectory/trajectory_ddms_v2.py | 6 +++++- tests/unit/bulk_persistence/bulk_uri_test.py | 1 - tests/unit/model/log_bulk_test.py | 22 +++++++++----------- 6 files changed, 20 insertions(+), 29 deletions(-) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 65481dff..89e803fb 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -85,10 +85,6 @@ class BulkURI: def storage_version(self) -> BulkStorageVersion: return self._storage_version - def __str__(self) -> str: - """ as URN string, does not throw if invalid and return an empty string instead """ - return self.encode() if self.is_valid() else '' - def encode(self) -> str: """ encode to uri as string diff --git a/app/model/log_bulk.py b/app/model/log_bulk.py index 3a748fe7..266af919 100644 --- a/app/model/log_bulk.py +++ b/app/model/log_bulk.py @@ -32,13 +32,6 @@ class LogBulkHelper: record.data = {} return record.data - @classmethod - def _set_bulk_uri_in_wks(cls, record: Record, bulk_uri: str): - """ - for now it used externalIds, to _get_bulk_id_from_wks be updated once schema is fixed with log.data.bulkId - """ - cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = bulk_uri - @classmethod def _get_bulk_uri_from_wks(cls, record: Record) -> Optional[str]: bulk_uri = ( @@ -49,7 +42,7 @@ class LogBulkHelper: return bulk_uri @classmethod - def update_bulk_uri(cls, record: Record, bulk_uri: Union[str, BulkURI], custom_bulk_id_path: Optional[str] = None): + def update_bulk_uri(cls, record: Record, bulk_uri: BulkURI, custom_bulk_id_path: Optional[str] = None): """ Update bulk id within a log record. Note that the custom path cannot be applied when using a strict structured model It creates the field if not exist @@ -57,9 +50,9 @@ class LogBulkHelper: :param bulk_uri: either already encode uri as string or BulkURI :param custom_bulk_id_path: !! incompatible with log model """ - uri_value = bulk_uri.encode() if isinstance(bulk_uri, BulkURI) else bulk_uri + uri_value = bulk_uri.encode() if custom_bulk_id_path is None: # what about empty string ? - cls._set_bulk_uri_in_wks(record, uri_value) + cls._get_record_data_dict(record).setdefault('log', {})['bulkURI'] = uri_value else: record_dict = {"data": record.data} diff --git a/app/routers/dipset/persistence.py b/app/routers/dipset/persistence.py index aa24d5f3..b5b2edb3 100644 --- a/app/routers/dipset/persistence.py +++ b/app/routers/dipset/persistence.py @@ -260,7 +260,8 @@ async def write_dipset_data(ctx, dataframe: pd.DataFrame, ds: Union[dipset, str] dataframe.sort_values(by=["reference", "azimuth"], inplace=True, ignore_index=True) # Write data in storage and update dipset bulk URI - my_dipset.data.bulkURI = str(await write_bulk(ctx, dataframe)) + bulk_uri = await write_bulk(ctx, dataframe) + my_dipset.data.bulkURI = bulk_uri.encode() # Create or update logs await create_missing_logs(ctx, my_dipset) diff --git a/app/routers/trajectory/trajectory_ddms_v2.py b/app/routers/trajectory/trajectory_ddms_v2.py index 4989c092..ebf0a356 100644 --- a/app/routers/trajectory/trajectory_ddms_v2.py +++ b/app/routers/trajectory/trajectory_ddms_v2.py @@ -236,7 +236,8 @@ async def post_traj_data( trajectory_record = await fetch_trajectory_record(ctx, trajectoryid) record = from_record(Trajectory, trajectory_record) - record.data.bulkURI = str(await persistence.write_bulk(ctx, df)) + bulk_uri = await persistence.write_bulk(ctx, df) + record.data.bulkURI = bulk_uri.encode() # update record's channels if not record.data.channels: @@ -298,6 +299,9 @@ async def _get_trajectory_data( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(key_error)) from key_error except InvalidBulkException as ex: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(ex)) + except Exception as ex: + print(ex) + content = await DataframeSerializerAsync().to_json(df, orient=orient) return Response(content=content, media_type=MimeTypes.JSON.type) diff --git a/tests/unit/bulk_persistence/bulk_uri_test.py b/tests/unit/bulk_persistence/bulk_uri_test.py index 4f83858d..6a3d562b 100644 --- a/tests/unit/bulk_persistence/bulk_uri_test.py +++ b/tests/unit/bulk_persistence/bulk_uri_test.py @@ -59,7 +59,6 @@ def test_invalid_uri(bulk_id, version): assert not invalid_uri.is_valid() assert not invalid_uri.bulk_id assert invalid_uri.storage_version == BulkStorageVersion_Invalid - assert str(invalid_uri) == '' # explicit encode raises with pytest.raises(ValueError): diff --git a/tests/unit/model/log_bulk_test.py b/tests/unit/model/log_bulk_test.py index fa8170ef..b5483639 100644 --- a/tests/unit/model/log_bulk_test.py +++ b/tests/unit/model/log_bulk_test.py @@ -25,10 +25,6 @@ def record_with_bulkURI(basic_record): def test_update_bulk_id(record_with_bulkURI): - uri = uuid.uuid4().urn - LogBulkHelper.update_bulk_uri(record_with_bulkURI, uri) - assert record_with_bulkURI.data['log']['bulkURI'] == uri - b_id = uuid.uuid4() b_uri = BulkURI.from_bulk_storage_V0(str(b_id)) LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri) @@ -36,26 +32,28 @@ def test_update_bulk_id(record_with_bulkURI): def test_update_bulk_id_with_path(record_with_bulkURI): - uri = uuid.uuid4().urn - LogBulkHelper.update_bulk_uri(record_with_bulkURI, uri, "data.custombulkid") - assert record_with_bulkURI.data['custombulkid'] == uri + b_id = uuid.uuid4() + b_uri = BulkURI.from_bulk_storage_V0(str(b_id)) + LogBulkHelper.update_bulk_uri(record_with_bulkURI, b_uri, "data.custombulkid") + assert record_with_bulkURI.data['custombulkid'] == b_id.urn def test_update_bulk_id_on_not_valid_data_should_throw(basic_record): basic_record.data = 'not a dict data' with pytest.raises(Exception): - LogBulkHelper.update_bulk_uri(basic_record, str(uuid.uuid4())) + LogBulkHelper.update_bulk_uri(basic_record, BulkURI.from_bulk_storage_V0(str(uuid.uuid4()))) def test_get_update_bulk_id(record_with_bulkURI): - assert str(LogBulkHelper.get_bulk_uri(record_with_bulkURI)) == record_with_bulkURI.data['log']['bulkURI'] + assert LogBulkHelper.get_bulk_uri(record_with_bulkURI).encode() == record_with_bulkURI.data['log']['bulkURI'] def test_update_bulk_id_on_empty_record(basic_record): - uri = uuid.uuid4().urn - LogBulkHelper.update_bulk_uri(basic_record, uri) - assert basic_record.data['log']['bulkURI'] == uri + b_id = uuid.uuid4() + b_uri = BulkURI.from_bulk_storage_V0(str(b_id)) + LogBulkHelper.update_bulk_uri(basic_record, b_uri) + assert basic_record.data['log']['bulkURI'] == b_id.urn def test_get_bulk_id_on_empty_record(basic_record): -- GitLab From 5a98c150761f5441fe02de578958288db0873625 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Tue, 9 Nov 2021 15:17:59 +0100 Subject: [PATCH 12/16] remove BulkId class --- app/bulk_persistence/__init__.py | 1 - app/bulk_persistence/bulk_id.py | 6 ++---- app/bulk_persistence/dask/dask_bulk_storage.py | 4 ++-- app/bulk_persistence/dataframe_persistence.py | 6 +++--- tests/unit/bulk_persistence/bulk_id_test.py | 4 ++-- 5 files changed, 9 insertions(+), 12 deletions(-) diff --git a/app/bulk_persistence/__init__.py b/app/bulk_persistence/__init__.py index aa5f97c0..a2eeabcb 100644 --- a/app/bulk_persistence/__init__.py +++ b/app/bulk_persistence/__init__.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .bulk_id import BulkId from .bulk_uri import BulkURI from .dataframe_persistence import create_and_store_dataframe, get_dataframe from .dataframe_serializer import DataframeSerializerAsync, DataframeSerializerSync diff --git a/app/bulk_persistence/bulk_id.py b/app/bulk_persistence/bulk_id.py index bc7b3586..442af905 100644 --- a/app/bulk_persistence/bulk_id.py +++ b/app/bulk_persistence/bulk_id.py @@ -15,7 +15,5 @@ import uuid -class BulkId: - @staticmethod - def new_bulk_id() -> str: - return str(uuid.uuid4()) +def new_bulk_id() -> str: + return str(uuid.uuid4()) diff --git a/app/bulk_persistence/dask/dask_bulk_storage.py b/app/bulk_persistence/dask/dask_bulk_storage.py index 4b248e4c..4dee0a86 100644 --- a/app/bulk_persistence/dask/dask_bulk_storage.py +++ b/app/bulk_persistence/dask/dask_bulk_storage.py @@ -23,7 +23,7 @@ from typing import List import fsspec import pandas as pd from pyarrow.lib import ArrowException, ArrowInvalid -from app.bulk_persistence import BulkId +from app.bulk_persistence.bulk_id import new_bulk_id from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable from app.bulk_persistence.dask.traces import wrap_trace_process from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs, @@ -256,7 +256,7 @@ class DaskBulkStorage: @with_trace('save_blob') async def save_blob(self, ddf: dd.DataFrame, record_id: str, bulk_id: str = None): """Write the data frame to the blob storage.""" - bulk_id = bulk_id or BulkId.new_bulk_id() + bulk_id = bulk_id or new_bulk_id() if isinstance(ddf, pd.DataFrame): self._check_incoming_chunk(ddf) diff --git a/app/bulk_persistence/dataframe_persistence.py b/app/bulk_persistence/dataframe_persistence.py index 2a248f41..8eb343fb 100644 --- a/app/bulk_persistence/dataframe_persistence.py +++ b/app/bulk_persistence/dataframe_persistence.py @@ -25,7 +25,7 @@ from .blob_storage import ( create_and_write_blob, read_blob, ) -from .bulk_id import BulkId +from .bulk_id import new_bulk_id from .mime_types import MimeTypes from .tenant_provider import resolve_tenant from ..helper.traces import with_trace @@ -33,10 +33,10 @@ from ..helper.traces import with_trace async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str: """Store bulk on a blob storage""" - new_bulk_id = BulkId.new_bulk_id() + bulk_id = new_bulk_id() tenant = await resolve_tenant(ctx.partition_id) async with create_and_write_blob( - df, file_exporter=BlobFileExporters.PARQUET, blob_id=new_bulk_id + df, file_exporter=BlobFileExporters.PARQUET, blob_id=bulk_id ) as bulkblob: storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase) await storage.upload( diff --git a/tests/unit/bulk_persistence/bulk_id_test.py b/tests/unit/bulk_persistence/bulk_id_test.py index f84766a7..5bc385fc 100644 --- a/tests/unit/bulk_persistence/bulk_id_test.py +++ b/tests/unit/bulk_persistence/bulk_id_test.py @@ -13,9 +13,9 @@ # limitations under the License. import pytest -from app.bulk_persistence import BulkId +from app.bulk_persistence.bulk_id import new_bulk_id import uuid def test_bulk_id_is_an_uuid(): - uuid.UUID(BulkId.new_bulk_id()) + uuid.UUID(new_bulk_id()) -- GitLab From de6278e872fe94c0b5797c148dca01421e895a55 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Tue, 9 Nov 2021 15:57:05 +0100 Subject: [PATCH 13/16] enforce bulk_id to be a valid uuid --- app/bulk_persistence/bulk_uri.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/bulk_persistence/bulk_uri.py b/app/bulk_persistence/bulk_uri.py index 89e803fb..493e6515 100644 --- a/app/bulk_persistence/bulk_uri.py +++ b/app/bulk_persistence/bulk_uri.py @@ -32,11 +32,16 @@ class BulkURI: """ make an new one or invalid Either pass uri alone or bulk_id, version + :param bulk_id: expected as a valid uuid + :param version: storage version :throw: ValueError """ if not bulk_id or not version or version == BulkStorageVersion_Invalid: bulk_id = '' version = BulkStorageVersion_Invalid + else: + # ensure valid uuid + uuid.UUID(bulk_id) self._bulk_id = bulk_id self._storage_version = version -- GitLab From 65122432ce38c4a9d1a8745a16977762e2f96014 Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 15 Nov 2021 10:45:36 +0100 Subject: [PATCH 14/16] revert changes not expected to be commited --- app/routers/trajectory/trajectory_ddms_v2.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/app/routers/trajectory/trajectory_ddms_v2.py b/app/routers/trajectory/trajectory_ddms_v2.py index ebf0a356..0b92e649 100644 --- a/app/routers/trajectory/trajectory_ddms_v2.py +++ b/app/routers/trajectory/trajectory_ddms_v2.py @@ -299,9 +299,6 @@ async def _get_trajectory_data( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(key_error)) from key_error except InvalidBulkException as ex: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(ex)) - except Exception as ex: - print(ex) - content = await DataframeSerializerAsync().to_json(df, orient=orient) return Response(content=content, media_type=MimeTypes.JSON.type) -- GitLab From baf791d8ccea4f7a727fe471f5da3afc1c7c1b9e Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Mon, 15 Nov 2021 17:02:45 +0100 Subject: [PATCH 15/16] update NOTICE/fossa --- NOTICE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE b/NOTICE index 910765be..a555b7ad 100644 --- a/NOTICE +++ b/NOTICE @@ -279,7 +279,7 @@ The following software have components provided under the terms of this license: - sniffio (from https://github.com/python-trio/sniffio) - structlog (from http://www.structlog.org/) - toml (from https://github.com/uiri/toml) -- tomli (from https://pypi.org/project/tomli/1.2.1/) +- tomli (from https://pypi.org/project/tomli/1.2.2/) - urllib3 (from https://urllib3.readthedocs.io/) - xmltodict (from https://github.com/martinblech/xmltodict) - zipp (from https://github.com/jaraco/zipp) -- GitLab From d853368b9eb4c41823e60fb4fb40de6ae79ebd4e Mon Sep 17 00:00:00 2001 From: "yvernet@slb.com" Date: Tue, 16 Nov 2021 10:33:14 +0100 Subject: [PATCH 16/16] re-arrange imports --- app/bulk_persistence/dask/dask_bulk_storage.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/app/bulk_persistence/dask/dask_bulk_storage.py b/app/bulk_persistence/dask/dask_bulk_storage.py index 90b27156..f84a57df 100644 --- a/app/bulk_persistence/dask/dask_bulk_storage.py +++ b/app/bulk_persistence/dask/dask_bulk_storage.py @@ -27,18 +27,19 @@ from dask.distributed import Client as DaskDistributedClient from pyarrow.lib import ArrowException import pyarrow.parquet as pa -from app.bulk_persistence.bulk_id import new_bulk_id -from app.bulk_persistence.dask.session_file_meta import SessionFileMeta +from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters + from app.helper.logger import get_logger from app.helper.traces import with_trace from app.persistence.sessions_storage import Session from app.utils import DaskClient, capture_timings, get_ctx -from osdu.core.api.storage.dask_storage_parameters import DaskStorageParameters from .errors import BulkNotFound, BulkNotProcessable, internal_bulk_exceptions from .traces import wrap_trace_process from .utils import by_pairs, do_merge, worker_capture_timing_handlers from .dask_worker_plugin import DaskWorkerPlugin +from .session_file_meta import SessionFileMeta +from ..bulk_id import new_bulk_id def pandas_to_parquet(pdf, path, opt): -- GitLab