Commit 4417f2a8 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) 🚩
Browse files

Merge branch 'GONRG-2696_Manifest_integrity_batch_search' into 'master'

GONRG-2696: Manifest integrity batch search

See merge request !23
parents 3cb4c963 cf22b19e
Pipeline #60317 passed with stages
in 2 minutes and 20 seconds
......@@ -31,6 +31,9 @@ SURROGATE_KEYS_PATHS = [
("properties", "data", "allOf", 1, "properties", "Components", "items"),
]
SEARCH_ID_BATCH_SIZE = 25
DATA_SECTION = "Data"
DATASETS_SECTION = "Datasets"
MASTER_DATA_SECTION ="MasterData"
......
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
import dataclasses
from osdu_api.libs.constants import DATA_SECTION, DATASETS_SECTION, MASTER_DATA_SECTION, \
REFERENCE_DATA_SECTION, WORK_PRODUCT_SECTION, WORK_PRODUCT_COMPONENTS_SECTION
from osdu_api.libs.exceptions import EmptyManifestError
logger = logging.getLogger()
@dataclasses.dataclass()
class ManifestEntity:
"""
This a dataclass class to represent entities of linearized Manifest
Args:
entity_data: Content of entity
manifest_path: Path to the entity inside the manifest.
E.g. 'ReferenceData' or 'Data.WorkProduct'
"""
entity_data: dict
manifest_path: str
def __eq__(self, other: "ManifestEntity"):
return self.entity_data == other.entity_data \
and self.manifest_path == other.manifest_path
class ManifestLinearizer:
"""Class to linearize manifest and extract all manifest records"""
def _populate_manifest_entity(self, entity_data: dict, manifest_path: str):
"""
Populate manifest entity for future processing
:param entity_data: manifest entity instance (for future processing)
:param manifest_path: corresponding generic schema (for future schema validation)
:return:
"""
return ManifestEntity(entity_data=entity_data, manifest_path=manifest_path)
def _traverse_list(
self,
manifest_entities: List[dict],
manifest_path: str
) -> List[ManifestEntity]:
"""
Traverse list of entities and returned populated list of entities
"""
entities = []
for manifest_entity in manifest_entities:
entities.append(
self._populate_manifest_entity(manifest_entity, manifest_path)
)
return entities
def linearize_manifest(self, manifest: dict) -> List[ManifestEntity]:
"""
Traverse manifest structure and return the list of manifest records.
:param manifest: Manifest
:return: list of records
"""
if not manifest:
raise EmptyManifestError
manifest_entities = []
for section in (REFERENCE_DATA_SECTION, MASTER_DATA_SECTION):
if manifest.get(section):
manifest_entities.extend(
self._traverse_list(manifest[section], section)
)
if manifest.get(DATA_SECTION):
if manifest[DATA_SECTION].get(WORK_PRODUCT_SECTION):
manifest_entities.append(
self._populate_manifest_entity(
manifest[DATA_SECTION][WORK_PRODUCT_SECTION],
f"{DATA_SECTION}.{WORK_PRODUCT_SECTION}"
)
)
for section in (WORK_PRODUCT_COMPONENTS_SECTION, DATASETS_SECTION):
if manifest[DATA_SECTION].get(section):
manifest_entities.extend(
self._traverse_list(
manifest[DATA_SECTION][section],
f"{DATA_SECTION}.{section}",
)
)
return manifest_entities
def assemble_manifest(self, linearized_manifest: List[ManifestEntity],
manifest_kind: str = None) -> dict:
"""
Assemble Manifest from previously linearized one.
:param linearized_manifest:
:param manifest_kind:
:return:
"""
manifest = {
REFERENCE_DATA_SECTION: [],
MASTER_DATA_SECTION: [],
DATA_SECTION: {
DATASETS_SECTION: [],
WORK_PRODUCT_COMPONENTS_SECTION: [],
WORK_PRODUCT_SECTION: {}
}
}
if manifest_kind:
manifest["kind"] = manifest_kind
for entity_info in linearized_manifest:
manifest_path = entity_info.manifest_path.split(".")
if len(manifest_path) == 1:
manifest[manifest_path[0]].append(entity_info.entity_data)
elif len(manifest_path) == 2:
data_section, subsection = manifest_path
if subsection == WORK_PRODUCT_SECTION:
manifest[data_section][subsection] = entity_info.entity_data
else:
manifest[data_section][subsection].append(entity_info.entity_data)
else:
raise ValueError
return manifest
......@@ -17,13 +17,12 @@ import json
import logging
import re
from collections import deque
from typing import Iterable, Iterator, Set
from typing import Dict, Iterable, Iterator, List, Pattern, Set
from uuid import uuid4
import toposort
from osdu_api.libs.refresh_token import TokenRefresher
from osdu_api.libs.traverse_manifest import ManifestEntity
from osdu_api.libs.linearize_manifest import ManifestEntity
from osdu_api.libs.utils import remove_trailing_colon
logger = logging.getLogger()
......@@ -33,29 +32,48 @@ class EntityNode:
"""
This class represents entities and their links to parent and child ones.
"""
__slots__ = ["srn", "system_srn", "entity_info", "children", "parents", "unprocessed"]
__slots__ = [
"srn",
"system_srn",
"entity_info",
"children",
"parents",
"is_invalid",
"is_external_srn",
"_whitelist_ref_patterns"
]
SRN_REGEX = re.compile(
r"(?<=\")surrogate-key:[\w\-\.\d]+(?=\")|(?<=\")[\w\-\.]+:[\w\-\.]+--[\w\-\.]+:[\w\-\.\:]+:[0-9]*(?=\")")
r"(?<=\")surrogate-key:[\s\w\-\.\d]+(?=\")|(?<=\")[\w\-\.]+:[\w\-\.]+--[\w\-\.]+:.[^,;\"]+(?=\")")
def __init__(self, srn, entity_info: ManifestEntity):
def __init__(
self, srn,
entity_info: ManifestEntity,
is_external_srn: bool = False,
whitelist_ref_patterns: str = None
):
self.srn = srn
self.entity_info = entity_info
self.system_srn = None
self.children = set()
self.parents = set()
self.unprocessed = False
self.is_invalid = False
self.is_external_srn = is_external_srn
self._whitelist_ref_patterns = self._compile_whitelist_ref_patterns(whitelist_ref_patterns)
def __repr__(self):
return f"SRN: {self.srn}"
@property
def content(self) -> dict:
return self.entity_info.entity
def data(self) -> dict:
if self.entity_info:
return self.entity_info.entity_data
else:
return {}
@content.setter
def content(self, value: dict):
self.entity_info.entity = value
@data.setter
def data(self, value: dict):
self.entity_info.entity_data = value
def add_child(self, child_node: "EntityNode"):
self.children.add(child_node)
......@@ -63,35 +81,92 @@ class EntityNode:
def add_parent(self, parent_node: "EntityNode"):
self.parents.add(parent_node)
def get_parent_srns(self, original_manifest_entities: set) -> Set[str]:
def _compile_whitelist_ref_patterns(self, whitelist_ref_patterns: str) -> List[Pattern]:
"""
Trying to parse whitelist reference regexp patterns from string into list of regexp compiled patterns
:param whitelist_ref_patterns: string containing various whitelist reference regexp patterns prepared for compilation
:return: list of regexp compiled patterns or nothing
"""
if not whitelist_ref_patterns:
return []
try:
logger.debug(whitelist_ref_patterns)
whitelist_ref_patterns = whitelist_ref_patterns.replace('\r\n', '\n').strip().split('\n')
return [
re.compile(r"{}".format(pattern), re.I + re.M)
for pattern in whitelist_ref_patterns
]
except Exception as e:
logger.error(f"Unable to init whitelist reference patterns: {whitelist_ref_patterns}",
exc_info=e)
return []
def _extract_whitelist_references(self) -> Set[str]:
"""
Extract whitelisted references from the entity.
:return: Set of whitelisted ids to other entities or records.
"""
manifest_str = json.dumps(self.data)
whitelist_references = set()
for pattern in self._whitelist_ref_patterns:
whitelist_references.update(
{match.get('value') for match in self._match_id_with_whitelist_pattern(pattern, manifest_str)}
)
logger.debug(f"Whitelist references of {self.data.get('id')}: {whitelist_references}")
return whitelist_references
def _match_id_with_whitelist_pattern(
self,
pattern: Pattern,
source: str
) -> List[Dict[str, str]]:
"""
Expects whitelist pattern containing (key) and (value) regexp groups
:param pattern: compiled regexp whitelist pattern
:param source: source to search with pattern
:return: pattern matches filtered by groups
"""
return [match.groupdict() for match in pattern.finditer(source)]
def get_parent_srns(self) -> Set[str]:
"""
Get list of parents' srns.
They may look like bare id or like id with version.
"""
entity_content = json.dumps(self.content, separators=(",", ":"))
whitelist_references = set()
entity_data = json.dumps(self.data, separators=(",", ":"))
if self._whitelist_ref_patterns:
whitelist_references = self._extract_whitelist_references()
parent_srns = set(
remove_trailing_colon(reference) for reference in self.SRN_REGEX.findall(entity_content)
remove_trailing_colon(reference) for reference in self.SRN_REGEX.findall(entity_data)
if reference not in whitelist_references
)
parent_srns.discard(self.srn)
# Get only srns that supposed-to-be-present in the Manifest.
parent_srns = parent_srns.intersection(original_manifest_entities)
logger.debug(f"Parent srns of entity '{self}': '{parent_srns}'")
return parent_srns
def replace_parents_surrogate_srns(self):
"""
Replace surrogate parents' keys with system-generated ones in entity.
Replace surrogate parents' keys with system-generated ones in child entity.
"""
if not self.parents:
return
content = json.dumps(self.content)
content = json.dumps(self.data)
for parent in self.parents:
if parent.system_srn:
# ':' at the end is for showing that it is reference if parent srn was surrogate-key.
if "surrogate-key" in parent.srn:
# ':' at the end is for showing that it is reference if parent srn was surrogate-key.
content = content.replace(parent.srn, f"{parent.system_srn}:")
else:
content = content.replace(parent.srn, f"{parent.system_srn}")
self.content = json.loads(content)
self.data = json.loads(content)
@property
def invalid_parents(self) -> Set["EntityNode"]:
return {parent for parent in self.parents if parent.is_invalid}
class ManifestAnalyzer:
......@@ -100,51 +175,49 @@ class ManifestAnalyzer:
another one, so we must prioritize the order of ingesting. The idea is to create a dependency
graph and traverse it to get the right order of ingestion.
At the current implementation this class can check referential integrity only inside the manifest,
because we assume that external references were checked during previous steps.
The Analyzer traverses each entity, each entity can contain references to other ones.
We are interested only in references to entities that are supposed to be in the manifest skipping
external ones.
If they got absent in the Manifest because of previous
validations, we mark the entity as orphaned and don't process it and its children then.
The flow of prioritizing entities could be described as:
1. Fill graph's nodes with entities (self._fill_srn_node_table())
2. Create links between nodes (self._fill_nodes_edges())
3. Mark unprocessed nodes if they are orphaned or dependant on orphaned nodes (self._find_unprocessed_nodes())
3. Mark unprocessed nodes if they are orphaned or dependant on orphaned nodes (self._find_invalid_nodes())
4. Return prioritized queue for ingesting (self.entity_queue())
"""
def __init__(
self,
entities: Iterable[ManifestEntity],
token_refresher: TokenRefresher,
original_manifest_entities_ids: set = None
previously_skipped_srns: set = None,
whitelist_ref_patterns: str = None
):
self.entities = entities
self.token_refresher = token_refresher
self.original_manifest_entities_ids = original_manifest_entities_ids or set()
self.srn_node_table = dict()
self.processed_entities = []
self.external_srn_node_table = dict()
self._invalid_entities_nodes = set()
self._previously_skipped_srns = previously_skipped_srns or {}
self._whitelist_ref_patterns = whitelist_ref_patterns
# used as a root for all orphan entities
empty_entity_info = ManifestEntity("", {})
self.unprocessed_entities_parent = EntityNode(srn=str(uuid4()),
entity_info=empty_entity_info)
self.unprocessed_entities = set()
empty_entity_info = ManifestEntity({}, "")
self._invalid_entities_parent = EntityNode(srn=str(uuid4()),
entity_info=empty_entity_info)
self._fill_srn_node_table()
self._fill_nodes_parents()
self._find_unprocessed_nodes()
self._find_invalid_nodes()
def _create_entity_node(self, entity: ManifestEntity):
srn = entity.entity.get("id", f"surrogate-key:{str(uuid4())}")
self.srn_node_table[srn] = EntityNode(srn, entity)
srn = entity.entity_data.get("id", f"surrogate-key:{str(uuid4())}")
self.srn_node_table[srn] = EntityNode(
srn,
entity,
whitelist_ref_patterns=self._whitelist_ref_patterns
)
def _fill_srn_node_table(self):
for entity in self.entities:
self._create_entity_node(entity)
self.original_manifest_entities_ids.update(self.srn_node_table.keys())
def _fill_nodes_parents(self):
"""
......@@ -153,37 +226,56 @@ class ManifestAnalyzer:
for entity_node in self.srn_node_table.values():
self._set_entity_parents(entity_node)
def _create_external_entity_node(self, parent_srn: str) -> EntityNode:
"""
Create a node with no content and mark it as external.
:param parent_srn: Parent SRN
:return:
"""
return EntityNode(parent_srn, None, is_external_srn=True)
def _set_entity_parents(self, entity: EntityNode):
"""
Find all references parent in entity's content.
If a parent is not presented in manifest, mark this entity as unprocessed.
"""
parent_srns = entity.get_parent_srns(self.original_manifest_entities_ids)
parent_srns = entity.get_parent_srns()
for parent_srn in parent_srns:
if self.srn_node_table.get(parent_srn):
parent_node = self.srn_node_table[parent_srn]
parent_node.add_child(entity)
entity.add_parent(parent_node)
else: # if entity refers to srn not presenting in manifest
self.unprocessed_entities_parent.add_child(entity)
logger.info(f"'{entity}' is orphaned. Missing parent '{parent_srn}'")
elif parent_srn in self._previously_skipped_srns:
# add to the common root for all invalid entity nodes. Will be marked as invalid
# later in _find_invalid_nodes step in __init__.
self._invalid_entities_parent.add_child(entity)
else:
# if entity srn has been never presented in manifest
parent_node = self.external_srn_node_table.get(parent_srn)
if not parent_node:
parent_node = self._create_external_entity_node(parent_srn)
self.external_srn_node_table[parent_srn] = parent_node
parent_node.add_child(entity)
entity.add_parent(parent_node)
def _find_unprocessed_nodes(self):
def _find_invalid_nodes(self, start_node: EntityNode = None):
"""
Traverse entities dependant on orphaned or invalid ones.
Add them to set of unprocessed nodes to exclude them from ingestion queue.
"""
queue = deque()
queue.append(self.unprocessed_entities_parent)
queue.append(start_node or self._invalid_entities_parent)
while queue:
node = queue.popleft()
self.unprocessed_entities.add(node)
logger.debug(f"Node {node} added to unprocessed.")
self._invalid_entities_nodes.add(node)
node.is_invalid = True
for child in node.children:
if not child.unprocessed:
child.unprocessed = True
if not child.is_invalid:
child.is_invalid = True
queue.append(child)
self.unprocessed_entities.discard(self.unprocessed_entities_parent)
if start_node is self._invalid_entities_parent:
self._invalid_entities_nodes.discard(self._invalid_entities_parent)
def entity_queue(self) -> Iterator[EntityNode]:
"""
......@@ -194,20 +286,36 @@ class ManifestAnalyzer:
logger.debug(f"Entity graph {entity_graph}.")
entity_queue = toposort.toposort_flatten(entity_graph, sort=False)
for entity in entity_queue:
if entity not in self.unprocessed_entities:
self.processed_entities.append(entity)
if entity not in self._invalid_entities_nodes and not entity.is_external_srn:
yield entity
for entity in self.unprocessed_entities:
for entity in self._invalid_entities_nodes:
entity.replace_parents_surrogate_srns()
logger.debug(f"Visited entities {self.processed_entities}")
logger.debug(f"Unprocessed entities {self.unprocessed_entities}")
def add_unprocessed_entity(self, entity: EntityNode):
def add_invalid_node(self, entity: EntityNode):
"""
Use if there some problems with ingesting entity.
Use if there some problems with ingesting or finding entity.
Mark it and its dependants as unprocessed.
"""
self.unprocessed_entities_parent.add_child(entity)
self._find_unprocessed_nodes()
self._invalid_entities_parent.add_child(entity)
self._find_invalid_nodes(entity)
@property
def invalid_entities_info(self) -> List[ManifestEntity]:
"""
:return: List of invalid entities info.
"""
return [entity_node.entity_info for entity_node in self.srn_node_table.values()
if entity_node.is_invalid]
@property
def invalid_entity_nodes(self) -> List[EntityNode]:
return [entity_node for entity_node in self.srn_node_table.values() if entity_node.is_invalid]
@property
def valid_entities_info(self) -> List[ManifestEntity]:
"""
:return: List of valid entities info.
"""
return [entity_node.entity_info for entity_node in self.srn_node_table.values()
if not entity_node.is_invalid]
......@@ -29,7 +29,7 @@ from osdu_api.libs.exceptions import EmptyManifestError
from osdu_api.libs.handle_file import FileHandler
from osdu_api.libs.mixins import HeadersMixin
from osdu_api.libs.source_file_check import SourceFileChecker
from osdu_api.libs.traverse_manifest import ManifestEntity
from osdu_api.libs.linearize_manifest import ManifestEntity
RETRY_SETTINGS = {
"stop": tenacity.stop_after_attempt(RETRIES),
......@@ -178,7 +178,7 @@ class ManifestProcessor(HeadersMixin):
raise EmptyManifestError
for manifest_record in manifest_records:
populated_manifest_records.append(
self._delete_surrogate_key(manifest_record.entity))
self._delete_surrogate_key(manifest_record.entity_data))
save_manifests_response = self.save_record_to_storage(
self.request_headers, request_data=populated_manifest_records)
record_ids.extend(save_manifests_response.json()["recordIds"])
......
......@@ -27,7 +27,7 @@ from osdu_api.libs.exceptions import ProcessRecordError
from osdu_api.libs.manifest_analyzer import ManifestAnalyzer, EntityNode
from osdu_api.libs.process_manifest_r3 import ManifestProcessor
from osdu_api.libs.refresh_token import TokenRefresher
from osdu_api.libs.traverse_manifest import ManifestTraversal
from osdu_api.libs.linearize_manifest import ManifestLinearizer
from osdu_api.libs.validation.validate_referential_integrity import ManifestIntegrity
from osdu_api.libs.validation.validate_schema import SchemaValidator
......@@ -69,7 +69,7 @@ class SingleManifestProcessor:
)[FIRST_STORED_RECORD_INDEX]
entity_node.system_srn = record_id
except Exception as e:
raise ProcessRecordError(entity_node.entity_info.entity, f"{e}"[:128])
raise ProcessRecordError(entity_node.entity_info.entity_data, f"{e}"[:128])
return record_id
def _process_records(self, manifest_analyzer: ManifestAnalyzer) -> Tuple[List[str], List[dict]]:
......@@ -88,7 +88,7 @@ class SingleManifestProcessor:
record_ids.append(self._process_record(entity))
except ProcessRecordError as error:
logger.warning(f"Can't process entity {entity}")
manifest_analyzer.add_unprocessed_entity(entity)
manifest_analyzer.add_invalid_node(entity)
skipped_ids.append(error.skipped_entity)
return record_ids, skipped_ids
......@@ -111,7 +111,6 @@ class SingleManifestProcessor:
:return: List of record ids and list of skipped entities.
"""
skipped_ids = []
manifest_schema = self.schema_validator.validate_common_schema(manifest)
if with_validation:
manifest, not_valid_entities = \
......@@ -123,17 +122,12 @@ class SingleManifestProcessor:
skipped_ids.extend(not_valid_entities)
skipped_ids.extend(orphaned_entities)
traversal = ManifestTraversal(manifest, manifest_schema)
manifest_entities = traversal.traverse_manifest()
# This set is used inside manifest_analyzer to know what entities must be before schema validation and integrity check.
# Used to detect orphaned entities in manifest_analyzer
original_manifest_entities_ids = set(
entity.entity["id"] for entity in manifest_entities if entity.entity.get("id")
)
traversal = ManifestLinearizer()
manifest_entities = traversal.linearize_manifest(manifest)
manifest_analyzer = ManifestAnalyzer(
manifest_entities,
self.token_refresher,
original_manifest_entities_ids