Skip to content
Snippets Groups Projects
Commit 507e5c57 authored by Yannick's avatar Yannick
Browse files

Merge branch 'unit_test_mock_storage_versioning_support' into 'master'

add support of versioning in mock_storage_client_holding_data

See merge request !474
parents 0410b101 69f4c4d8
No related branches found
No related tags found
1 merge request!474add support of versioning in mock_storage_client_holding_data
Pipeline #102638 passed with warnings
......@@ -75,6 +75,9 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
appkey: str = None,
token: str = None) -> odes_storage.models.Record:
# return the latest
if attribute is not None:
raise NotImplementedError("mocked_get_record does not support 'attribute' parameter")
return await self.get_record_version(id, None, data_partition_id, appkey, token)
async def mocked_get_record_version(self,
......@@ -83,13 +86,28 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
data_partition_id: str = None,
appkey: str = None,
token: str = None) -> odes_storage.models.Record:
"""
If version is None it will return the latest version. To determine the latest version, two cases:
* the version field in the record is not set or None => considered as the latest
* the version field set and not None => latest = record with the greater version (basic int comparison)
"""
latest = None
for d in data:
# CAREFUL: id might be optional in the model (not set on write)
# Also storage seems to have problematic behavior with id ending in ':'
if id is not None and (id == d.id or id + ":" == d.id):
if version is None or version == d.version: # Note: version None means latest
if version == d.version:
return d
if latest is None \
or latest.version is None \
or (d.version is not None and d.version > latest.version):
latest = d
if latest is not None:
return latest
# if not found, attempt to emulate behavior of the actual client
raise odes_storage.UnexpectedResponse(
status_code=404,
......@@ -99,9 +117,33 @@ def mock_storage_client_holding_data(local_dev_config, nope_logger_fixture):
headers=httpx.Headers(),
)
async def mocked_get_all_record_versions(self,
id: str,
data_partition_id: str) -> odes_storage.models.RecordVersions:
versions = []
record_found = False
for d in data:
# CAREFUL: id might be optional in the model (not set on write)
# Also storage seems to have problematic behavior with id ending in ':'
if id is not None and (id == d.id or id + ":" == d.id):
record_found = True
if d.version is not None: # Note: version None means latest
versions.append(d.version)
# if not found, attempt to emulate behavior of the actual client
if not record_found:
raise odes_storage.UnexpectedResponse(
status_code=404,
reason_phrase="Item not found",
# not sure what to put here at this time
content="".encode(encoding="utf-8"),
headers=httpx.Headers(),
)
return odes_storage.models.RecordVersions(recordId=id, versions=versions or None)
# override get_record method on the instance to return sample data
mock.get_record = types.MethodType(mocked_get_record, mock)
mock.get_record_version = types.MethodType(mocked_get_record_version, mock)
mock.get_all_record_versions = types.MethodType(mocked_get_all_record_versions, mock)
return mock
......
......@@ -70,6 +70,48 @@ def test_mock_storage_client_holding_well_v3_record_data(
)
def test_mock_storage_client_holding_well_v3_record_with_version_data(
mock_storage_client_holding_data, well_v3_record_list
):
"""Test the mock_storage_client_holding_data behavior, along with the well_v2_record data itself"""
single_record_v0 = well_v3_record_list[0]
record_id = single_record_v0.id
single_record_v1 = single_record_v0.copy(deep=True)
single_record_v0.version = 0
single_record_v1.version = 1
single_record_v1.data['FacilityName'] = single_record_v0.data['FacilityName'] + "_updated"
storage_client = mock_storage_client_holding_data([single_record_v0, single_record_v1])
# grab current eventloop if we already have one, otherwise creates it
loop = asyncio.get_event_loop()
# get latest
assert (
loop.run_until_complete(
storage_client.get_record(record_id, "fake_data_partition_id")
).version == single_record_v1.version
)
# get V0
r = loop.run_until_complete(
storage_client.get_record_version(record_id, 0, "fake_data_partition_id")
)
assert r.version == 0 and r.data['FacilityName'] == single_record_v0.data['FacilityName']
# get V1
r = loop.run_until_complete(
storage_client.get_record_version(record_id, 1, "fake_data_partition_id")
)
assert r.version == 1 and r.data['FacilityName'] == single_record_v1.data['FacilityName']
# get versions
r = loop.run_until_complete(
storage_client.get_all_record_versions(record_id, "fake_data_partition_id")
)
assert set(r.versions) == {0, 1}
def test_mock_storage_client_holding_wellbore_v2_record_data(
mock_storage_client_holding_data, wellbore_v2_record_list
):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment