From e2c68c5de3c1772e93ab9078f6cfac2d276d2716 Mon Sep 17 00:00:00 2001 From: Yan Sushchynski Date: Fri, 9 Sep 2022 13:26:44 +0300 Subject: [PATCH 1/4] GONRG-5648: Manifest-by-reference 0.17 --- .../ensure_manifest_integrity_by_reference.py | 2 +- .../operators/mixins/ReceivingContextMixin.py | 18 +- .../process_manifest_r3_by_reference.py | 32 ++- .../operators/update_status_by_reference.py | 15 +- .../validate_manifest_schema_by_reference.py | 14 +- osdu_airflow/tests/airflow_var.json | 2 + osdu_airflow/tests/airflow_var.sh | 13 ++ .../data/master/test_manifest.json | 43 ++++ .../tests/plugin-unit-tests/file_paths.py | 1 + ...test_operators_r3_manifest_by_reference.py | 199 ++++++++++++++++++ 10 files changed, 316 insertions(+), 23 deletions(-) create mode 100644 osdu_airflow/tests/airflow_var.sh create mode 100644 osdu_airflow/tests/plugin-unit-tests/data/master/test_manifest.json create mode 100644 osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py diff --git a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py index 5b7362c..463dfd6 100644 --- a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py +++ b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py @@ -71,7 +71,7 @@ class EnsureManifestIntegrityOperatorByReference(BaseOperator, ReceivingContextM execution_context=execution_context, use_history=False, config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=token_refresher, logger=logger) previously_skipped_entities = self._get_previously_skipped_entities( diff --git a/osdu_airflow/operators/mixins/ReceivingContextMixin.py b/osdu_airflow/operators/mixins/ReceivingContextMixin.py index 2c3333a..dfa0e5b 100644 --- a/osdu_airflow/operators/mixins/ReceivingContextMixin.py +++ b/osdu_airflow/operators/mixins/ReceivingContextMixin.py @@ -17,6 +17,7 @@ import json import logging import re as re +from airflow.models import Variable from osdu_api.auth.authorization import TokenRefresher from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient @@ -87,6 +88,7 @@ class ReceivingContextMixin: if logger is None: logger = logging.getLogger() + record_id = "" if use_history: record_id_list = context["ti"].xcom_pull(task_ids=self.previous_task_id, key="manifest_ref_ids") record_id = record_id_list[-1] # the last one is the most recent one @@ -98,7 +100,8 @@ class ReceivingContextMixin: logger.debug(f"#ReceivingContextMixin try to retrieve data from id : {record_id}.") - client_dms = DatasetDmsClient( config_manager=config_manager, + client_dms = DatasetDmsClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, data_partition_id=data_partition_id, token_refresher=token_refresher, logger=logger) @@ -204,11 +207,12 @@ class ReceivingContextMixin: if logger is None: logger = logging.getLogger() - dataset_registry_url = config_manager.get('environment', 'dataset_registry_url') + dataset_registry_url = Variable.get("core__service__dataset__url", default_var=None) match_domain = re.search(r'https?://([\w\.-]+).*', dataset_registry_url) dataset_registry_url_domain = match_domain.group(1) - client_dms = DatasetDmsClient( config_manager=config_manager, + client_dms = DatasetDmsClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), 
+ config_manager=config_manager, data_partition_id=data_partition_id, token_refresher=token_refresher, logger=logger) @@ -262,7 +266,8 @@ class ReceivingContextMixin: ancestry=RecordAncestry(parents=[])) ] - client_reg = DatasetRegistryClient( config_manager=config_manager, + client_reg = DatasetRegistryClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, data_partition_id=data_partition_id, token_refresher=token_refresher, logger=logger) @@ -289,11 +294,12 @@ class ReceivingContextMixin: if logger is None: logger = logging.getLogger() - dataset_registry_url = config_manager.get('environment', 'dataset_registry_url') + dataset_registry_url = Variable.get("core__service__dataset__url", default_var=None) match_domain = re.search(r'https?://([\w\.-]+).*', dataset_registry_url) dataset_registry_url_domain = match_domain.group(1) - ent_client = EntitlementsClient(config_manager=config_manager, + ent_client = EntitlementsClient(Variable.get("core__service__entitlements__url", default_var=None), + config_manager=config_manager, data_partition_id=data_partition_id, token_refresher=token_refresher, logger=logger) diff --git a/osdu_airflow/operators/process_manifest_r3_by_reference.py b/osdu_airflow/operators/process_manifest_r3_by_reference.py index 39db0bc..89c7135 100644 --- a/osdu_airflow/operators/process_manifest_r3_by_reference.py +++ b/osdu_airflow/operators/process_manifest_r3_by_reference.py @@ -24,6 +24,9 @@ from typing import List, Tuple from airflow.models import BaseOperator, Variable from jsonschema import SchemaError +from osdu_api.clients.schema.schema_client import SchemaClient +from osdu_api.clients.search.search_client import SearchClient +from osdu_api.clients.storage.record_client import RecordClient from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import (EmptyManifestError, GenericManifestSchemaError, @@ -136,31 +139,44 @@ class ProcessManifestOperatorR3ByReference(BaseOperator, ReceivingContextMixin): file_source_validator = FileSourceValidator() source_file_checker = SourceFileChecker() + search_client = SearchClient( + search_url= Variable.get("core__service__search__url", default_var=None), + token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id + ) + record_client = RecordClient( + storage_url=Variable.get("core__service__storage__url", default_var=None), + token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id + ) + schema_client = SchemaClient( + schema_url=Variable.get("core__service__schema__url", default_var=None), + token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id + ) + referential_integrity_validator = ManifestIntegrity( - token_refresher, + search_client, file_source_validator, payload_context ) manifest_processor = ManifestProcessor( + record_client=record_client, file_handler=file_handler, - token_refresher=token_refresher, context=payload_context, source_file_checker=source_file_checker, ) validator = SchemaValidator( - token_refresher, - payload_context, + schema_client=schema_client, + context=payload_context, data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS, surrogate_key_fields_paths=SURROGATE_KEYS_PATHS ) single_manifest_processor = SingleManifestProcessor( - storage_url=self.storage_url, - payload_context=payload_context, 
referential_integrity_validator=referential_integrity_validator, manifest_processor=manifest_processor, schema_validator=validator, - token_refresher=token_refresher, batch_save_enabled=self.batch_save_enabled, save_records_batch_size=self.batch_save_size ) @@ -169,7 +185,7 @@ class ProcessManifestOperatorR3ByReference(BaseOperator, ReceivingContextMixin): execution_context=execution_context, use_history=False, config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=token_refresher, logger=logger) logger.debug(f"Manifest data: {manifest_data}") diff --git a/osdu_airflow/operators/update_status_by_reference.py b/osdu_airflow/operators/update_status_by_reference.py index d63c6db..5c452fd 100644 --- a/osdu_airflow/operators/update_status_by_reference.py +++ b/osdu_airflow/operators/update_status_by_reference.py @@ -21,6 +21,7 @@ import logging from typing import Tuple from airflow.models import BaseOperator, Variable +from osdu_api.clients.ingestion_workflow.ingestion_workflow_client import IngestionWorkflowClient from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import PipelineFailedError from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher @@ -124,15 +125,21 @@ class UpdateStatusOperatorByReference(BaseOperator): workflow_name = conf["workflow_name"] run_id = conf["run_id"] status = self.status.value + + workflow_client = IngestionWorkflowClient( + ingestion_workflow_url=Variable.get("core__service__workflow__url", default_var=None), + token_refresher=AirflowTokenRefresher(), + data_partition_id=payload_context.data_partition_id + ) status_updater = UpdateStatus( + workflow_client=workflow_client, workflow_name=workflow_name, - workflow_url=Variable.get("core__service__workflow__host"), workflow_id="", run_id=run_id, status=status, - token_refresher=AirflowTokenRefresher(), context=payload_context ) + status_updater.update_workflow_status() if self._show_skipped_ids: @@ -144,14 +151,14 @@ class UpdateStatusOperatorByReference(BaseOperator): record_id = mixin._put_file_on_dataset_service( file_content=str(saved_record_ids), acl_data=mixin._get_default_acl(execution_context=execution_context, config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=AirflowTokenRefresher(), logger=logger), legal_tags=mixin._get_default_legaltags(execution_context=execution_context, config_manager=None, data_partition_id=None), config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=AirflowTokenRefresher(), logger=logger) logger.error(f"#SKIPPED_IDS: Some ids in the manifest were skipped. You can find the report in the datasetService with this record id : {record_id}") diff --git a/osdu_airflow/operators/validate_manifest_schema_by_reference.py b/osdu_airflow/operators/validate_manifest_schema_by_reference.py index c9c5036..a369f6d 100644 --- a/osdu_airflow/operators/validate_manifest_schema_by_reference.py +++ b/osdu_airflow/operators/validate_manifest_schema_by_reference.py @@ -20,6 +20,7 @@ Validate Manifest against R3 schemas operator. 
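Across this first patch the recurring change is the same: each operator now builds its osdu_api clients explicitly, resolving the service URL from an Airflow Variable instead of leaving it to the token refresher and the osdu_api.ini defaults. A minimal sketch of the pattern, assuming an operator that already holds a token refresher and a data partition id (the helper name build_search_client is illustrative, not part of the patch):

    from airflow.models import Variable
    from osdu_api.auth.authorization import TokenRefresher
    from osdu_api.clients.search.search_client import SearchClient

    def build_search_client(token_refresher: TokenRefresher, data_partition_id: str) -> SearchClient:
        # The service URL is read from an Airflow Variable (see airflow_var.json below)
        # rather than from the ini-file config manager.
        return SearchClient(
            search_url=Variable.get("core__service__search__url", default_var=None),
            token_refresher=token_refresher,
            data_partition_id=data_partition_id,
        )

The same construction is repeated with RecordClient (storage_url), SchemaClient (schema_url) and IngestionWorkflowClient (ingestion_workflow_url).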
import logging from airflow.models import BaseOperator, Variable +from osdu_api.clients.schema.schema_client import SchemaClient from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import EmptyManifestError, GenericManifestSchemaError @@ -65,9 +66,14 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi f"DATA_TYPES_WITH_SURROGATE_KEYS: {DATA_TYPES_WITH_SURROGATE_KEYS}") logger.debug(f"SURROGATE_KEYS_PATHS: {SURROGATE_KEYS_PATHS}") + schema_client = SchemaClient( + schema_url=Variable.get("core__service__schema__url", default_var=None), + token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id + ) schema_validator = SchemaValidator( - token_refresher, - payload_context, + schema_client=schema_client, + context=payload_context, surrogate_key_fields_paths=SURROGATE_KEYS_PATHS, data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS ) @@ -76,7 +82,7 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi execution_context=execution_context, use_history=True, # use the history because "check_payload_type" does not return the id config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=token_refresher, logger=logger) @@ -100,6 +106,6 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi manifest=valid_manifest_file, use_history=False, config_manager=None, - data_partition_id=None, + data_partition_id=payload_context.data_partition_id, token_refresher=token_refresher, logger=logger) diff --git a/osdu_airflow/tests/airflow_var.json b/osdu_airflow/tests/airflow_var.json index 60b7981..a5e56cc 100644 --- a/osdu_airflow/tests/airflow_var.json +++ b/osdu_airflow/tests/airflow_var.json @@ -1,6 +1,7 @@ { "core__service__storage__url": "https://test", "core__service__workflow__host": "https://test", + "core__service__entitlements__url": "https://test", "core__service__file__host": "htpps://test", "core__service__workflow__url": "https://test", "core__service__search__url": "https://test", @@ -10,6 +11,7 @@ "core__service__storage__host": "https://test", "core__service__file__url": "https://test", "core__config__dataload_config_path": "https://test", + "core__service__dataset__url": "https://test/test", "core__auth__access_token": "test", "core__ingestion__batch_count": 3 } diff --git a/osdu_airflow/tests/airflow_var.sh b/osdu_airflow/tests/airflow_var.sh new file mode 100644 index 0000000..7a8dc5a --- /dev/null +++ b/osdu_airflow/tests/airflow_var.sh @@ -0,0 +1,13 @@ +export AIRFLOW_VAR_CORE__SERVICE__STORAGE__URL="https://test" +export AIRFLOW_VAR_CORE__SERVICE__WORKFLOW__HOST="https://test" +export AIRFLOW_VAR_CORE__SERVICE__FILE__HOST="htpps://test" +export AIRFLOW_VAR_CORE__SERVICE__WORKFLOW__URL="https://test" +export AIRFLOW_VAR_CORE__SERVICE__SEARCH__URL="https://test" +export AIRFLOW_VAR_CORE__SERVICE__SEARCH__HOST="https://test" +export AIRFLOW_VAR_CORE__SERVICE__SCHEMA__URL="https://test" +export AIRFLOW_VAR_CORE__SERVICE__SCHEMA__HOST="https://test" +export AIRFLOW_VAR_CORE__SERVICE__STORAGE__HOST="https://test" +export AIRFLOW_VAR_CORE__SERVICE__FILE__URL="https://test" +export AIRFLOW_VAR_CORE__CONFIG__DATALOAD_CONFIG_PATH="https://test" +export AIRFLOW_VAR_CORE__AUTH__ACCESS_TOKEN="test" +export AIRFLOW_VAR_CORE__INGESTION__BATCH_COUNT=3 \ No newline at end of file diff --git 
a/osdu_airflow/tests/plugin-unit-tests/data/master/test_manifest.json b/osdu_airflow/tests/plugin-unit-tests/data/master/test_manifest.json new file mode 100644 index 0000000..4bb1b28 --- /dev/null +++ b/osdu_airflow/tests/plugin-unit-tests/data/master/test_manifest.json @@ -0,0 +1,43 @@ +{ + "kind": "test:test:Manifest:1.0.0", + "ReferenceData": [], + "MasterData": [ + { + "id": "opendes:master-data/Wellbore:350112350400", + "kind": "opendes:osdu:TestMaster:0.3.0", + "groupType": "master-data", + "version": 1, + "acl": { + "owners": [ + "data.default.viewers@opendes.osdu-gcp.go3-nrg.projects.epam.com" + ], + "viewers": [ + "data.default.owners@opendes.osdu-gcp.go3-nrg.projects.epam.com" + ] + }, + "legal": { + "legaltags": [ + "opendes-demo-legaltag" + ], + "otherRelevantDataCountries": [ + "srn:opendes:master-data/GeoPoliticalEntity:USA:" + ], + "status": "srn:opendes:reference-data/LegalStatus:public:1111" + }, + "resourceHostRegionIDs": [ + "srn:opendes:reference-data/OSDURegion:US-EAST:" + ], + "resourceObjectCreationDateTime": "2020-10-16T11:14:45-05:00", + "resourceVersionCreationDateTime": "2020-10-16T11:14:45-05:00", + "resourceSecurityClassification": "srn:opendes:reference-data/ResourceSecurityClassification:public:", + "source": "srn:opendes:master-data/Organisation:Oklahoma Corporation Commission:", + "existenceKind": "srn:opendes:reference-data/ExistenceKind:Active:", + "licenseState": "srn:opendes:reference-data/LicenseState:Unlicensed:", + "data": { + "SequenceNumber": 1 + }, + "schema": "test:test:GenericMasterData:1.0.0" + } + ], + "Data": {} +} diff --git a/osdu_airflow/tests/plugin-unit-tests/file_paths.py b/osdu_airflow/tests/plugin-unit-tests/file_paths.py index e9d6aab..ccd34f0 100644 --- a/osdu_airflow/tests/plugin-unit-tests/file_paths.py +++ b/osdu_airflow/tests/plugin-unit-tests/file_paths.py @@ -21,5 +21,6 @@ MANIFEST_GENERIC_SCHEMA_PATH = f"{DATA_PATH_PREFIX}/manifests/schema_Manifest.1. MANIFEST_WELLBORE_VALID_PATH = f"{DATA_PATH_PREFIX}/master/Wellbore.0.3.0.json" MANIFEST_BATCH_WELLBORE_VALID_PATH = f"{DATA_PATH_PREFIX}/master/batch_Wellbore.0.3.0.json" +MANIFEST_TEST_PATH = f"{DATA_PATH_PREFIX}/master/test_manifest.json" SEARCH_VALID_RESPONSE_PATH = f"{DATA_PATH_PREFIX}/other/SearchResponseValid.json" diff --git a/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py new file mode 100644 index 0000000..c7daef4 --- /dev/null +++ b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py @@ -0,0 +1,199 @@ +# Copyright 2020 Google LLC +# Copyright 2020 EPAM Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
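The new airflow_var.sh mirrors airflow_var.json through Airflow's environment-variable convention: a value exported as AIRFLOW_VAR_<KEY> is returned by Variable.get() for the matching lower-cased key, without touching the metastore. A small sketch of that lookup, assuming an environment with apache-airflow installed (the value is the stub used in the test fixtures above):

    import os

    # Equivalent to export AIRFLOW_VAR_CORE__SERVICE__SEARCH__URL="https://test"
    # from osdu_airflow/tests/airflow_var.sh.
    os.environ["AIRFLOW_VAR_CORE__SERVICE__SEARCH__URL"] = "https://test"

    from airflow.models import Variable

    # Airflow checks AIRFLOW_VAR_* environment variables before its metastore,
    # so the exported stub is visible to the operators under test.
    assert Variable.get("core__service__search__url") == "https://test"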
+ +import http +import json +from datetime import datetime +from typing import ClassVar, TypeVar +from unittest.mock import Mock + +import pytest +import requests +import osdu_airflow.operators.mixins.ReceivingContextMixin as receiving_context +from airflow import DAG +from airflow.models import TaskInstance +from osdu_ingestion.libs.exceptions import PipelineFailedError +from osdu_ingestion.libs.handle_file import FileHandler + +import mock_providers +from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_TEST_PATH, MANIFEST_GENERIC_SCHEMA_PATH, + MANIFEST_WELLBORE_VALID_PATH) +from mock_responses import MockWorkflowResponse +from osdu_airflow.operators.ensure_manifest_integrity_by_reference import EnsureManifestIntegrityOperatorByReference +from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin +from osdu_airflow.operators.process_manifest_r3_by_reference import (ManifestProcessor, + ProcessManifestOperatorR3ByReference, + SchemaValidator) +from osdu_airflow.operators.update_status_by_reference import UpdateStatusOperatorByReference +from osdu_airflow.operators.validate_manifest_schema_by_reference import ValidateManifestSchemaOperatorByReference + +CustomOperator = TypeVar("CustomOperator") + + +class MockDagRun: + def __init__(self, conf): + self.conf = conf + + +class MockStorageResponse(requests.Response): + + def json(self, **kwargs): + return {"recordIds": ["test"]} + + +class TestManifestByReferenceOperators(object): + + @staticmethod + def _read_manifest(): + with open(MANIFEST_TEST_PATH) as f: + return f.read() + + def _make_services_calls_mocked(self): + mock_retrieval_response = Mock() + mock_retrieval_response.json = Mock( + side_effect=lambda *args, **kwargs: {"delivery": [{"retrievalProperties": {"signedUrl": "test"}}]}) + receiving_context.DatasetDmsClient.get_retrieval_instructions = Mock(side_effect=lambda *args, **kwargs: mock_retrieval_response) + + mock_storage_instructions = Mock() + mock_storage_instructions.json = Mock( + side_effect=lambda *args, **kwargs: { + "storageLocation": { + "signedUrl": "string", + "fileSource": "string", + } + } + ) + receiving_context.DatasetDmsClient.get_storage_instructions = Mock(side_effect=lambda *args, **kwargs: mock_storage_instructions) + + mock_make_request = Mock() + mock_make_request.json = Mock(side_effect=lambda *args, **kwargs: self._read_manifest()) + receiving_context.DatasetDmsClient.make_request = Mock(side_effect=lambda *args, **kwargs: mock_make_request) + + mock_register_dataset = Mock() + mock_register_dataset.json = Mock(side_effect=lambda *args, **kwargs: {'datasetRegistries': [{"id": "test"}]}) + receiving_context.DatasetRegistryClient.register_dataset = Mock(side_effect=lambda *args, **kwargs: mock_register_dataset) + + mock_get_groups_for_user = Mock() + mock_get_groups_for_user.json = Mock( + side_effect=lambda *args, **kwargs: {"groups": []}) + receiving_context.EntitlementsClient.get_groups_for_user = Mock(side_effect=lambda *args, **kwargs: mock_get_groups_for_user) + + def _create_batch_task(self, operator: ClassVar[CustomOperator]) -> (CustomOperator, dict): + self._make_services_calls_mocked() + with open(MANIFEST_BATCH_WELLBORE_VALID_PATH) as f: + conf = json.load(f) + dag = DAG(dag_id='batch_osdu_ingest', start_date=datetime.now()) + task: CustomOperator = operator(dag=dag, task_id='anytask') + ti = TaskInstance(task=task, execution_date=datetime.now()) + + ti.xcom_pull = Mock(side_effect=lambda *args, **kwargs: "test") + context = ti.get_template_context() 
+ context["dag_run"] = MockDagRun(conf) + return task, context + + def _create_task(self, operator: ClassVar[CustomOperator]) -> (CustomOperator, dict): + self._make_services_calls_mocked() + with open(MANIFEST_WELLBORE_VALID_PATH) as f: + conf = json.load(f) + dag = DAG(dag_id='Osdu_ingest', start_date=datetime.now()) + task: CustomOperator = operator(dag=dag, task_id='anytask') + ti = TaskInstance(task=task, execution_date=datetime.now()) + + ti.xcom_pull = Mock(side_effect=lambda *args, **kwargs: "test") + context = ti.get_template_context() + context["dag_run"] = MockDagRun(conf) + return task, context + + def test_process_manifest_r3_operator(self, monkeypatch): + + def _get_common_schema(*args, **kwargs): + with open(MANIFEST_GENERIC_SCHEMA_PATH) as f: + manifest_schema = json.load(f) + return manifest_schema + + + monkeypatch.setattr(SchemaValidator, "get_schema", _get_common_schema) + monkeypatch.setattr(SchemaValidator, "_validate_against_schema", lambda *args, **kwargs: None) + monkeypatch.setattr(SchemaValidator, "validate_manifest", lambda obj, entities: entities) + monkeypatch.setattr(ManifestProcessor, "save_record_to_storage", + lambda obj, headers, request_data: MockStorageResponse()) + monkeypatch.setattr(FileHandler, "upload_file", + lambda *args, **kwargs: "test") + + task, context = self._create_task(ProcessManifestOperatorR3ByReference) + task.pre_execute(context) + task.execute(context) + + def _test_update_status_operator(self, monkeypatch, status: UpdateStatusOperatorByReference.prev_ti_state): + monkeypatch.setattr(UpdateStatusOperatorByReference, "get_previous_ti_statuses", + lambda obj, context: status) + monkeypatch.setattr(requests, "put", lambda *args, **kwargs: MockWorkflowResponse( + status_code=http.HTTPStatus.OK, json="test")) + + task, context = self._create_task(UpdateStatusOperatorByReference) + task.pre_execute(context) + task.execute(context) + + @pytest.mark.parametrize( + "status", + [ + pytest.param( + UpdateStatusOperatorByReference.prev_ti_state.NONE + ), + pytest.param( + UpdateStatusOperatorByReference.prev_ti_state.SUCCESS + ) + ] + ) + def test_update_status_operator(self, monkeypatch, status): + self._test_update_status_operator(monkeypatch, status) + + @pytest.mark.parametrize( + "status", + [ + pytest.param( + UpdateStatusOperatorByReference.prev_ti_state.FAILED + ) + ] + ) + def test_update_status_operator_failed(self, monkeypatch, status): + """ + Test if operator raises PipeLineFailedError if any previous task failed. 
+ """ + with pytest.raises(PipelineFailedError): + self._test_update_status_operator(monkeypatch, status) + + def test_validate_schema_operator(self, monkeypatch): + + def _get_common_schema(*args, **kwargs): + with open(MANIFEST_GENERIC_SCHEMA_PATH) as f: + manifest_schema = json.load(f) + return manifest_schema + + monkeypatch.setattr(SchemaValidator, "get_schema", _get_common_schema) + monkeypatch.setattr(SchemaValidator, "_validate_against_schema", lambda *args, **kwargs: None) + monkeypatch.setattr(SchemaValidator, "validate_manifest", lambda obj, entities: entities) + task, context = self._create_task(ValidateManifestSchemaOperatorByReference) + task.pre_execute(context) + task.execute(context) + + def test_ensure_manifest_integrity(self, monkeypatch): + monkeypatch.setattr(FileHandler, "upload_file", + lambda *args, **kwargs: "test") + monkeypatch.setattr(ReceivingContextMixin, "_get_previously_skipped_entities", + lambda *args, **kwargs: []) + task, context = self._create_task(EnsureManifestIntegrityOperatorByReference) + task.pre_execute(context) + task.execute(context) -- GitLab From ab31530dbafa209d8b9446d9dcf7cbe658d06717 Mon Sep 17 00:00:00 2001 From: Valentin Gauthier Date: Wed, 14 Sep 2022 16:58:15 +0200 Subject: [PATCH 2/4] updating manifest-by-reference to use new PreLoadFilePath syntax, and add the possibility to use new dataset-service enpoints by. By default, new enpoints are used, set 'use_new_dataset_service_endpoints' variable to 'false' in airflow gloabl context if you want to keep using old dataset-service endpoints --- .../ensure_manifest_integrity_by_reference.py | 63 +++++++-- .../operators/mixins/ReceivingContextMixin.py | 122 +++++++----------- .../process_manifest_r3_by_reference.py | 17 ++- .../operators/update_status_by_reference.py | 57 ++++++-- .../validate_manifest_schema_by_reference.py | 40 +++++- 5 files changed, 186 insertions(+), 113 deletions(-) diff --git a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py index 463dfd6..fd55688 100644 --- a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py +++ b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py @@ -17,6 +17,12 @@ import logging from airflow.models import BaseOperator, Variable +from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient +from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient +from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient +from osdu_api.configuration.config_manager import DefaultConfigManager +from osdu_api.clients.search.search_client import SearchClient +from osdu_ingestion.libs import search_client from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher from osdu_ingestion.libs.validation.validate_file_source import FileSourceValidator @@ -53,27 +59,55 @@ class EnsureManifestIntegrityOperatorByReference(BaseOperator, ReceivingContextM :param context: Airflow context :type context: dict """ - payload_context = Context.populate( - context["dag_run"].conf["execution_context"]) + execution_context = context["dag_run"].conf["execution_context"] + payload_context = Context.populate(context["dag_run"].conf["execution_context"]) token_refresher = AirflowTokenRefresher() file_source_validator = FileSourceValidator() + config_manager = DefaultConfigManager() + + search_client = SearchClient( + search_url= 
Variable.get("core__service__search__url", default_var=None), + token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id + ) manifest_integrity = ManifestIntegrity( - token_refresher, - file_source_validator, - payload_context, - self.whitelist_ref_patterns, + search_client=search_client, + file_source_validator=file_source_validator, + context=payload_context, + whitelist_ref_patterns=self.whitelist_ref_patterns, ) - execution_context = context["dag_run"].conf["execution_context"] + dataset_dms_client = DatasetDmsClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) - manifest_data = self._get_manifest_data_by_reference(context=context, + dataset_reg_client = DatasetRegistryClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + entitlements_client = EntitlementsClient( + entitlements_url=Variable.get("core__service__entitlements__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + manifest_data = self._get_manifest_data_by_reference(context=context, execution_context=execution_context, use_history=False, - config_manager=None, - data_partition_id=payload_context.data_partition_id, - token_refresher=token_refresher, + dataset_dms_client=dataset_dms_client, logger=logger) + previously_skipped_entities = self._get_previously_skipped_entities( context) logger.debug(f"Manifest data: {manifest_data}") @@ -90,7 +124,8 @@ class EnsureManifestIntegrityOperatorByReference(BaseOperator, ReceivingContextM execution_context=execution_context, manifest=manifest, use_history=False, - config_manager=None, - data_partition_id=None, - token_refresher=token_refresher, + data_partition_id=payload_context.data_partition_id, + dataset_dms_client=dataset_dms_client, + dataset_reg_client=dataset_reg_client, + entitlements_client=entitlements_client, logger=logger) diff --git a/osdu_airflow/operators/mixins/ReceivingContextMixin.py b/osdu_airflow/operators/mixins/ReceivingContextMixin.py index dfa0e5b..4b197ab 100644 --- a/osdu_airflow/operators/mixins/ReceivingContextMixin.py +++ b/osdu_airflow/operators/mixins/ReceivingContextMixin.py @@ -22,7 +22,6 @@ from osdu_api.auth.authorization import TokenRefresher from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient -from osdu_api.configuration.base_config_manager import BaseConfigManager from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_api.model.dataset.create_dataset_registries_request import CreateDatasetRegistriesRequest from osdu_api.model.http_method import HttpMethod @@ -69,25 +68,19 @@ class ReceivingContextMixin: previously_skipped_ids.extend(task_skipped_ids) return previously_skipped_ids - def _get_manifest_data_by_reference(self, context: dict, execution_context: dict, use_history:bool=False, - config_manager: BaseConfigManager = None, - data_partition_id = None, - token_refresher: TokenRefresher = None, + def _get_manifest_data_by_reference(self, 
context: dict, execution_context: dict, + dataset_dms_client: DatasetDmsClient, use_history:bool=False, logger = None) -> ManifestType: """ [Geosiris Development] Get the manifest from the dataset service. If use_history is set to True, the data is taken from the record_id history instead of using the last task's return value """ - if config_manager is None: - config_manager = DefaultConfigManager() - - if data_partition_id is None: - data_partition_id = config_manager.get('environment', 'data_partition_id') - if logger is None: logger = logging.getLogger() + use_new_dataset_service_endpoints = Variable.get("use_new_dataset_service_endpoints", default_var=True, deserialize_json=True) + record_id = "" if use_history: record_id_list = context["ti"].xcom_pull(task_ids=self.previous_task_id, key="manifest_ref_ids") @@ -100,25 +93,29 @@ class ReceivingContextMixin: logger.debug(f"#ReceivingContextMixin try to retrieve data from id : {record_id}.") - client_dms = DatasetDmsClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), - config_manager=config_manager, - data_partition_id=data_partition_id, - token_refresher=token_refresher, - logger=logger) - retrieval = client_dms.get_retrieval_instructions(record_id=record_id) + if use_new_dataset_service_endpoints: + logger.debug(f"#ReceivingContextMixin you are using NEW dataset-service endpoints. " + + "If you want to use old dataset-service endpoints, set variable 'use_new_dataset_service_endpoints' to false " + + "in Airflow global variables") + retrieval = dataset_dms_client.retrieval_instructions(record_id=record_id) + else: + logger.debug(f"#ReceivingContextMixin you are using OLD dataset-service endpoints") + retrieval = dataset_dms_client.get_retrieval_instructions(record_id=record_id) retrieval_content_url = retrieval.json()["delivery"][0]["retrievalProperties"]["signedUrl"] - manifest_data = client_dms.make_request(method=HttpMethod.GET, url=retrieval_content_url).json() + manifest_data = dataset_dms_client.make_request(method=HttpMethod.GET, url=retrieval_content_url).json() if isinstance(manifest_data, str): return json.loads(manifest_data) else: return manifest_data - def _put_manifest_data_by_reference(self, context: dict, execution_context: dict, manifest, use_history:bool=False, - config_manager: BaseConfigManager = None, - data_partition_id = None, - token_refresher: TokenRefresher = None, + def _put_manifest_data_by_reference(self, context: dict, execution_context: dict, manifest, + data_partition_id: str, + dataset_dms_client: DatasetDmsClient, + dataset_reg_client: DatasetRegistryClient, + entitlements_client: EntitlementsClient=None, + use_history:bool=False, logger = None) -> str: """ [Geosiris Development] @@ -145,9 +142,8 @@ class ReceivingContextMixin: if acl_data is None: logger.debug(f"Getting default value for acl, because not found in manifest. {type(manifest_dict)}, \n{manifest_dict}") acl_data = self._get_default_acl(execution_context=execution_context, - config_manager=config_manager, + entitlements_client=entitlements_client, data_partition_id=data_partition_id, - token_refresher=token_refresher, logger=logger) #### END ACL @@ -164,17 +160,15 @@ class ReceivingContextMixin: if legal_tags is None: logger.debug(f"Getting default value for legal, because not found in manifest.
{type(manifest_dict)}, \n{manifest_dict}") - legal_tags = self._get_default_legaltags(execution_context=execution_context, - config_manager=config_manager, - data_partition_id=data_partition_id) + legal_tags = self._get_default_legaltags(execution_context=execution_context, data_partition_id=data_partition_id) #### END Legal tags record_id = self._put_file_on_dataset_service( file_content=manifest, # manifest_dict acl_data=acl_data, legal_tags=legal_tags, - config_manager=config_manager, + dataset_reg_client=dataset_reg_client, + dataset_dms_client=dataset_dms_client, data_partition_id=data_partition_id, - token_refresher=token_refresher, logger=logger) if use_history: @@ -190,33 +184,31 @@ class ReceivingContextMixin: def _put_file_on_dataset_service(self, file_content, acl_data: Acl, legal_tags: Legal, - config_manager: BaseConfigManager = None, - data_partition_id = None, - token_refresher: TokenRefresher = None, + dataset_reg_client: DatasetRegistryClient, + dataset_dms_client: DatasetDmsClient, + data_partition_id: str, logger = None) -> str: """ [Geosiris Development] Store a file on the dataset-service """ - if config_manager is None: - config_manager = DefaultConfigManager() - - if data_partition_id is None: - data_partition_id = config_manager.get('environment', 'data_partition_id') - if logger is None: logger = logging.getLogger() + use_new_dataset_service_endpoints = Variable.get("use_new_dataset_service_endpoints", default_var=True, deserialize_json=True) + dataset_registry_url = Variable.get("core__service__dataset__url", default_var=None) match_domain = re.search(r'https?://([\w\.-]+).*', dataset_registry_url) dataset_registry_url_domain = match_domain.group(1) - client_dms = DatasetDmsClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), - config_manager=config_manager, - data_partition_id=data_partition_id, - token_refresher=token_refresher, - logger=logger) - storage_instruction = client_dms.get_storage_instructions(kind_sub_type="dataset--File.Generic") + if use_new_dataset_service_endpoints: + logger.debug(f"#ReceivingContextMixin you are using NEW dataset-service endpoints. "
+ + "If you want to use old dataset-service endpoints, set variable 'use_new_dataset_service_endpoints' to false" + + "in Airflow global variables") + storage_instruction = dataset_dms_client.storage_instructions(kind_sub_type="dataset--File.Generic") + else: + logger.debug(f"#ReceivingContextMixin your are using OLD dataset-service endpoints") + storage_instruction = dataset_dms_client.get_storage_instructions(kind_sub_type="dataset--File.Generic") storage_location = storage_instruction.json()['storageLocation'] @@ -244,9 +236,8 @@ class ReceivingContextMixin: except KeyError as e: logger.debug("No 'signedUploadFileName' parameter found for storage location") - #### Uploading data - put_result = client_dms.make_request(method=HttpMethod.PUT, url=signed_url, data=file_content) + put_result = dataset_dms_client.make_request(method=HttpMethod.PUT, url=signed_url, data=file_content) record_list = [ Record( kind = f"{data_partition_id}:wks:dataset--File.Generic:1.0.0", @@ -256,7 +247,7 @@ class ReceivingContextMixin: "DatasetProperties": { "FileSourceInfo": { "FileSource": file_source, - "PreloadFilePath": f"{unsigned_url}{signed_upload_file_name}" + "PreLoadFilePath": f"{unsigned_url}{signed_upload_file_name}" } }, "ResourceSecurityClassification": f"{data_partition_id}:reference-data--ResourceSecurityClassification:RESTRICTED:", @@ -266,44 +257,28 @@ class ReceivingContextMixin: ancestry=RecordAncestry(parents=[])) ] - client_reg = DatasetRegistryClient( dataset_url=Variable.get("core__service__dataset__url", default_var=None), - config_manager=config_manager, - data_partition_id=data_partition_id, - token_refresher=token_refresher, - logger=logger) + - registered_dataset = client_reg.register_dataset(CreateDatasetRegistriesRequest(dataset_registries=record_list)) + registered_dataset = dataset_reg_client.register_dataset(CreateDatasetRegistriesRequest(dataset_registries=record_list)) return registered_dataset.json()['datasetRegistries'][0]["id"] def _get_default_acl(self, execution_context: dict, - config_manager: BaseConfigManager = None, - data_partition_id = None, - token_refresher: TokenRefresher = None, - logger = None): + entitlements_client: EntitlementsClient, + data_partition_id = None, + logger = None): if "acl" in execution_context: # try to take the value from the context return execution_context['acl'] else: - if config_manager is None: - config_manager = DefaultConfigManager() - - if data_partition_id is None: - data_partition_id = config_manager.get('environment', 'data_partition_id') - if logger is None: logger = logging.getLogger() dataset_registry_url = Variable.get("core__service__dataset__url", default_var=None) match_domain = re.search(r'https?://([\w\.-]+).*', dataset_registry_url) dataset_registry_url_domain = match_domain.group(1) - - ent_client = EntitlementsClient(Variable.get("core__service__entitlements__url", default_var=None), - config_manager=config_manager, - data_partition_id=data_partition_id, - token_refresher=token_refresher, - logger=logger) - ent_response = ent_client.get_groups_for_user() + + ent_response = entitlements_client.get_groups_for_user() acl_domain = data_partition_id + "." 
+ dataset_registry_url_domain @@ -331,15 +306,10 @@ class ReceivingContextMixin: def _get_default_legaltags(self, execution_context: dict, - data_partition_id: str=None, - config_manager: BaseConfigManager = None): + data_partition_id: str): if "legal" in execution_context: # try to take the value from the context return execution_context['legal'] else: - if config_manager is None: - config_manager = DefaultConfigManager() - if data_partition_id is None: - data_partition_id = config_manager.get('environment', 'data_partition_id') return Legal(legaltags=[data_partition_id + "-demo-legaltag"], other_relevant_data_countries=["US"], status="compliant") diff --git a/osdu_airflow/operators/process_manifest_r3_by_reference.py b/osdu_airflow/operators/process_manifest_r3_by_reference.py index 89c7135..004799d 100644 --- a/osdu_airflow/operators/process_manifest_r3_by_reference.py +++ b/osdu_airflow/operators/process_manifest_r3_by_reference.py @@ -27,6 +27,11 @@ from jsonschema import SchemaError from osdu_api.clients.schema.schema_client import SchemaClient from osdu_api.clients.search.search_client import SearchClient from osdu_api.clients.storage.record_client import RecordClient +from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient +from osdu_api.configuration.config_manager import DefaultConfigManager +from osdu_api.clients.schema.schema_client import SchemaClient +from osdu_api.clients.search.search_client import SearchClient +from osdu_api.clients.storage.record_client import RecordClient from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import (EmptyManifestError, GenericManifestSchemaError, @@ -155,6 +160,14 @@ class ProcessManifestOperatorR3ByReference(BaseOperator, ReceivingContextMixin): data_partition_id=payload_context.data_partition_id ) + dataset_dms_client = DatasetDmsClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=DefaultConfigManager(), + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + referential_integrity_validator = ManifestIntegrity( search_client, file_source_validator, @@ -184,9 +197,7 @@ class ProcessManifestOperatorR3ByReference(BaseOperator, ReceivingContextMixin): manifest_data = self._get_manifest_data_by_reference(context=context, execution_context=execution_context, use_history=False, - config_manager=None, - data_partition_id=payload_context.data_partition_id, - token_refresher=token_refresher, + dataset_dms_client=dataset_dms_client, logger=logger) logger.debug(f"Manifest data: {manifest_data}") diff --git a/osdu_airflow/operators/update_status_by_reference.py b/osdu_airflow/operators/update_status_by_reference.py index 5c452fd..2604017 100644 --- a/osdu_airflow/operators/update_status_by_reference.py +++ b/osdu_airflow/operators/update_status_by_reference.py @@ -21,6 +21,10 @@ import logging from typing import Tuple from airflow.models import BaseOperator, Variable +from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient +from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient +from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient +from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_api.clients.ingestion_workflow.ingestion_workflow_client import IngestionWorkflowClient from osdu_ingestion.libs.context 
import Context from osdu_ingestion.libs.exceptions import PipelineFailedError @@ -33,7 +37,7 @@ from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContext logger = logging.getLogger() -class UpdateStatusOperatorByReference(BaseOperator): +class UpdateStatusOperatorByReference(BaseOperator, ReceivingContextMixin): """Operator to update status.""" ui_color = '#10ECAA' ui_fgcolor = '#000000' @@ -125,10 +129,12 @@ class UpdateStatusOperatorByReference(BaseOperator): workflow_name = conf["workflow_name"] run_id = conf["run_id"] status = self.status.value + + token_refresher = AirflowTokenRefresher() workflow_client = IngestionWorkflowClient( ingestion_workflow_url=Variable.get("core__service__workflow__url", default_var=None), - token_refresher=AirflowTokenRefresher(), + token_refresher=token_refresher, data_partition_id=payload_context.data_partition_id ) status_updater = UpdateStatus( @@ -143,23 +149,46 @@ class UpdateStatusOperatorByReference(BaseOperator): status_updater.update_workflow_status() if self._show_skipped_ids: + config_manager = DefaultConfigManager() + + dataset_dms_client = DatasetDmsClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + dataset_reg_client = DatasetRegistryClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + entitlements_client = EntitlementsClient( + entitlements_url=Variable.get("core__service__entitlements__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + skipped_ids, saved_record_ids = self._create_skipped_report(context) context["ti"].xcom_push(key="skipped_ids", value=skipped_ids) context["ti"].xcom_push(key="saved_record_ids", value=saved_record_ids) - mixin = ReceivingContextMixin() - record_id = mixin._put_file_on_dataset_service( file_content=str(saved_record_ids), - acl_data=mixin._get_default_acl(execution_context=execution_context, - config_manager=None, - data_partition_id=payload_context.data_partition_id, - token_refresher=AirflowTokenRefresher(), - logger=logger), - legal_tags=mixin._get_default_legaltags(execution_context=execution_context, - config_manager=None, - data_partition_id=None), - config_manager=None, + record_id = self._put_file_on_dataset_service( file_content=str(saved_record_ids), + acl_data=self._get_default_acl(execution_context=execution_context, + entitlements_client=entitlements_client, + data_partition_id=payload_context.data_partition_id, + logger=logger), + legal_tags=self._get_default_legaltags(execution_context=execution_context, + data_partition_id=payload_context.data_partition_id), data_partition_id=payload_context.data_partition_id, - token_refresher=AirflowTokenRefresher(), + dataset_reg_client=dataset_reg_client, + dataset_dms_client=dataset_dms_client, logger=logger) logger.error(f"#SKIPPED_IDS: Some ids in the manifest were skipped.
You can find the report in the datasetService with this record id : {record_id}") diff --git a/osdu_airflow/operators/validate_manifest_schema_by_reference.py b/osdu_airflow/operators/validate_manifest_schema_by_reference.py index a369f6d..4e28e9d 100644 --- a/osdu_airflow/operators/validate_manifest_schema_by_reference.py +++ b/osdu_airflow/operators/validate_manifest_schema_by_reference.py @@ -21,6 +21,10 @@ import logging from airflow.models import BaseOperator, Variable from osdu_api.clients.schema.schema_client import SchemaClient +from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient +from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient +from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient +from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import EmptyManifestError, GenericManifestSchemaError @@ -61,6 +65,7 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi execution_context = context["dag_run"].conf["execution_context"] payload_context = Context.populate(execution_context) token_refresher = AirflowTokenRefresher() + config_manager = DefaultConfigManager() logger.debug( f"DATA_TYPES_WITH_SURROGATE_KEYS: {DATA_TYPES_WITH_SURROGATE_KEYS}") @@ -78,12 +83,34 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi data_types_with_surrogate_ids=DATA_TYPES_WITH_SURROGATE_KEYS ) - manifest_data = self._get_manifest_data_by_reference(context=context, + dataset_dms_client = DatasetDmsClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + dataset_reg_client = DatasetRegistryClient( + dataset_url=Variable.get("core__service__dataset__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + entitlements_client = EntitlementsClient( + entitlements_url=Variable.get("core__service__entitlements__url", default_var=None), + config_manager=config_manager, + data_partition_id=payload_context.data_partition_id, + token_refresher=token_refresher, + logger=logger + ) + + manifest_data = self._get_manifest_data_by_reference(context=context, execution_context=execution_context, use_history=True, # use the history because "check_payload_type" does not return the id - config_manager=None, - data_partition_id=payload_context.data_partition_id, - token_refresher=token_refresher, + dataset_dms_client=dataset_dms_client, logger=logger) if not manifest_data: @@ -105,7 +132,8 @@ class ValidateManifestSchemaOperatorByReference(BaseOperator, ReceivingContextMi execution_context=execution_context, manifest=valid_manifest_file, use_history=False, - config_manager=None, data_partition_id=payload_context.data_partition_id, - token_refresher=token_refresher, + dataset_dms_client=dataset_dms_client, + dataset_reg_client=dataset_reg_client, + entitlements_client=entitlements_client, logger=logger) -- GitLab From 865abe45322d5c17c98d80a2c7ba2c0b3f555d32 Mon Sep 17 00:00:00 2001 From: Valentin Gauthier Date: Tue, 20 Sep 2022 14:09:43 +0200 Subject: [PATCH 3/4] try to use old dataset_service_endpoints for 
tests --- osdu_airflow/tests/osdu_api.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/osdu_airflow/tests/osdu_api.ini b/osdu_airflow/tests/osdu_api.ini index edd54d5..4f8a4f6 100644 --- a/osdu_airflow/tests/osdu_api.ini +++ b/osdu_airflow/tests/osdu_api.ini @@ -24,6 +24,7 @@ dataset_url=https://blah/api/dataset-registry/v1 schema_url=https://blah/api/schema-service/v1 ingestion_workflow_url=stub use_service_principal=no +use_new_dataset_service_endpoints=false [provider] name=provider_test -- GitLab From 2690e0bd705f21a94a551bfb00107304c9ddf1ec Mon Sep 17 00:00:00 2001 From: Valentin Gauthier Date: Tue, 20 Sep 2022 14:17:19 +0200 Subject: [PATCH 4/4] isort and moving variable for dataset endpoints to airflow var from ini file --- .../ensure_manifest_integrity_by_reference.py | 2 +- .../process_manifest_r3_by_reference.py | 5 +---- .../operators/update_status_by_reference.py | 2 +- .../validate_manifest_schema_by_reference.py | 2 +- osdu_airflow/tests/airflow_var.json | 3 ++- osdu_airflow/tests/osdu_api.ini | 1 - .../tests/plugin-unit-tests/mock_providers.py | 3 ++- .../tests/plugin-unit-tests/mock_responses.py | 3 ++- .../plugin-unit-tests/test_operators_r3.py | 20 ++++++++---------- ...test_operators_r3_manifest_by_reference.py | 21 ++++++++++--------- 10 files changed, 30 insertions(+), 32 deletions(-) diff --git a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py index fd55688..9cc8a8b 100644 --- a/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py +++ b/osdu_airflow/operators/ensure_manifest_integrity_by_reference.py @@ -20,8 +20,8 @@ from airflow.models import BaseOperator, Variable from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient -from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_api.clients.search.search_client import SearchClient +from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_ingestion.libs import search_client from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher diff --git a/osdu_airflow/operators/process_manifest_r3_by_reference.py b/osdu_airflow/operators/process_manifest_r3_by_reference.py index 004799d..7f35455 100644 --- a/osdu_airflow/operators/process_manifest_r3_by_reference.py +++ b/osdu_airflow/operators/process_manifest_r3_by_reference.py @@ -24,14 +24,11 @@ from typing import List, Tuple from airflow.models import BaseOperator, Variable from jsonschema import SchemaError -from osdu_api.clients.schema.schema_client import SchemaClient -from osdu_api.clients.search.search_client import SearchClient -from osdu_api.clients.storage.record_client import RecordClient from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient -from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_api.clients.schema.schema_client import SchemaClient from osdu_api.clients.search.search_client import SearchClient from osdu_api.clients.storage.record_client import RecordClient +from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import 
(EmptyManifestError, GenericManifestSchemaError, diff --git a/osdu_airflow/operators/update_status_by_reference.py b/osdu_airflow/operators/update_status_by_reference.py index 2604017..883b0b5 100644 --- a/osdu_airflow/operators/update_status_by_reference.py +++ b/osdu_airflow/operators/update_status_by_reference.py @@ -24,8 +24,8 @@ from airflow.models import BaseOperator, Variable from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient -from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_api.clients.ingestion_workflow.ingestion_workflow_client import IngestionWorkflowClient +from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_ingestion.libs.context import Context from osdu_ingestion.libs.exceptions import PipelineFailedError from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher diff --git a/osdu_airflow/operators/validate_manifest_schema_by_reference.py b/osdu_airflow/operators/validate_manifest_schema_by_reference.py index 4e28e9d..4edf26b 100644 --- a/osdu_airflow/operators/validate_manifest_schema_by_reference.py +++ b/osdu_airflow/operators/validate_manifest_schema_by_reference.py @@ -20,10 +20,10 @@ Validate Manifest against R3 schemas operator. import logging from airflow.models import BaseOperator, Variable -from osdu_api.clients.schema.schema_client import SchemaClient from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient from osdu_api.clients.dataset.dataset_registry_client import DatasetRegistryClient from osdu_api.clients.entitlements.entitlements_client import EntitlementsClient +from osdu_api.clients.schema.schema_client import SchemaClient from osdu_api.configuration.config_manager import DefaultConfigManager from osdu_ingestion.libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS from osdu_ingestion.libs.context import Context diff --git a/osdu_airflow/tests/airflow_var.json b/osdu_airflow/tests/airflow_var.json index a5e56cc..2e7b940 100644 --- a/osdu_airflow/tests/airflow_var.json +++ b/osdu_airflow/tests/airflow_var.json @@ -13,5 +13,6 @@ "core__config__dataload_config_path": "https://test", "core__service__dataset__url": "https://test/test", "core__auth__access_token": "test", - "core__ingestion__batch_count": 3 + "core__ingestion__batch_count": 3, + "use_new_dataset_service_endpoints": "false" } diff --git a/osdu_airflow/tests/osdu_api.ini b/osdu_airflow/tests/osdu_api.ini index 4f8a4f6..edd54d5 100644 --- a/osdu_airflow/tests/osdu_api.ini +++ b/osdu_airflow/tests/osdu_api.ini @@ -24,7 +24,6 @@ dataset_url=https://blah/api/dataset-registry/v1 schema_url=https://blah/api/schema-service/v1 ingestion_workflow_url=stub use_service_principal=no -use_new_dataset_service_endpoints=false [provider] name=provider_test diff --git a/osdu_airflow/tests/plugin-unit-tests/mock_providers.py b/osdu_airflow/tests/plugin-unit-tests/mock_providers.py index 7690ff1..2d26f35 100644 --- a/osdu_airflow/tests/plugin-unit-tests/mock_providers.py +++ b/osdu_airflow/tests/plugin-unit-tests/mock_providers.py @@ -17,10 +17,11 @@ import io import logging from typing import Tuple + from osdu_api.providers.blob_storage import get_client from osdu_api.providers.credentials import get_credentials from osdu_api.providers.factory import ProvidersFactory -from osdu_api.providers.types import BlobStorageClient, 
BaseCredentials +from osdu_api.providers.types import BaseCredentials, BlobStorageClient logger = logging.getLogger(__name__) diff --git a/osdu_airflow/tests/plugin-unit-tests/mock_responses.py b/osdu_airflow/tests/plugin-unit-tests/mock_responses.py index 227f859..89a092a 100644 --- a/osdu_airflow/tests/plugin-unit-tests/mock_responses.py +++ b/osdu_airflow/tests/plugin-unit-tests/mock_responses.py @@ -14,8 +14,9 @@ # limitations under the License. -import json import http +import json + import requests diff --git a/osdu_airflow/tests/plugin-unit-tests/test_operators_r3.py b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3.py index 3c10760..8f3a39a 100644 --- a/osdu_airflow/tests/plugin-unit-tests/test_operators_r3.py +++ b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3.py @@ -13,18 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock_providers - import http import json from datetime import datetime from typing import ClassVar, TypeVar from unittest.mock import MagicMock +import mock_providers import pytest -from osdu_ingestion.libs.exceptions import PipelineFailedError -from osdu_ingestion.libs.handle_file import FileHandler - import requests from airflow import DAG from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator @@ -32,18 +28,20 @@ from airflow.models import TaskInstance from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_GENERIC_SCHEMA_PATH, MANIFEST_WELLBORE_VALID_PATH) from mock_responses import MockWorkflowResponse +from osdu_api.clients.search.search_client import SearchClient +from osdu_api.clients.storage.record_client import RecordClient +from osdu_ingestion.libs.exceptions import PipelineFailedError +from osdu_ingestion.libs.handle_file import FileHandler +from osdu_ingestion.libs.segy_conversion_metadata.headers_byte_locations import HeadersByteLocations +from osdu_ingestion.libs.segy_conversion_metadata.open_vds import OpenVDSMetadata + from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin from osdu_airflow.operators.process_manifest_r3 import (ManifestProcessor, ProcessManifestOperatorR3, SchemaValidator) -from osdu_airflow.operators.segy_open_vds_conversion import KubernetesPodSegyToOpenVDSOperator +from osdu_airflow.operators.segy_open_vds_conversion import KubernetesPodSegyToOpenVDSOperator from osdu_airflow.operators.update_status import UpdateStatusOperator from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator -from osdu_api.clients.storage.record_client import RecordClient -from osdu_api.clients.search.search_client import SearchClient -from osdu_ingestion.libs.segy_conversion_metadata.open_vds import OpenVDSMetadata -from osdu_ingestion.libs.segy_conversion_metadata.headers_byte_locations import HeadersByteLocations - CustomOperator = TypeVar("CustomOperator") diff --git a/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py index c7daef4..c95bf02 100644 --- a/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py +++ b/osdu_airflow/tests/plugin-unit-tests/test_operators_r3_manifest_by_reference.py @@ -19,25 +19,26 @@ from datetime import datetime from typing import ClassVar, TypeVar from unittest.mock import Mock +import mock_providers 
import pytest import requests -import osdu_airflow.operators.mixins.ReceivingContextMixin as receiving_context from airflow import DAG from airflow.models import TaskInstance +from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_GENERIC_SCHEMA_PATH, + MANIFEST_TEST_PATH, MANIFEST_WELLBORE_VALID_PATH) +from mock_responses import MockWorkflowResponse from osdu_ingestion.libs.exceptions import PipelineFailedError from osdu_ingestion.libs.handle_file import FileHandler -import mock_providers -from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_TEST_PATH, MANIFEST_GENERIC_SCHEMA_PATH, - MANIFEST_WELLBORE_VALID_PATH) -from mock_responses import MockWorkflowResponse -from osdu_airflow.operators.ensure_manifest_integrity_by_reference import EnsureManifestIntegrityOperatorByReference +import osdu_airflow.operators.mixins.ReceivingContextMixin as receiving_context +from osdu_airflow.operators.ensure_manifest_integrity_by_reference import \ + EnsureManifestIntegrityOperatorByReference from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin -from osdu_airflow.operators.process_manifest_r3_by_reference import (ManifestProcessor, - ProcessManifestOperatorR3ByReference, - SchemaValidator) +from osdu_airflow.operators.process_manifest_r3_by_reference import ( + ManifestProcessor, ProcessManifestOperatorR3ByReference, SchemaValidator) from osdu_airflow.operators.update_status_by_reference import UpdateStatusOperatorByReference -from osdu_airflow.operators.validate_manifest_schema_by_reference import ValidateManifestSchemaOperatorByReference +from osdu_airflow.operators.validate_manifest_schema_by_reference import \ + ValidateManifestSchemaOperatorByReference CustomOperator = TypeVar("CustomOperator") -- GitLab
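Taken together, patches 2-4 gate every dataset-service call behind a single Airflow Variable instead of the ini file. A minimal sketch of the dispatch as ReceivingContextMixin performs it (the wrapper function is illustrative; the two client methods and the Variable.get call are the ones used in the patch):

    from airflow.models import Variable
    from osdu_api.clients.dataset.dataset_dms_client import DatasetDmsClient

    def get_storage_instructions(dms_client: DatasetDmsClient):
        # Defaults to the new endpoints; airflow_var.json pins the variable to the
        # JSON string "false", which deserialize_json=True turns into Python False.
        use_new = Variable.get("use_new_dataset_service_endpoints",
                               default_var=True, deserialize_json=True)
        if use_new:
            return dms_client.storage_instructions(kind_sub_type="dataset--File.Generic")
        return dms_client.get_storage_instructions(kind_sub_type="dataset--File.Generic")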