Skip to content
Snippets Groups Projects
Commit 844159e4 authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

Fix split_id issue

parent 40f03e49
No related branches found
No related tags found
1 merge request!1GONRG-3452: Move Ingestion from Python SDK
......@@ -23,7 +23,8 @@ from uuid import uuid4
import toposort
from osdu_ingestion.libs.linearize_manifest import ManifestEntity
from osdu_ingestion.libs.utils import is_surrogate_key, remove_trailing_colon
from osdu_ingestion.libs.utils import (EntityId, is_surrogate_key,
remove_trailing_colon, split_id)
logger = logging.getLogger()
......@@ -33,13 +34,14 @@ class EntityNode:
This class represents entities and their links to parent and child ones.
"""
__slots__ = [
"srn",
"entity_id",
"system_srn",
"entity_info",
"children",
"parents",
"is_invalid",
"is_external_srn",
"_entity_id",
"_whitelist_ref_patterns"
]
......@@ -47,12 +49,13 @@ class EntityNode:
r"(?<=\")surrogate-key:[\s\w\-\.\d]+(?=\")|(?<=\")[\w\-\.]+:[\w\-\.]+--[\w\-\.]+:.[^,;\"]+(?=\")")
def __init__(
self, srn,
self,
entity_id: EntityId,
entity_info: ManifestEntity,
is_external_srn: bool = False,
whitelist_ref_patterns: str = None
):
self.srn = srn
self.entity_id = entity_id
self.entity_info = entity_info
self.system_srn = None
self.children = set()
......@@ -61,6 +64,10 @@ class EntityNode:
self.is_external_srn = is_external_srn
self._whitelist_ref_patterns = self._compile_whitelist_ref_patterns(whitelist_ref_patterns)
@property
def srn(self):
return self.entity_id.srn
def __repr__(self):
return f"SRN: {self.srn}"
......@@ -132,21 +139,20 @@ class EntityNode:
"""
return [match.groupdict() for match in pattern.finditer(source)]
def get_parent_srns(self) -> Set[str]:
def get_parent_entity_ids(self) -> Set[EntityId]:
"""
Get list of parents' srns.
They may look like bare id or like id with version.
Get list of parents' EnetityIds.
"""
whitelist_references = set()
entity_data = json.dumps(self.data, separators=(",", ":"))
if self._whitelist_ref_patterns:
whitelist_references = self._extract_whitelist_references()
parent_srns = set(
remove_trailing_colon(reference) for reference in self.SRN_REGEX.findall(entity_data)
parent_entity_ids = set(
split_id(reference) for reference in self.SRN_REGEX.findall(entity_data)
if reference not in whitelist_references
)
parent_srns.discard(self.srn)
return parent_srns
parent_entity_ids.discard(self.entity_id)
return parent_entity_ids
def replace_parents_surrogate_srns(self):
"""
......@@ -178,7 +184,7 @@ class ManifestAnalyzer:
The Analyzer traverses each entity, each entity can contain references to other ones.
The flow of prioritizing entities could be described as:
1. Fill graph's nodes with entities (self._fill_srn_node_table())
1. Fill graph's nodes with entities (self._fill_entity_id_node_table())
2. Create links between nodes (self._fill_nodes_edges())
3. Mark unprocessed nodes if they are orphaned or dependant on orphaned nodes (self._find_invalid_nodes())
4. Return prioritized queue for ingesting (self.entity_queue())
......@@ -187,35 +193,39 @@ class ManifestAnalyzer:
def __init__(
self,
entities: Iterable[ManifestEntity],
previously_skipped_srns: set = None,
previously_skipped_srns: Set[str] = None,
whitelist_ref_patterns: str = None
):
self.entities = entities
self.srn_node_table = dict()
self.external_srn_node_table = dict()
self.entity_id_node_table = dict()
self.external_entity_id_node_table = dict()
self._invalid_entities_nodes = set()
self._previously_skipped_srns = previously_skipped_srns or {}
if previously_skipped_srns:
self._previously_skipped_srns = set(split_id(_id) for _id in previously_skipped_srns)
else:
self._previously_skipped_srns = []
self._whitelist_ref_patterns = whitelist_ref_patterns
# used as a root for all orphan entities
empty_entity_info = ManifestEntity({}, "")
self._invalid_entities_parent = EntityNode(srn=str(uuid4()),
self._invalid_entities_parent = EntityNode(entity_id=split_id(str(uuid4())),
entity_info=empty_entity_info)
self._fill_srn_node_table()
self._fill_entity_id_node_table()
self._fill_nodes_parents()
self._find_invalid_nodes()
def _create_entity_node(self, entity: ManifestEntity):
srn = entity.entity_data.get("id", f"surrogate-key:{str(uuid4())}")
self.srn_node_table[srn] = EntityNode(
srn,
_id = entity.entity_data.get("id", f"surrogate-key:{str(uuid4())}")
entity_id = split_id(_id)
self.entity_id_node_table[entity_id] = EntityNode(
entity_id,
entity,
whitelist_ref_patterns=self._whitelist_ref_patterns
)
def _fill_srn_node_table(self):
def _fill_entity_id_node_table(self):
for entity in self.entities:
self._create_entity_node(entity)
......@@ -223,41 +233,41 @@ class ManifestAnalyzer:
"""
Find parents in every entity.
"""
for entity_node in self.srn_node_table.values():
for entity_node in self.entity_id_node_table.values():
self._set_entity_parents(entity_node)
def _create_external_entity_node(self, parent_srn: str) -> EntityNode:
def _create_external_entity_node(self, parent_entity_id: EntityId) -> EntityNode:
"""
Create a node with no content and mark it as external.
:param parent_srn: Parent SRN
:return:
:param parent_entity_id: Parent EntityId
:return: EntityNode
"""
return EntityNode(parent_srn, None, is_external_srn=True)
return EntityNode(parent_entity_id, None, is_external_srn=True)
def _set_entity_parents(self, entity: EntityNode):
def _set_entity_parents(self, entity_node: EntityNode):
"""
Find all references parent in entity's content.
If a parent is not presented in manifest, mark this entity as unprocessed.
"""
parent_srns = entity.get_parent_srns()
for parent_srn in parent_srns:
if self.srn_node_table.get(parent_srn):
parent_node = self.srn_node_table[parent_srn]
parent_node.add_child(entity)
entity.add_parent(parent_node)
elif parent_srn in self._previously_skipped_srns:
parent_entity_ids = entity_node.get_parent_entity_ids()
for parent_entity_id in parent_entity_ids:
if self.entity_id_node_table.get(parent_entity_id):
parent_node = self.entity_id_node_table[parent_entity_id]
parent_node.add_child(entity_node)
entity_node.add_parent(parent_node)
elif parent_entity_id in self._previously_skipped_srns:
# add to the common root for all invalid entity nodes. Will be marked as invalid
# later in _find_invalid_nodes step in __init__.
self._invalid_entities_parent.add_child(entity)
self._invalid_entities_parent.add_child(entity_node)
else:
# if entity srn has been never presented in manifest
parent_node = self.external_srn_node_table.get(parent_srn)
parent_node = self.external_entity_id_node_table.get(parent_entity_id)
if not parent_node:
parent_node = self._create_external_entity_node(parent_srn)
self.external_srn_node_table[parent_srn] = parent_node
parent_node.add_child(entity)
entity.add_parent(parent_node)
parent_node = self._create_external_entity_node(parent_entity_id)
self.external_entity_id_node_table[parent_entity_id] = parent_node
parent_node.add_child(entity_node)
entity_node.add_parent(parent_node)
def _find_invalid_nodes(self, start_node: EntityNode = None):
"""
......@@ -282,15 +292,15 @@ class ManifestAnalyzer:
Create a queue, where a child entity goes after all its parents.
If an entity is marked as unprocessed, then skip it.
"""
entity_graph = {entity: entity.parents for entity in self.srn_node_table.values()}
entity_graph = {entity_node: entity_node.parents for entity_node in self.entity_id_node_table.values()}
logger.debug(f"Entity graph {entity_graph}.")
entity_queue = toposort.toposort_flatten(entity_graph, sort=False)
for entity in entity_queue:
if entity not in self._invalid_entities_nodes and not entity.is_external_srn:
yield entity
for entity_node in entity_queue:
if entity_node not in self._invalid_entities_nodes and not entity_node.is_external_srn:
yield entity_node
for entity in self._invalid_entities_nodes:
entity.replace_parents_surrogate_srns()
for entity_node in self._invalid_entities_nodes:
entity_node.replace_parents_surrogate_srns()
def entity_generation_queue(self) -> Iterator[Set[EntityNode]]:
"""
......@@ -298,38 +308,38 @@ class ManifestAnalyzer:
Generations of parents are followed by generations of children.
"""
entity_graph = {entity: entity.parents for entity in self.srn_node_table.values()}
entity_graph = {entity: entity.parents for entity in self.entity_id_node_table.values()}
logger.debug(f"Entity graph {entity_graph}.")
toposorted_entities = toposort.toposort(entity_graph)
for entity_set in toposorted_entities:
toposorted_entities_nodes = toposort.toposort(entity_graph)
for entity_set in toposorted_entities_nodes:
valid_entities = {entity for entity in entity_set
if entity not in self._invalid_entities_nodes and not entity.is_external_srn}
yield valid_entities
def add_invalid_node(self, entity: EntityNode):
def add_invalid_node(self, entity_node: EntityNode):
"""
Use if there some problems with ingesting or finding entity.
Mark it and its dependants as unprocessed.
"""
self._invalid_entities_parent.add_child(entity)
self._find_invalid_nodes(entity)
self._invalid_entities_parent.add_child(entity_node)
self._find_invalid_nodes(entity_node)
@property
def invalid_entities_info(self) -> List[ManifestEntity]:
"""
:return: List of invalid entities info.
"""
return [entity_node.entity_info for entity_node in self.srn_node_table.values()
return [entity_node.entity_info for entity_node in self.entity_id_node_table.values()
if entity_node.is_invalid]
@property
def invalid_entity_nodes(self) -> List[EntityNode]:
return [entity_node for entity_node in self.srn_node_table.values() if entity_node.is_invalid]
return [entity_node for entity_node in self.entity_id_node_table.values() if entity_node.is_invalid]
@property
def valid_entities_info(self) -> List[ManifestEntity]:
"""
:return: List of valid entities info.
"""
return [entity_node.entity_info for entity_node in self.srn_node_table.values()
return [entity_node.entity_info for entity_node in self.entity_id_node_table.values()
if not entity_node.is_invalid]
......@@ -22,18 +22,25 @@ from typing import Any, Generator, Iterable, List, TypeVar
BatchElement = TypeVar("BatchElement")
@dataclasses.dataclass
@dataclasses.dataclass()
class EntityId:
id: str
raw_value: str
version: str = ""
@property
def srn(self):
return f"{self.id}:{self.version}"
def srn(self) -> str:
if self.version:
return f"{self.id}:{self.version}"
else:
return self.id
def __hash__(self):
def __hash__(self) -> int:
return hash(self.srn)
def __eq__(self, other: "EntityId") -> bool:
return self.srn == self.srn
def remove_trailing_colon(id_value: str) -> str:
"""
......@@ -52,7 +59,9 @@ def split_id(id_value: str) -> EntityId:
:id_value: ID of some entity with or without versions.
"""
version = ""
if id_value.endswith(":"):
if is_surrogate_key(id_value):
_id = id_value
elif id_value.endswith(":"):
_id = id_value[:-1]
elif id_value.split(":")[-1].isdigit():
version = str(id_value.split(":")[-1])
......@@ -60,7 +69,7 @@ def split_id(id_value: str) -> EntityId:
else:
_id = id_value
return EntityId(_id, version)
return EntityId(_id, raw_value=id_value, version=version)
def create_skipped_entity_info(entity: Any, reason: str) -> dict:
......
......@@ -97,7 +97,7 @@ class ManifestIntegrity:
search_handler = ExtendedSearchId(self.search_url, external_references_without_version,
self.token_refresher, self.context)
found_ids = search_handler.search_records()
missing_ids.update(self._filter_not_found_ids(external_references, found_ids))
missing_ids.update(self._filter_not_found_ids(ids_batch, found_ids))
return {missing_id.srn for missing_id in missing_ids}
......@@ -170,7 +170,7 @@ class ManifestIntegrity:
"""
for missing_id in missing_external_ids:
missing_id = remove_trailing_colon(missing_id)
missing_entity = manifest_analyzer.external_srn_node_table[missing_id]
missing_entity = manifest_analyzer.external_entity_id_node_table[missing_id]
manifest_analyzer.add_invalid_node(missing_entity)
def _ensure_external_references_integrity(self, manifest_analyzer: ManifestAnalyzer):
......@@ -184,11 +184,10 @@ class ManifestIntegrity:
"""
missing_external_ids = set()
external_references = [entity_node.srn for entity_node in
manifest_analyzer.external_srn_node_table.values()]
if external_references:
external_ids_to_search = [split_id(record_id) for record_id in external_references]
missing_external_ids.update(self._find_missing_external_ids(external_ids_to_search))
external_references_entity_id = [entity_node.entity_id for entity_node in
manifest_analyzer.external_entity_id_node_table.values()]
if external_references_entity_id:
missing_external_ids.update(self._find_missing_external_ids(external_references_entity_id))
if missing_external_ids:
self._mark_dependant_entities_invalid(manifest_analyzer, missing_external_ids)
......
......@@ -26,6 +26,7 @@ from file_paths import SURROGATE_MANIFEST_WELLBORE, REF_RESULT_WHITELIST_WELLLOG
MANIFEST_WELLLOG_PATH, MANIFEST_REFERENCE_PATTERNS_WHITELIST
from osdu_ingestion.libs.manifest_analyzer import ManifestAnalyzer, EntityNode
from osdu_ingestion.libs.linearize_manifest import ManifestEntity, ManifestLinearizer
from osdu_ingestion.libs.utils import EntityId, split_id
logger = logging.getLogger()
......@@ -78,7 +79,7 @@ class TestManifestAnalyzer(object):
@pytest.fixture()
def fake_data_manifest_analyzer(self, monkeypatch, data):
monkeypatch.setattr(EntityNode, "get_parent_srns", self.mock_get_parent_srns)
monkeypatch.setattr(EntityNode, "get_parent_entity_ids", self.mock_get_parent_srns)
manifest_analyzer = ManifestAnalyzer(data)
return manifest_analyzer
......@@ -94,12 +95,12 @@ class TestManifestAnalyzer(object):
queue: list,
srn: str
):
entity_node = manifest_analyzer.srn_node_table[srn]
entity_node = manifest_analyzer.entity_id_node_table[split_id(srn)]
return queue.index(entity_node)
@staticmethod
def mock_get_parent_srns(obj: EntityNode):
parent_srns = set(obj.data.get("parents", []))
parent_srns = set(split_id(p) for p in obj.data.get("parents", []))
return parent_srns
@pytest.mark.parametrize(
......@@ -118,7 +119,7 @@ class TestManifestAnalyzer(object):
Check if queue return parents, then and only then their children.
Check if there is no orphaned and their children in the queue (SRN 7 and SRN 9).
"""
monkeypatch.setattr(EntityNode, "get_parent_srns", self.mock_get_parent_srns)
monkeypatch.setattr(EntityNode, "get_parent_entity_ids", self.mock_get_parent_srns)
manifest_analyzer = ManifestAnalyzer(data, {"6"})
queue = list(manifest_analyzer.entity_queue())
index_in_queue = partial(self.index_in_queue_by_srn, manifest_analyzer, queue)
......@@ -130,7 +131,7 @@ class TestManifestAnalyzer(object):
# check if orphans and their dependants are not in ingestion queue.
for unprocessed_srn in ("7", "9"):
unprocessed_entity = manifest_analyzer.srn_node_table[unprocessed_srn]
unprocessed_entity = manifest_analyzer.entity_id_node_table[split_id(unprocessed_srn)]
assert unprocessed_entity not in queue, \
f"{unprocessed_entity} expected not to be in queue: {queue}"
......@@ -155,7 +156,7 @@ class TestManifestAnalyzer(object):
"""
queue = fake_data_manifest_analyzer.entity_queue()
unprocessed_node = fake_data_manifest_analyzer.srn_node_table["3"]
unprocessed_node = fake_data_manifest_analyzer.entity_id_node_table[split_id("3")]
expected_unprocessed_entities = {"7", "9", "3", "5"}
fake_data_manifest_analyzer.add_invalid_node(unprocessed_node)
for entity in queue:
......@@ -189,8 +190,8 @@ class TestManifestAnalyzer(object):
entity.replace_parents_surrogate_srns()
entity.system_srn = self.process_entity(entity)
logger.info(f"Processed entity {json.dumps(entity.data, indent=2)}")
assert "surrogate-key:wpc-1" in (e.entity_data.get("id") for e in manifest_analyzer.invalid_entities_info)
invalid_entities = [e.entity_data.get("id") for e in manifest_analyzer.invalid_entities_info]
assert "surrogate-key:wpc-1" in invalid_entities
@pytest.mark.parametrize(
"manifest,system_srn,expected_replaced_srns",
......@@ -227,7 +228,7 @@ class TestManifestAnalyzer(object):
)
]
)
def test_update_parent_srn_with_system_srn(self, manifest, system_srn, expected_replaced_srns):
def test_update_parent_entity_id_with_system_srn(self, manifest, system_srn, expected_replaced_srns):
data = [ManifestEntity(entity_data=e, manifest_path="") for e in manifest]
manifest_analyzer = ManifestAnalyzer(
data,
......@@ -263,12 +264,13 @@ class TestManifestAnalyzer(object):
manifest_path="Data.WorkProductComponents"
)
entity_node = EntityNode(
test_data.get("id", str(uuid.uuid4())),
split_id(test_data.get("id", str(uuid.uuid4()))),
entity_info,
whitelist_ref_patterns=whitelist_ref_patterns_str
)
entity_node_parents = entity_node.get_parent_srns()
assert entity_node_parents == set(expected_result), entity_node_parents.difference(set(expected_result))
entity_node_parents = entity_node.get_parent_entity_ids()
parent_srns = set(i.srn for i in entity_node_parents)
assert parent_srns == set(expected_result), parent_srns.difference(set(expected_result))
@pytest.mark.parametrize(
"conf_path,ref_result_file",
......@@ -294,7 +296,8 @@ class TestManifestAnalyzer(object):
linearized_manifest,
whitelist_ref_patterns=whitelist_ref_patterns_str
)
wpc_parents = {p.srn for p in manifest_analyzer.srn_node_table[tested_wpc["id"]].parents}
tested_wpc_entity_id = split_id(tested_wpc["id"])
wpc_parents = {p.srn for p in manifest_analyzer.entity_id_node_table[tested_wpc_entity_id].parents}
assert wpc_parents == set(expected_result)
@pytest.mark.parametrize(
......
......@@ -185,26 +185,26 @@ class TestIntegrityProvider:
[
pytest.param(
[
"osdu:reference-data--ResourceSecurityClassification:Public:",
"osdu:master-data--Organisation:HESS:",
"osdu:reference-data--ResourceSecurityClassification:Public:1",
"osdu:master-data--Organisation:HESS:1",
],
set(),
[
"osdu:reference-data--ResourceSecurityClassification:Public:",
"osdu:master-data--Organisation:HESS:",
"osdu:reference-data--ResourceSecurityClassification:Public:1",
"osdu:master-data--Organisation:HESS:1",
],
id="Empty search return"
),
pytest.param(
[
"osdu:reference-data--ResourceSecurityClassification:Public:",
"osdu:master-data--Organisation:HESS:123",
"osdu:master-data--Organisation:123",
],
{
"osdu:reference-data--ResourceSecurityClassification:Public",
"osdu:reference-data--ResourceSecurityClassification:Public:123",
"osdu:master-data--Organisation:HESS",
"osdu:master-data--Organisation:HESS:123",
"osdu:master-data--Organisation:123",
"osdu:master-data--Organisation:123:123",
},
set(),
id="Full search return"
......@@ -219,7 +219,7 @@ class TestIntegrityProvider:
"osdu:reference-data--ResourceSecurityClassification:Public",
},
[
"osdu:master-data--Organisation:HESS:",
"osdu:master-data--Organisation:HESS",
],
id="Partial search return."
)
......
......@@ -29,16 +29,16 @@ class TestUtil:
"_id,expected_id_version",
[
pytest.param(
"test:test:",
("test:test", ""),
"namespace:reference-data--UnitQuantity:1:",
("namespace:reference-data--UnitQuantity:1", ""),
id="Trailing colon"),
pytest.param(
"test:test",
("test:test", ""),
"namespace:reference-data--UnitQuantity:1a",
("namespace:reference-data--UnitQuantity:1a", ""),
id="With no colon"),
pytest.param(
"test:test:1",
("test:test", "1"),
"namespace:reference-data--UnitQuantity:1:1",
("namespace:reference-data--UnitQuantity:1", "1"),
id="With version")
]
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment