Skip to content
Snippets Groups Projects
Commit 43b80d2f authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'trusted-fixes-ids-refs' into 'master'

Fix for IDs refs. Updated Docker image link

See merge request !26
parents b93ff05f b1778896
No related branches found
No related tags found
1 merge request!26Fix for IDs refs. Updated Docker image link
Pipeline #30250 passed
......@@ -22,7 +22,7 @@ stages:
- deploy
pylint:
image: johnybear/osdu-airflow:airflow.1.10.14
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
stage: linters
allow_failure: true
script:
......@@ -32,7 +32,7 @@ pylint:
- exit ${EXIT_CODE}
isort:
image: johnybear/osdu-airflow:airflow.1.10.14
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
allow_failure: true
stage: linters
script:
......@@ -42,7 +42,7 @@ isort:
test_dags:
stage: test_dags
image: johnybear/osdu-airflow:airflow.1.10.14
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
script:
- chmod +x tests/test_dags.sh
- tests/./test_dags.sh || EXIT_CODE=$?
......@@ -55,7 +55,7 @@ test_dags:
unit_tests:
stage: unit_tests
image: johnybear/osdu-airflow:airflow.1.10.14
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
script:
- chmod +x tests/unit_tests.sh
- tests/./unit_tests.sh || EXIT_CODE=$?
......
......@@ -17,7 +17,7 @@
import json
import logging
from typing import List
from typing import List, Set
from typing import Tuple
import requests
......@@ -164,7 +164,7 @@ class ExtendedSearchId(SearchId):
def _make_post_request(self, headers: dict, request_body: dict) -> Response:
    """Send the record-ids query to the Search service endpoint.

    :param headers: HTTP headers (e.g. auth token) attached to the request
    :param request_body: query payload carrying the record ids to look up
    :return: the raw ``requests`` Response from ``self.search_url``
    """
    # NOTE(review): the payload is passed positionally as `data` (form-encoded),
    # not as `json=` — presumably the Search API accepts this; confirm whether
    # the service actually expects an application/json body.
    return requests.post(self.search_url, request_body, headers=headers)
def search_records(self) -> Tuple[List, List]:
def search_records(self) -> Set[str]:
"""
Send request with recordIds to Search service.
"""
......@@ -178,10 +178,5 @@ class ExtendedSearchId(SearchId):
raise ValueError(f"Got no totalCount field in Search service response. "
f"Response is {data}.")
response_records_ids = set(self._extract_id_from_response(data))
if set(self.record_ids).issubset(response_records_ids):
return self.record_ids, []
else:
found_ids = list(set(self.record_ids).intersection(response_records_ids))
missing_ids = list(set(self.record_ids).difference(response_records_ids))
logger.warning(f"The next ids are absent in the system: {missing_ids}")
return found_ids, missing_ids
return response_records_ids
return set()
......@@ -67,6 +67,18 @@ class DataIntegrityValidator:
logger.debug(f"ids_found: {ids_found}")
all_ids_set.update(ids_found)
def _remove_redundant_colon(self, ids: Iterable[str]) -> Iterable[str]:
"""
Remove symbol ':' from ids without versions
"""
cleaned_ids = []
for elem in ids:
if elem.endswith(":"):
cleaned_ids.append(elem[:-1])
else:
cleaned_ids.append(elem)
return set(cleaned_ids)
def _validate_wpcs_to_datasets(
self, work_product_components: Iterable[dict],
datasets: Iterable[dict]) -> Tuple[Iterable[dict], Iterable[dict]]:
......@@ -90,6 +102,8 @@ class DataIntegrityValidator:
filtered_wpcs = []
for wpc in work_product_components:
expected_datasets_ids = set(wpc["data"]["Datasets"])
expected_datasets_ids = self._remove_redundant_colon(expected_datasets_ids)
logger.debug(f"Expected datasets ids: {expected_datasets_ids}")
valid_datasets_ids = expected_datasets_ids.intersection(all_datasets_ids)
all_valid_datasets_ids.update(valid_datasets_ids)
diffs = expected_datasets_ids.symmetric_difference(valid_datasets_ids)
......@@ -125,6 +139,7 @@ class DataIntegrityValidator:
return {}
all_wpcs_ids = self._collect_ids_from_object_array(work_product_components)
expected_wpc_ids = set(work_product["data"]["Components"])
expected_wpc_ids = self._remove_redundant_colon(expected_wpc_ids)
self._update_ids_from_search(expected_wpc_ids, all_wpcs_ids)
diffs = expected_wpc_ids.symmetric_difference(all_wpcs_ids)
if not diffs:
......
......@@ -17,7 +17,9 @@ import copy
import json
import re
import logging
from typing import List, re as regex
from typing import List, re as regex, Optional
import dataclasses
from libs.context import Context
from libs.exceptions import EmptyManifestError
......@@ -26,6 +28,16 @@ from libs.search_record_ids import ExtendedSearchId
logger = logging.getLogger()
@dataclasses.dataclass()
class IdStructure:
    """Decomposed reference id: the bare id plus an optional version suffix.

    ``version`` is the empty string when the source value carried no
    trailing numeric version part.
    """
    # The dataclass-generated __eq__ already compares (id, version)
    # field-wise and correctly returns NotImplemented for foreign types,
    # so the hand-written __eq__ (which raised AttributeError on objects
    # without these fields) is removed as redundant.
    id: str = ""
    version: str = ""
