diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index cf6b94ecde861b007c0ddffc934b9c7709194cbc..8c93fabe8857a937d48968f7cf07ede0c12f0862 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -22,7 +22,7 @@ stages:
   - deploy
 
 pylint:
-  image: johnybear/osdu-airflow:airflow.1.10.14
+  image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
   stage: linters
   allow_failure: true
   script:
@@ -32,7 +32,7 @@ pylint:
   - exit ${EXIT_CODE}
 
 isort:
-  image: johnybear/osdu-airflow:airflow.1.10.14
+  image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
   allow_failure: true
   stage: linters
   script:
@@ -42,7 +42,7 @@ isort:
 
 test_dags:
   stage: test_dags
-  image: johnybear/osdu-airflow:airflow.1.10.14
+  image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
   script:
   - chmod +x tests/test_dags.sh
   - tests/./test_dags.sh || EXIT_CODE=$?
@@ -55,7 +55,7 @@ test_dags:
 
 unit_tests:
   stage: unit_tests
-  image: johnybear/osdu-airflow:airflow.1.10.14
+  image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
   script:
   - chmod +x tests/unit_tests.sh
   - tests/./unit_tests.sh || EXIT_CODE=$?
diff --git a/src/dags/libs/search_record_ids.py b/src/dags/libs/search_record_ids.py
index 7258c9a65ada27e9a31d9b9b452f529cc5716afb..935bbcbb6d6b90df3c35209b89fb08197fa06bbe 100644
--- a/src/dags/libs/search_record_ids.py
+++ b/src/dags/libs/search_record_ids.py
@@ -17,7 +17,7 @@
 import json
 import logging
 
-from typing import List
+from typing import List, Set
 from typing import Tuple
 
 import requests
@@ -164,7 +164,7 @@ class ExtendedSearchId(SearchId):
     def _make_post_request(self, headers: dict, request_body: dict) -> Response:
         return requests.post(self.search_url, request_body, headers=headers)
 
-    def search_records(self) -> Tuple[List, List]:
+    def search_records(self) -> Set[str]:
         """
         Send request with recordIds to Search service.
         """
@@ -178,10 +178,5 @@
                 raise ValueError(f"Got no totalCount field in Search service response. "
" f"Response is {data}.") response_records_ids = set(self._extract_id_from_response(data)) - if set(self.record_ids).issubset(response_records_ids): - return self.record_ids, [] - else: - found_ids = list(set(self.record_ids).intersection(response_records_ids)) - missing_ids = list(set(self.record_ids).difference(response_records_ids)) - logger.warning(f"The next ids are absent in the system: {missing_ids}") - return found_ids, missing_ids + return response_records_ids + return set() diff --git a/src/dags/libs/validation/validate_data_integrity.py b/src/dags/libs/validation/validate_data_integrity.py index 9261f11dcd4d4d3cf2729bbc971ffe676641aa6a..96bd07f55795fc120a97cb657c6736a9ad14c61c 100644 --- a/src/dags/libs/validation/validate_data_integrity.py +++ b/src/dags/libs/validation/validate_data_integrity.py @@ -67,6 +67,18 @@ class DataIntegrityValidator: logger.debug(f"ids_found: {ids_found}") all_ids_set.update(ids_found) + def _remove_redundant_colon(self, ids: Iterable[str]) -> Iterable[str]: + """ + Remove symbol ':' from ids without versions + """ + cleaned_ids = [] + for elem in ids: + if elem.endswith(":"): + cleaned_ids.append(elem[:-1]) + else: + cleaned_ids.append(elem) + return set(cleaned_ids) + def _validate_wpcs_to_datasets( self, work_product_components: Iterable[dict], datasets: Iterable[dict]) -> Tuple[Iterable[dict], Iterable[dict]]: @@ -90,6 +102,8 @@ class DataIntegrityValidator: filtered_wpcs = [] for wpc in work_product_components: expected_datasets_ids = set(wpc["data"]["Datasets"]) + expected_datasets_ids = self._remove_redundant_colon(expected_datasets_ids) + logger.debug(f"Expected datasets ids: {expected_datasets_ids}") valid_datasets_ids = expected_datasets_ids.intersection(all_datasets_ids) all_valid_datasets_ids.update(valid_datasets_ids) diffs = expected_datasets_ids.symmetric_difference(valid_datasets_ids) @@ -125,6 +139,7 @@ class DataIntegrityValidator: return {} all_wpcs_ids = self._collect_ids_from_object_array(work_product_components) expected_wpc_ids = set(work_product["data"]["Components"]) + expected_wpc_ids = self._remove_redundant_colon(expected_wpc_ids) self._update_ids_from_search(expected_wpc_ids, all_wpcs_ids) diffs = expected_wpc_ids.symmetric_difference(all_wpcs_ids) if not diffs: diff --git a/src/dags/libs/validation/validate_referential_integrity.py b/src/dags/libs/validation/validate_referential_integrity.py index 1c03a29dfc7bc5973538a0bc584bcb2ffb01e9c0..a6ce079d3a85910d94fcf16a8277493c10cb3d78 100644 --- a/src/dags/libs/validation/validate_referential_integrity.py +++ b/src/dags/libs/validation/validate_referential_integrity.py @@ -17,7 +17,9 @@ import copy import json import re import logging -from typing import List, re as regex +from typing import List, re as regex, Optional + +import dataclasses from libs.context import Context from libs.exceptions import EmptyManifestError @@ -26,6 +28,16 @@ from libs.search_record_ids import ExtendedSearchId logger = logging.getLogger() +@dataclasses.dataclass() +class IdStructure: + id: str = "" + version: str = "" + + def __eq__(self, other: "IdStructure"): + return self.id == other.id\ + and self.version == other.version + + class ManifestIntegrity(object): """Class to validate if parents reference and master data are exists and remove non-valid entities to provide integrity @@ -81,17 +93,43 @@ class ManifestIntegrity(object): entities_ids.append(elem["id"]) self.entities_ids = set(entities_ids) - def _remove_redundant_colon(self, ids: List[str]) -> List[str]: + def _decompose_ref_id(self, 
+    def _decompose_ref_id(self, id_value: str) -> Optional[IdStructure]:
+        """
+        Utility method for separating the id and version parts of a reference field value.
+        Ex: {namespace}:reference-data--FacilityType:Well:12345 ->
+            {
+                "id": {namespace}:reference-data--FacilityType:Well,
+                "version": 12345
+            }
+        """
+        separate_id = IdStructure()
+        if id_value.endswith(":"):
+            separate_id.id = id_value[:-1]
+        elif id_value.split(":")[-1].isdigit():
+            separate_id.version = id_value.split(":")[-1]
+            separate_id.id = id_value[:-len(separate_id.version) - 1]
+        else:
+            separate_id.id = id_value
+        return separate_id
+
+    def _is_entity_present_in_manifest(self, id_value: str) -> bool:
+        """Check if an entity with the specified id value is present in the currently processed manifest.
+        """
+        if id_value in self.entities_ids:
+            return True
+        return False
+
+    def _combine_ids_with_versions(self) -> List[str]:
         """
-        Remove symbol ':' from reference ids without versions
+        Utility method for combining id and version values
         """
-        cleaned_ids = []
-        for elem in ids:
-            if elem.endswith(":"):
-                cleaned_ids.append(elem[:-1])
+        ids_with_version = []
+        for elem in self.ids_for_validation:
+            if elem.version:
+                ids_with_version.append(":".join([elem.id, elem.version]))
             else:
-                cleaned_ids.append(elem)
-        return cleaned_ids
+                ids_with_version.append(elem.id)
+        return ids_with_version
 
     def _extract_references(self, manifest: dict) -> List[str]:
         """
@@ -103,23 +141,36 @@
         ids_for_validation = []
         for pattern in self.patterns:
             ids_for_validation.extend(self._match_id_with_pattern(pattern, manifest_str))
-        ids_for_validation = list(set(ids_for_validation).difference(self.entities_ids))
-        cleaned_ids_for_validation = self._remove_redundant_colon(ids_for_validation)
-        logger.debug(f"Extracted reference ids: {cleaned_ids_for_validation}")
-        return cleaned_ids_for_validation
+        for id_for_validation in ids_for_validation:
+            decompose_id = None
+            if not self._is_entity_present_in_manifest(id_for_validation):
+                decompose_id = self._decompose_ref_id(id_for_validation)
+            if decompose_id and not self._is_entity_present_in_manifest(decompose_id.id):
+                self.ids_for_validation.append(decompose_id)
+        result = [elem.id for elem in self.ids_for_validation]
+        logger.debug(f"Extracted reference ids: {result}")
+        return result
 
     def _validate_entity(self, manifest: dict) -> bool:
         """
         Validate reference ids from manifest entity
         """
         ids_for_validation = self._extract_references(manifest)
-        missing_ids = None
-        if ids_for_validation:
-            search_class = ExtendedSearchId(self.search_url, ids_for_validation, self.token_refresher, self.context)
-            found_ids, missing_ids = search_class.search_records()
-        if not missing_ids:
+        if not ids_for_validation:
             return True
-        return False
+
+        search_handler = ExtendedSearchId(self.search_url, ids_for_validation, self.token_refresher, self.context)
+        returned_ids = search_handler.search_records()
+        ids_for_validation = self._combine_ids_with_versions()
+
+        if set(ids_for_validation).issubset(returned_ids):
+            return True
+        else:
+            found_ids = list(set(ids_for_validation).intersection(returned_ids))
+            missing_ids = list(set(ids_for_validation).difference(returned_ids))
+            logger.debug(f"The following ids were found: {found_ids}")
+            logger.warning(f"The following ids are absent in the system: {missing_ids}")
+            return False
 
     def ensure_integrity(self, manifest_file: dict=None):
         """
diff --git a/tests/plugin-unit-tests/data/data_integrity/valid_data_ids_with_colon.json b/tests/plugin-unit-tests/data/data_integrity/valid_data_ids_with_colon.json
new file mode 100644
index 0000000000000000000000000000000000000000..538ae42734934073db2b6f3bf69871017d6134c3
--- /dev/null
+++ b/tests/plugin-unit-tests/data/data_integrity/valid_data_ids_with_colon.json
@@ -0,0 +1,71 @@
+{
+  "kind": "osdu:wks:Manifest:1.0.0",
+  "ReferenceData": [],
+  "MasterData": [],
+  "Data": {
+    "WorkProduct": {
+      "kind": "osdu:wks:work-product--WorkProduct:1.0.0",
+      "acl": {
+        "owners": [],
+        "viewers": []
+      },
+      "legal": {
+        "legaltags": [],
+        "otherRelevantDataCountries": []
+      },
+      "data": {
+        "ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
+        "Name": "69_D_CH_11",
+        "Description": "Document",
+        "Components": [
+          "surrogate-key:wpc-1:"
+        ]
+      }
+    },
+    "WorkProductComponents": [
+      {
+        "id": "surrogate-key:wpc-1",
+        "kind": "osdu:wks:work-product-component--Document:1.0.0",
+        "acl": {
+          "owners": [],
+          "viewers": []
+        },
+        "legal": {
+          "legaltags": [],
+          "otherRelevantDataCountries": []
+        },
+        "data": {
+          "ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
+          "Name": "69_D_CH_11",
+          "Description": "Document",
+          "Datasets": [
+            "surrogate-key:file-1:"
+          ]
+        }
+      }
+    ],
+    "Datasets": [
+      {
+        "id": "surrogate-key:file-1",
+        "kind": "osdu:wks:dataset--File.Generic:1.0.0",
+        "acl": {
+          "owners": [],
+          "viewers": []
+        },
+        "legal": {
+          "legaltags": [],
+          "otherRelevantDataCountries": []
+        },
+        "data": {
+          "ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
+          "DatasetProperties": {
+            "FileSourceInfo": {
+              "FileSource": "/r1/data/provided/USGS_docs/69_D_CH_11.pdf",
+              "PreloadFilePath": "s3://osdu-seismic-test-data/r1/data/provided/USGS_docs/69_D_CH_11.pdf"
+            }
+          }
+        }
+      }
+    ]
+  }
+}
diff --git a/tests/plugin-unit-tests/file_paths.py b/tests/plugin-unit-tests/file_paths.py
index 796a39693101ad2255c5a44a2c97ef72a8072c25..5e19f88bc234bc51ac6ecd7421f4fe85347fa164 100644
--- a/tests/plugin-unit-tests/file_paths.py
+++ b/tests/plugin-unit-tests/file_paths.py
@@ -66,6 +66,7 @@ DATA_INTEGRITY_EMPTY_DATA = f"{DATA_PATH_PREFIX}/data_integrity/empty_data.json"
 DATA_INTEGRITY_EMPTY_DATA_CASE_2 = f"{DATA_PATH_PREFIX}/data_integrity/empty_data_inside.json"
 DATA_INTEGRITY_EMPTY_WP = f"{DATA_PATH_PREFIX}/data_integrity/empty_wp.json"
 DATA_INTEGRITY_VALID_REAL_IDS = f"{DATA_PATH_PREFIX}/data_integrity/valid_data_real_ids.json"
+DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON = f"{DATA_PATH_PREFIX}/data_integrity/valid_data_ids_with_colon.json"
 
 FILES_SOURCE_VALID = f"{DATA_PATH_PREFIX}/data_integrity/file_source/valid_files.json"
 FILES_SOURCE_INVALID = f"{DATA_PATH_PREFIX}/data_integrity/file_source/invalid_files.json"
diff --git a/tests/plugin-unit-tests/test_data_integrity_validator.py b/tests/plugin-unit-tests/test_data_integrity_validator.py
index b48d35fe315680cb2cf7db1cbe08e2295bd704d0..5243e725eadc6b9a0da07ab3e9b84c8cd9eb4fc3 100644
--- a/tests/plugin-unit-tests/test_data_integrity_validator.py
+++ b/tests/plugin-unit-tests/test_data_integrity_validator.py
@@ -26,7 +26,8 @@ import pytest_mock
 from file_paths import (DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_ORPHAN_DATASETS,
                         DATA_INTEGRITY_VALID_WP_INVALID_WPC, DATA_INTEGRITY_INVALID_WP,
                         DATA_INTEGRITY_EMPTY_DATA, DATA_INTEGRITY_VALID_REAL_IDS,
-                        DATA_INTEGRITY_EMPTY_DATA_CASE_2, DATA_INTEGRITY_EMPTY_WP)
+                        DATA_INTEGRITY_EMPTY_DATA_CASE_2, DATA_INTEGRITY_EMPTY_WP,
+                        DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON)
 from libs.search_client import SearchClient, SearchResponse
 from libs.validation.validate_data_integrity import DataIntegrityValidator
 from libs.validation.validate_file_source import FileSourceValidator
@@ -53,6 +54,7 @@ class TestDataIntegrityValidator:
         pytest.param(DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_ORPHAN_DATASETS),
         pytest.param(DATA_INTEGRITY_VALID_DATA, DATA_INTEGRITY_VALID_WP_INVALID_WPC),
         pytest.param(DATA_INTEGRITY_EMPTY_DATA, DATA_INTEGRITY_INVALID_WP),
+        pytest.param(DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON, DATA_INTEGRITY_VALID_DATA_IDS_WITH_COLON),
     ])
     def test_validate_data_integrity(self, mocker: pytest_mock.MockerFixture, provide_manifests,
                                      file_source_validator, expected_manifest_path: str,
diff --git a/tests/plugin-unit-tests/test_manifest_integrity.py b/tests/plugin-unit-tests/test_manifest_integrity.py
index ab67c21816092764fa872500053ed53936df2143..356be23cf9d5db7e36a4fe31200335940478c46e 100644
--- a/tests/plugin-unit-tests/test_manifest_integrity.py
+++ b/tests/plugin-unit-tests/test_manifest_integrity.py
@@ -24,7 +24,7 @@ import pytest
 from mock_providers import get_test_credentials
 from file_paths import MANIFEST_WELL_PATH, REF_RESULT_WELL_PATH, MANIFEST_WELLLOG_PATH, REF_RESULT_WELLLOG_PATH
 from libs.refresh_token import AirflowTokenRefresher
-from libs.validation.validate_referential_integrity import ManifestIntegrity
+from libs.validation.validate_referential_integrity import ManifestIntegrity, IdStructure
 
 
 class TestIntegrityProvider:
@@ -70,3 +70,34 @@
         manifest_records = manifest_integrity._extract_references(
             manifest_integrity.context["Data"]["WorkProductComponents"][0])
         assert set(manifest_records) == set(expected_result)
+
+    @pytest.mark.parametrize(
+        "conf_path,raw_id,decompose_id",
+        [
+            pytest.param(
+                MANIFEST_WELLLOG_PATH,
+                "osdu:master-data--Wellbore:1013:",
+                IdStructure(id="osdu:master-data--Wellbore:1013",
+                            version=""),
+                id="Extract id Wellbore"),
+            pytest.param(
+                MANIFEST_WELLLOG_PATH,
+                "osdu:reference-data--FacilityType:Well:12345",
+                IdStructure(id="osdu:reference-data--FacilityType:Well",
+                            version="12345"),
+                id="Extract id Facility Type"),
+            pytest.param(
+                MANIFEST_WELLLOG_PATH,
+                "osdu:reference-data--FacilityType:Well:2:12345",
+                IdStructure(id="osdu:reference-data--FacilityType:Well:2",
+                            version="12345"),
+                id="Extract id Facility Type with colon inside ID"),
+        ]
+    )
+    def test_decompose_ref_id(self, monkeypatch,
+                              manifest_integrity,
+                              conf_path: str,
+                              raw_id: str,
+                              decompose_id: IdStructure):
+        extracted_id = manifest_integrity._decompose_ref_id(raw_id)
+        assert extracted_id == decompose_id
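
Note on the id decomposition introduced in this patch: the trailing-colon convention is load-bearing. A versionless id is written with a trailing ':' so that the parser can tell it apart from an "{id}:{version}" pair whose last segment is numeric. Below is a minimal standalone sketch of that logic, assuming the IdStructure dataclass and the branch structure exactly as added in validate_referential_integrity.py; decompose_ref_id is a hypothetical free-function stand-in for the _decompose_ref_id method.

import dataclasses
from typing import Optional


@dataclasses.dataclass()
class IdStructure:
    id: str = ""
    version: str = ""


def decompose_ref_id(id_value: str) -> Optional[IdStructure]:
    """Split a reference value into id and version parts.

    A trailing ':' marks a versionless id; otherwise a purely numeric
    last segment is treated as the version.
    """
    separate_id = IdStructure()
    if id_value.endswith(":"):
        # "osdu:master-data--Wellbore:1013:" -> id only, no version
        separate_id.id = id_value[:-1]
    elif id_value.split(":")[-1].isdigit():
        # "osdu:reference-data--FacilityType:Well:12345" -> version "12345"
        separate_id.version = id_value.split(":")[-1]
        separate_id.id = id_value[:-len(separate_id.version) - 1]
    else:
        separate_id.id = id_value
    return separate_id


assert decompose_ref_id("osdu:master-data--Wellbore:1013:") == \
    IdStructure(id="osdu:master-data--Wellbore:1013", version="")
assert decompose_ref_id("osdu:reference-data--FacilityType:Well:12345") == \
    IdStructure(id="osdu:reference-data--FacilityType:Well", version="12345")

One consequence of this scheme: an id whose last segment happens to be numeric but that lacks the trailing ':' (e.g. "osdu:master-data--Wellbore:1013") is indistinguishable from an id:version pair and would be split into id "osdu:master-data--Wellbore" and version "1013", which is why the test data above writes versionless references such as "surrogate-key:wpc-1:" with the trailing colon.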