Skip to content
Snippets Groups Projects
test_manifest_integrity.py 14.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  Copyright 2021 Google LLC
    #  Copyright 2021 EPAM Systems
    #
    #  Licensed under the Apache License, Version 2.0 (the "License");
    #  you may not use this file except in compliance with the License.
    #  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    import copy
    import json
    import os
    import sys
    from typing import List, Set
    
    
    sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
    sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
    
    import pytest
    from mock_providers import get_test_credentials
    
    from file_paths import MANIFEST_WELL_PATH, REF_RESULT_WELL_PATH, MANIFEST_WELLLOG_PATH, \
        REF_RESULT_WELLLOG_PATH, MANIFEST_GENERIC_PATH
    
    from libs.exceptions import ValidationIntegrityError
    
    from libs.refresh_token import BaseTokenRefresher
    
    from libs.context import Context
    from libs.search_record_ids import ExtendedSearchId
    
    from libs.validation.validate_referential_integrity import ManifestIntegrity
    
    from libs.utils import EntityId, split_id
    
        @staticmethod
        def mock_valid_extended_search(monkeypatch, entity_references: List[EntityId]):
            """Stub ``ExtendedSearchId.search_records`` so every given reference looks found.

            :param monkeypatch: pytest ``monkeypatch`` fixture used to install the stub
            :param entity_references: entities whose ``id`` and ``srn`` values form the
                set returned by the stubbed search
            """
            found_ids = {
                value
                for reference in entity_references
                for value in (reference.id, reference.srn)
            }

            def fake_search_records(*args, **kwargs):
                # Ignores all arguments and always answers with the prepared set.
                return found_ids

            monkeypatch.setattr(ExtendedSearchId, "search_records", fake_search_records)
    
    
        def manifest_integrity(self) -> ManifestIntegrity:
            """Build the ``ManifestIntegrity`` object that the tests receive as ``manifest_integrity``.

            NOTE(review): the tests request this by parameter name, so it is presumably
            registered as a pytest fixture; no decorator is visible here — verify.
            """
            context = Context(app_key="", data_partition_id="test")

            # NOTE(review): the call below is never closed in this copy of the file. Its
            # remaining arguments and closing parenthesis appear to be missing (``context``
            # is otherwise unused). Restore them from version control — TODO confirm.
            manifest_integrity = ManifestIntegrity("", BaseTokenRefresher(get_test_credentials()),

            return manifest_integrity
    
        @pytest.mark.parametrize(
            "conf_path,ref_result_file",
            [
                pytest.param(
                    MANIFEST_WELLLOG_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
        def test_extract_references_wpc(self, monkeypatch, manifest_integrity, conf_path: str,
                                        ref_result_file: str):
            """Check the external references extracted from the first WPC of a manifest.

            :param conf_path: path to the JSON manifest under test
            :param ref_result_file: path to a JSON file listing the expected reference ids
            """
            with open(ref_result_file) as f:
                expected_result = json.load(f)

            with open(conf_path) as f:
                conf = json.load(f)
            test_data = conf["Data"]["WorkProductComponents"][0]
            # Register the manifest's own entity ids before extracting references.
            manifest_integrity._collect_manifest_entities_ids(conf)
            references = manifest_integrity._extract_references(test_data)
            manifest_records = manifest_integrity._extract_external_references(
                test_data, references)

            # Compare as sets: ordering and duplicates are irrelevant here.
            assert {e.id for e in manifest_records} == set(expected_result)
    
    
        @pytest.mark.parametrize(
            "conf_path,ref_result_file",
            [
                pytest.param(
                    MANIFEST_WELLLOG_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
    
        def test_internal_references_are_not_consider_external(self, monkeypatch, manifest_integrity, conf_path: str,
                                        ref_result_file: str):
            """Check that no extracted external reference is one of the manifest's own entity ids.

            :param conf_path: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not needed by this test
            """
            with open(conf_path) as f:
                conf = json.load(f)
            test_data = conf["Data"]["WorkProductComponents"][0]
            manifest_integrity._collect_manifest_entities_ids(conf)
            references = manifest_integrity._extract_references(test_data)
            external_references = manifest_integrity._extract_external_references(
                test_data, references)

            intersection = {e.srn for e in external_references}.intersection(manifest_integrity.entities_ids)
            # On failure, the overlapping ids are shown as the assertion message.
            assert not intersection, intersection
    
    
        @pytest.mark.parametrize(
            "conf_path,ref_result_file",
            [
                pytest.param(
                    MANIFEST_WELLLOG_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
        def test_ensure_referential_integrity_valid(self, monkeypatch, manifest_integrity,
                                                    conf_path: str, ref_result_file: str):
            """Check that ``ensure_integrity`` completes when reference validation is a no-op.

            The test has no explicit assertion; it passes when no exception is raised.

            :param conf_path: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not needed by this test
            """
            with open(conf_path) as f:
                conf = json.load(f)

            # Replace the referential check with a stub that accepts everything.
            monkeypatch.setattr(manifest_integrity, "_validate_referential_integrity", lambda *args, **kwargs: None)

            manifest_integrity.ensure_integrity(conf)
    
        @pytest.mark.parametrize(
            "manifest,ref_result_file",
            [
                pytest.param(
                    MANIFEST_GENERIC_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
    
        def test_artefacts_manifest(self, monkeypatch, manifest_integrity, manifest: str,
                                    ref_result_file: str):
            """Check that ``ensure_integrity`` keeps the WPC list unchanged when all references are found.

            :param manifest: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not read here
            """
            with open(manifest) as manifest_file:
                loaded_manifest = json.load(manifest_file)
            first_wpc = loaded_manifest["Data"]["WorkProductComponents"][0]

            wpc_references = manifest_integrity._extract_references(first_wpc)
            external_records = manifest_integrity._extract_external_references(
                first_wpc, wpc_references)

            # Make the search report every external reference as existing.
            self.mock_valid_extended_search(monkeypatch, external_records)

            # Snapshot taken before the call so any in-place change is detected.
            wpcs_before = copy.deepcopy(loaded_manifest["Data"]["WorkProductComponents"])
            manifest_integrity.ensure_integrity(loaded_manifest)
            assert wpcs_before == loaded_manifest["Data"]["WorkProductComponents"]
    
    
        @pytest.mark.parametrize(
            "manifest,ref_result_file",
            [
                pytest.param(
                    MANIFEST_GENERIC_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
    
        def test_skip_valdiation_without_artefacts_wpc(self, monkeypatch, manifest_integrity,
                                                       manifest: str, ref_result_file: str):
            """Check that a WPC without an ``Artefacts`` entry is left untouched by ``ensure_integrity``.

            :param manifest: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not read here
            """
            with open(manifest) as manifest_file:
                loaded_manifest = json.load(manifest_file)
            first_wpc = loaded_manifest["Data"]["WorkProductComponents"][0]
            # Drop the artefacts section if present; a missing key is tolerated.
            first_wpc["data"].pop("Artefacts", None)

            wpc_references = manifest_integrity._extract_references(first_wpc)
            external_records = manifest_integrity._extract_external_references(
                first_wpc, wpc_references)

            # Make the search report every external reference as existing.
            self.mock_valid_extended_search(monkeypatch, external_records)

            wpcs_before = copy.deepcopy(loaded_manifest["Data"]["WorkProductComponents"])
            manifest_integrity.ensure_integrity(loaded_manifest)
            assert wpcs_before == loaded_manifest["Data"]["WorkProductComponents"]
    
    
        @pytest.mark.parametrize(
            "manifest,ref_result_file",
            [
                pytest.param(
                    MANIFEST_GENERIC_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
    
        def test_artefacts_absent_in_manifest_and_system_resourceId(self, monkeypatch,
                                                                    manifest_integrity, manifest: str,
                                                                    ref_result_file: str):
            """Check that a WPC with an unresolvable artefact ``ResourceID`` is removed.

            The first WPC's first artefact is pointed at the id ``"wrong"`` and the search
            is stubbed to return nothing; after ``ensure_integrity`` that WPC must no longer
            be in the manifest's WPC list.

            :param manifest: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not read here
            """
            wrong_resource_id = "wrong"
            with open(manifest) as f:
                manifest = json.load(f)

            wrong_wpc = manifest["Data"]["WorkProductComponents"][0]
            wrong_wpc["data"]["Artefacts"][0]["ResourceID"] = wrong_resource_id

            # Stub the search so that no record is ever reported as existing.
            monkeypatch.setattr(ExtendedSearchId, "search_records", lambda *args, **kwargs: [])
            manifest_integrity.ensure_integrity(manifest)

            assert wrong_wpc not in manifest["Data"]["WorkProductComponents"]
    
    
        @pytest.mark.parametrize(
            "manifest,ref_result_file",
            [
                pytest.param(
                    MANIFEST_GENERIC_PATH,
                    REF_RESULT_WELLLOG_PATH,
                    id="Validate manifest WPC")
            ]
        )
    
        def test_artefacts_resourceId_duplicated_in_datasets(self, monkeypatch, manifest_integrity,
                                                             manifest: str,
                                                             ref_result_file: str):
            """Check that a WPC whose artefact ``ResourceID`` repeats one of its datasets is rejected.

            :param manifest: path to the JSON manifest under test
            :param ref_result_file: supplied by the parametrization; not read here
            """
            with open(manifest) as manifest_file:
                loaded_manifest = json.load(manifest_file)

            duplicated_wpc = loaded_manifest["Data"]["WorkProductComponents"][0]
            # Point the first artefact at the same id as the first dataset.
            first_dataset = duplicated_wpc["data"]["Datasets"][0]
            duplicated_wpc["data"]["Artefacts"][0]["ResourceID"] = first_dataset

            surviving_wpcs = manifest_integrity._ensure_artefacts_integrity(
                loaded_manifest["Data"]["WorkProductComponents"])

            assert duplicated_wpc not in surviving_wpcs
    
        @pytest.mark.parametrize(
            "external_references,search_response,expected_missing_references",
            [
                pytest.param(
                    [
                        "osdu:reference-data--ResourceSecurityClassification:Public:",
                        "osdu:master-data--Organisation:HESS:",
                    ],
                    set(),
                    [
                        "osdu:reference-data--ResourceSecurityClassification:Public:",
                        "osdu:master-data--Organisation:HESS:",
                    ],
                    id="Empty search return"
                ),
                pytest.param(
                    [
                        "osdu:reference-data--ResourceSecurityClassification:Public:",
                        "osdu:master-data--Organisation:HESS:123",
                    ],
                    {
                        "osdu:reference-data--ResourceSecurityClassification:Public",
                        "osdu:reference-data--ResourceSecurityClassification:Public:123",
                        "osdu:master-data--Organisation:HESS",
                        "osdu:master-data--Organisation:HESS:123",
                    },
                    set(),
                    id="Full search return"
                ),
                pytest.param(
                    [
                        "osdu:reference-data--ResourceSecurityClassification:Public:",
                        "osdu:master-data--Organisation:HESS:",
                    ],
                    {
                        "osdu:reference-data--ResourceSecurityClassification:Public:111",
                        "osdu:reference-data--ResourceSecurityClassification:Public",
                    },
                    [
                        "osdu:master-data--Organisation:HESS:",
                    ],
                    id="Partial search return."
                )
            ]
        )
        def test_find_missing_external_ids(
            self,
            monkeypatch,
            manifest_integrity,
            external_references: List[str],
            search_response: set,
            expected_missing_references: set
        ):
            """Check which references ``_find_missing_external_ids`` reports as missing.

            :param external_references: reference strings, each converted with ``split_id``
            :param search_response: value the stubbed ``search_records`` returns
            :param expected_missing_references: references expected to be reported missing
            """
            split_references = [split_id(reference) for reference in external_references]

            def stub_search_records(*args, **kwargs):
                # Ignores all arguments and always answers with the parametrized response.
                return search_response

            monkeypatch.setattr(ExtendedSearchId, "search_records", stub_search_records)

            actual_missing = manifest_integrity._find_missing_external_ids(split_references)
            # An empty symmetric difference means both collections hold the same ids.
            mismatch = actual_missing.symmetric_difference(expected_missing_references)
            assert not mismatch, (
                f'External references {external_references}\n'
                f'Search response {search_response}\n'
                f'Expected missing ids {expected_missing_references}'
            )
    
    
            "entity,search_return",
    
                    {"test":[
                            "osdu:reference-data--ResourceSecurityClassification:Public:",
                            "osdu:master-data--Organisation:HESS:",
                        ]
                    },
                    set(),
                    id="Empty search return"),
    
                    {"test": [
                        "osdu:reference-data--ResourceSecurityClassification:Public:",
                        "osdu:master-data--Organisation:HESS:",
                    ]
                    },
                    {
                        "osdu:master-data--Organisation:HESS:1",
                        "osdu:master-data--Organisation:HESS"
                    },
                    id="Not complete search return")
            ]
        )
        def test_has_missing_entities(self, monkeypatch, manifest_integrity, entity: list, search_return: set):
            monkeypatch.setattr(
                ExtendedSearchId,
                "search_records",
                lambda *args, **kwargs: search_return
            )
    
            with pytest.raises(ValidationIntegrityError):
                manifest_integrity._validate_referential_integrity(entity)
    
    
        # NOTE(review): this copy of the file had lost the first ``pytest.param(`` opener, the
        # ``},`` closing the second case's entity dict, and the final ``]`` / ``)``. They are
        # restored here. The original also had blank gaps inside the case data that may
        # correspond to further lost entries — compare with version control.
        @pytest.mark.parametrize(
            "entity,search_return",
            [
                pytest.param(
                    {
                        "id": "osdu:master-data--Organisation:test:1",
                        "ref": [
                                "osdu:master-data--Organisation:HESS:",
                            ]
                    },
                    {
                        "osdu:master-data--Organisation:HESS:1",
                        "osdu:master-data--Organisation:HESS",
                    },
                    id="Search last version"),
                pytest.param(
                    {
                        "id": "osdu:master-data--Organisation:test:1",
                        "ref": [
                            "osdu:master-data--Organisation:HESS:1",
                        ]
                    },
                    {
                        "osdu:master-data--Organisation:HESS:1",
                        "osdu:master-data--Organisation:HESS"
                    },
                    id="Search returns all ids"),
                pytest.param(
                    {
                        "id": "osdu:master-data--Organisation:test:1",
                        "ref": [
                            "osdu:master-data--Organisation:HESS:1:",
                        ]
                    },
                    {
                        "osdu:master-data--Organisation:HESS:1:1",
                        "osdu:master-data--Organisation:HESS:1"
                    },
                    id="Integer part of id of reference is not considered as a version")
            ]
        )
        def test_has_no_missing_entities(self, monkeypatch, manifest_integrity, entity: dict,
                                      search_return: set):
            """Check that validation returns a falsy value when the search covers every reference.

            :param entity: entity dict holding the references to validate
            :param search_return: value the stubbed ``search_records`` returns
            """
            monkeypatch.setattr(
                ExtendedSearchId,
                "search_records",
                lambda *args, **kwargs: search_return
            )

            assert not manifest_integrity._validate_referential_integrity(entity)