class ManifestIntegrity(object):
"""Class to validate if parents reference and master data are exists and
remove non-valid entities to provide integrity
......@@ -81,17 +93,43 @@ class ManifestIntegrity(object):
entities_ids.append(elem["id"])
self.entities_ids = set(entities_ids)
def _remove_redundant_colon(self, ids: List[str]) -> List[str]:
def _decompose_ref_id(self, id_value: str) -> Optional[IdStructure]:
    """
    Split a referential field value into its id and version parts.

    Ex: {namespace}:reference-data--FacilityType:Well:12345 ->
        {
            "id": {namespace}:reference-data--FacilityType:Well,
            "version": 12345
        }
    """
    parts = IdStructure()
    if id_value.endswith(":"):
        # Dangling colon means "no version": keep everything before it.
        parts.id = id_value[:-1]
        return parts
    tail = id_value.rsplit(":", 1)[-1]
    if tail.isdigit():
        # Last colon-separated segment is numeric -> treat it as the version.
        parts.version = tail
        parts.id = id_value[:-(len(tail) + 1)]
    else:
        parts.id = id_value
    return parts
def _is_entity_present_in_manifest(self, id_value: str) -> bool:
"""check if entity with specified id value presents in the current processed manifest
"""
if id_value in self.entities_ids:
return True
return False
def _combine_ids_with_versions(self) -> List[str]:
"""
Remove symbol ':' from reference ids without versions
Util method for combining id and version values
"""
cleaned_ids = []
for elem in ids:
if elem.endswith(":"):
cleaned_ids.append(elem[:-1])
ids_with_version = []
for elem in self.ids_for_validation:
if elem.version:
ids_with_version.append(":".join([elem.id, elem.version]))
else:
cleaned_ids.append(elem)
return cleaned_ids
ids_with_version.append(elem.id)
return ids_with_version
def _extract_references(self, manifest: dict) -> List[str]:
"""
......@@ -103,23 +141,36 @@ class ManifestIntegrity(object):
ids_for_validation = []
for pattern in self.patterns:
ids_for_validation.extend(self._match_id_with_pattern(pattern, manifest_str))
ids_for_validation = list(set(ids_for_validation).difference(self.entities_ids))
cleaned_ids_for_validation = self._remove_redundant_colon(ids_for_validation)
logger.debug(f"Extracted reference ids: {cleaned_ids_for_validation}")
return cleaned_ids_for_validation
for id_for_validation in ids_for_validation:
decompose_id = None
if not self._is_entity_present_in_manifest(id_for_validation):
decompose_id = self._decompose_ref_id(id_for_validation)
if decompose_id and not self._is_entity_present_in_manifest(decompose_id.id):
self.ids_for_validation.append(decompose_id)
result = [elem.id for elem in self.ids_for_validation]
logger.debug(f"Extracted reference ids: {result}")
return result
def _validate_entity(self, manifest: dict) -> bool:
"""
Validate reference ids from manifest entity
"""
ids_for_validation = self._extract_references(manifest)
missing_ids = None
if ids_for_validation:
search_class = ExtendedSearchId(self.search_url, ids_for_validation, self.token_refresher, self.context)
found_ids, missing_ids = search_class.search_records()
if not missing_ids:
if not ids_for_validation:
return True
return False
search_handler = ExtendedSearchId(self.search_url, ids_for_validation, self.token_refresher, self.context)
returned_ids = search_handler.search_records()
ids_for_validation = self._combine_ids_with_versions()
if set(ids_for_validation).issubset(returned_ids):
return True
else:
found_ids = list(set(ids_for_validation).intersection(returned_ids))
missing_ids = list(set(ids_for_validation).difference(returned_ids))
logger.debug(f"The next ids was found: {found_ids}")
logger.warning(f"The next ids are absent in the system: {missing_ids}")
return False
def ensure_integrity(self, manifest_file: dict=None):
"""
......
{
"kind": "osdu:wks:Manifest:1.0.0",
"ReferenceData": [],
"MasterData": [],
"Data": {
"WorkProduct": {
"kind": "osdu:wks:work-product--WorkProduct:1.0.0",
"acl": {
"owners": [],
"viewers": []
},
"legal": {
"legaltags": [],
"otherRelevantDataCountries": []
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "69_D_CH_11",
"Description": "Document",
"Components": [
"surrogate-key:wpc-1:"
]
}
},
"WorkProductComponents": [
{
"id": "surrogate-key:wpc-1",
"kind": "osdu:wks:work-product-component--Document:1.0.0",
"acl": {
"owners": [],
"viewers": []
},
"legal": {
"legaltags": [],
"otherRelevantDataCountries": []
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "69_D_CH_11",
"Description": "Document",
"Datasets": [
"surrogate-key:file-1:"
]
}
}
],
"Datasets": [
{
"id": "surrogate-key:file-1",
"kind": "osdu:wks:dataset--File.Generic:1.0.0",
"acl": {
"owners": [],
"viewers": []
},
"legal": {
"legaltags": [],
"otherRelevantDataCountries": []
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"DatasetProperties": {
"FileSourceInfo": {
"FileSource": "/r1/data/provided/USGS_docs/69_D_CH_11.pdf",
"PreloadFilePath": "s3://osdu-seismic-test-data/r1/data/provided/USGS_docs/69_D_CH_11.pdf"
}
}
}
}
]
}
}
......@@ -66,6 +66,7 @@ DATA_INTEGRITY_EMPTY_DATA = f"{DATA_PATH_PREFIX}/data_integrity/empty_data.json"
DATA_INTEGRITY_EMPTY_DATA_CASE_2 = f"{DATA_PATH_PREFIX}/data_integrity/empty_data_inside.json"
DATA_INTEGRITY_EMPTY_WP = f"{DATA_PATH_PREFIX}/data_integrity/empty_wp.json"
DATA_INTEGRITY_VALID_REAL_IDS = f"{DATA_PATH_PREFIX}/data_integrity/valid_data_real_ids.json"
DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON = f"{DATA_PATH_PREFIX}/data_integrity/valid_data_ids_with_colon.json"
FILES_SOURCE_VALID = f"{DATA_PATH_PREFIX}/data_integrity/file_source/valid_files.json"
FILES_SOURCE_INVALID = f"{DATA_PATH_PREFIX}/data_integrity/file_source/invalid_files.json"
......
......@@ -26,7 +26,8 @@ import pytest_mock
from file_paths import (DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_ORPHAN_DATASETS,
DATA_INTEGRITY_VALID_WP_INVALID_WPC, DATA_INTEGRITY_INVALID_WP,
DATA_INTEGRITY_EMPTY_DATA, DATA_INTEGRITY_VALID_REAL_IDS,
DATA_INTEGRITY_EMPTY_DATA_CASE_2, DATA_INTEGRITY_EMPTY_WP)
DATA_INTEGRITY_EMPTY_DATA_CASE_2, DATA_INTEGRITY_EMPTY_WP,
DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON)
from libs.search_client import SearchClient, SearchResponse
from libs.validation.validate_data_integrity import DataIntegrityValidator
from libs.validation.validate_file_source import FileSourceValidator
......@@ -53,6 +54,7 @@ class TestDataIntegrityValidator:
pytest.param(DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_ORPHAN_DATASETS),
pytest.param(DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_VALID_WP_INVALID_WPC),
pytest.param(DATA_INTEGRITY_EMPTY_DATA, DATA_INTEGRITY_INVALID_WP),
pytest.param(DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON, DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON),
])
def test_validate_data_integrity(self, mocker: pytest_mock.MockerFixture, provide_manifests,
file_source_validator, expected_manifest_path: str,
......
......@@ -24,7 +24,7 @@ import pytest
from mock_providers import get_test_credentials
from file_paths import MANIFEST_WELL_PATH, REF_RESULT_WELL_PATH, MANIFEST_WELLLOG_PATH, REF_RESULT_WELLLOG_PATH
from libs.refresh_token import AirflowTokenRefresher
from libs.validation.validate_referential_integrity import ManifestIntegrity
from libs.validation.validate_referential_integrity import ManifestIntegrity, IdStructure
class TestIntegrityProvider:
......@@ -70,3 +70,34 @@ class TestIntegrityProvider:
manifest_records = manifest_integrity._extract_references(
manifest_integrity.context["Data"]["WorkProductComponents"][0])
assert set(manifest_records) == set(expected_result)
@pytest.mark.parametrize(
    "conf_path,raw_id,decompose_id",
    [
        pytest.param(
            MANIFEST_WELLLOG_PATH,
            "osdu:master-data--Wellbore:1013:",
            IdStructure(id="osdu:master-data--Wellbore:1013",
                        version=""),
            id="Extract id Wellbore"),
        pytest.param(
            MANIFEST_WELLLOG_PATH,
            "osdu:reference-data--FacilityType:Well:12345",
            IdStructure(id="osdu:reference-data--FacilityType:Well",
                        version="12345"),
            id="Extract id Facility Type"),
        pytest.param(
            MANIFEST_WELLLOG_PATH,
            "osdu:reference-data--FacilityType:Well:2:12345",
            IdStructure(id="osdu:reference-data--FacilityType:Well:2",
                        version="12345"),
            id="Extract id Facility Type with colon inside ID"),
    ]
)
def test_extract_references_wpc(self, monkeypatch,
                                manifest_integrity,
                                conf_path: str,
                                raw_id: str,
                                decompose_id: IdStructure):
    """_decompose_ref_id must split a raw reference value into id/version parts.

    The parameter was mis-annotated as ``dict``; the parametrized values are
    ``IdStructure`` instances, so the annotation is corrected accordingly.
    """
    extracted_id = manifest_integrity._decompose_ref_id(raw_id)
    assert extracted_id == decompose_id
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment