diff --git a/src/dags/libs/exceptions.py b/src/dags/libs/exceptions.py index 747ea51b3b2297d3b1b77be85ff539477d7db984..f0755e35ae46a2b90f2d926ad46ce6a9dfac6b7d 100644 --- a/src/dags/libs/exceptions.py +++ b/src/dags/libs/exceptions.py @@ -59,3 +59,14 @@ class SAFilePathError(Exception): """ pass +class FileSourceError(Exception): + """ + Raise when file doesn't exist under given path. + """ + pass + +class GCSObjectURIError(Exception): + """ + Raise when wrong Google Storage Object was given. + """ + pass diff --git a/src/dags/libs/file_source_check.py b/src/dags/libs/file_source_check.py new file mode 100644 index 0000000000000000000000000000000000000000..43ae0c386c14b2fb8412a48c462301194d119561 --- /dev/null +++ b/src/dags/libs/file_source_check.py @@ -0,0 +1,78 @@ +# Copyright 2020 Google LLC +# Copyright 2020 EPAM Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from abc import ABC, abstractmethod +from typing import Tuple +from urllib.parse import urlparse + +import tenacity +from google.cloud import storage +from libs.exceptions import GCSObjectURIError, FileSourceError + + +RETRIES = 3 + + +class FileSourceChecker(ABC): + + @abstractmethod + def does_file_exist(self, file_path: str) -> bool: + """ + Validate if file exists under given file path. 
+ """ + pass + + +class GoogleStorageFileSourceChecker(FileSourceChecker): + + def __init__(self): + pass + + def __repr__(self): + return "GCS file checker" + + def _parse_object_uri(self, file_path: str) -> Tuple[str, str]: + """ + Parse GCS Object uri. + Return bucket and blob names. + """ + parsed_path = urlparse(file_path) + if parsed_path.scheme == "gs": + bucket_name = parsed_path.netloc + source_blob_name = parsed_path.path[1:] # delete the first slash + + if bucket_name and source_blob_name: + return bucket_name, source_blob_name + + raise GCSObjectURIError + + @tenacity.retry( + stop=tenacity.stop_after_attempt(RETRIES), + reraise=True + ) + def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str): + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(source_blob_name) + does_exist = blob.exists() + return does_exist + + def does_file_exist(self, file_path: str) -> bool: + bucket_name, source_blob_name = self._parse_object_uri(file_path) + does_exist = self._does_file_exist_in_bucket(bucket_name, source_blob_name) + if not does_exist: + raise FileSourceError(f"File doesn't exist in '{file_path}'") + return True diff --git a/src/dags/libs/process_manifest_r3.py b/src/dags/libs/process_manifest_r3.py index 4b2f55b060724b42d4c250268a37bcd909837325..1f858e805f1d8ed6cdd2068d136cdddf457835dd 100644 --- a/src/dags/libs/process_manifest_r3.py +++ b/src/dags/libs/process_manifest_r3.py @@ -23,6 +23,7 @@ import requests import tenacity from libs.context import Context from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError +from libs.file_source_check import FileSourceChecker from libs.mixins import HeadersMixin from libs.refresh_token import AirflowTokenRefresher, refresh_token @@ -43,8 +44,16 @@ class ManifestProcessor(HeadersMixin): } } - def __init__(self, storage_url: str, dagrun_conf: dict, context: Context): + def __init__( + self, + storage_url: str, + dagrun_conf: dict, + 
file_source_checker: FileSourceChecker, + context: Context + ): super().__init__(context) + self._validate_file_source_checker_type(file_source_checker) + self.file_source_checker = file_source_checker self.storage_url = storage_url self.data_object = copy.deepcopy(dagrun_conf) self.context = context @@ -57,6 +66,10 @@ class ManifestProcessor(HeadersMixin): kind_name = kind.split(":")[2] return kind_name + def check_file_resource(self, file_record: dict): + file_path = file_record["Data"]["PreLoadFilePath"] + self.file_source_checker.does_file_exist(file_path) + def generate_id(self, manifest_fragment: dict) -> str: """ Generate id to use it in Storage. @@ -79,7 +92,14 @@ class ManifestProcessor(HeadersMixin): record["data"] = manifest return record - def _validate_storage_response(self, response_dict: dict): + @staticmethod + def _validate_file_source_checker_type(file_source_checker: FileSourceChecker): + if not isinstance(file_source_checker, FileSourceChecker): + raise TypeError(f"File checker must be an instance of '{FileSourceChecker}' class.\n" + f"Got an instance of '{file_source_checker}'") + + @staticmethod + def _validate_storage_response(response_dict: dict): if not ( isinstance(response_dict, dict) and isinstance(response_dict.get("recordIds"), list) @@ -136,6 +156,7 @@ class ManifestProcessor(HeadersMixin): """ records = [] for file in manifest["Files"]: + self.check_file_resource(file) record = self.populate_manifest_storage_record(file) records.append(record) return records diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py index 6850e41b363dd0dd59ac41990b4a599fa02484ac..82d2e8ad928b92aee63bc1dbd0dcc506c8fa9628 100644 --- a/src/plugins/operators/process_manifest_r3.py +++ b/src/plugins/operators/process_manifest_r3.py @@ -18,6 +18,7 @@ import logging from airflow.utils import apply_defaults from airflow.models import BaseOperator, Variable from libs.context import Context +from 
libs.file_source_check import GoogleStorageFileSourceChecker from libs.process_manifest_r3 import ManifestProcessor from libs.validate_schema import SchemaValidator @@ -48,7 +49,8 @@ class ProcessManifestOperatorR3(BaseOperator): manifest_processor = ManifestProcessor( self.storage_url, context["dag_run"].conf, - payload_context + GoogleStorageFileSourceChecker(), + payload_context, ) validator.validate_manifest() record_ids = manifest_processor.process_manifest() diff --git a/tests/plugin-unit-tests/test_file_source_check.py b/tests/plugin-unit-tests/test_file_source_check.py new file mode 100644 index 0000000000000000000000000000000000000000..349a854c2552cca3059dbb8e6dfa5dff53293a0f --- /dev/null +++ b/tests/plugin-unit-tests/test_file_source_check.py @@ -0,0 +1,91 @@ +# Copyright 2020 Google LLC +# Copyright 2020 EPAM Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import sys + +sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins") +sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags") + +from libs.exceptions import FileSourceError, GCSObjectURIError +import pytest +from libs.file_source_check import GoogleStorageFileSourceChecker + + +class TestFileSourceChecker: + + @pytest.fixture() + def file_checker(self): + return GoogleStorageFileSourceChecker() + + @pytest.fixture() + def mock_file_exist_under_uri(self, monkeypatch, file_exists: bool): + """ + Mock response from GCS if file exists or not. 
+ """ + monkeypatch.setattr(GoogleStorageFileSourceChecker, "_does_file_exist_in_bucket", + lambda *args, **kwargs: file_exists) + + @pytest.mark.parametrize( + "file_path, file_exists", + [ + pytest.param("gs://test/test", True, id="Valid URI") + ] + ) + def test_does_file_exist_in_bucket( + self, + monkeypatch, + mock_file_exist_under_uri, + file_exists: bool, + file_checker: GoogleStorageFileSourceChecker, + file_path: str + ): + """ + Test if file does really exist. + """ + file_checker.does_file_exist(file_path) + + @pytest.mark.parametrize( + "file_path, file_exists", + [ + pytest.param("gs://test/test", False) + ] + ) + def test_file_does_not_exist_in_bucket( + self, + monkeypatch, + mock_file_exist_under_uri, + file_exists: bool, + file_checker: GoogleStorageFileSourceChecker, + file_path: str + ): + """ + Test if file doesn't exist. + """ + with pytest.raises(FileSourceError): + file_checker.does_file_exist(file_path) + + @pytest.mark.parametrize( + "file_path", + [ + pytest.param("gs://test"), + pytest.param("://test"), + pytest.param("test"), + ] + ) + def test_invalid_gcs_object_uri(self, file_checker: GoogleStorageFileSourceChecker, file_path): + with pytest.raises(GCSObjectURIError): + file_checker._parse_object_uri(file_path) diff --git a/tests/plugin-unit-tests/test_manifest_processor_r3.py b/tests/plugin-unit-tests/test_manifest_processor_r3.py index 5a696615f32e6befba8c9f0d43cbefae40610a25..d7c69ebe824298b4139cb1645a2ddde5f6427d89 100644 --- a/tests/plugin-unit-tests/test_manifest_processor_r3.py +++ b/tests/plugin-unit-tests/test_manifest_processor_r3.py @@ -23,6 +23,7 @@ sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins") sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags") from libs.context import Context +from libs.file_source_check import FileSourceChecker, GoogleStorageFileSourceChecker from libs.exceptions import EmptyManifestError from deepdiff import DeepDiff import pytest @@ -78,12 +79,17 @@ class TestManifestProcessor: with 
open(conf_path) as f: conf = json.load(f) context = Context.populate(conf) + file_checker = GoogleStorageFileSourceChecker() manifest_processor = process_manifest_r3.ManifestProcessor( storage_url="", dagrun_conf=conf, - context=context + context=context, + file_source_checker=file_checker ) + monkeypatch.setattr(manifest_processor, "generate_id", lambda manifest: "test_id") + monkeypatch.setattr(file_checker, "_does_file_exist_in_bucket", + lambda *args, **kwargs: True) return manifest_processor @pytest.fixture() @@ -240,7 +246,8 @@ class TestManifestProcessor: manifest_processor = process_manifest_r3.ManifestProcessor( storage_url="", dagrun_conf=conf, - context=context + context=context, + file_source_checker=GoogleStorageFileSourceChecker() ) for manifest_part in manifest_processor.data_object["manifest"]: group_type = manifest_part["groupType"] diff --git a/tests/plugin-unit-tests/test_operators_r3.py b/tests/plugin-unit-tests/test_operators_r3.py index e184a5ee80b3e10f7d0ae189727205ed75fa42de..219cd97b33fdb2f9fe960659d7dbb4537c213a8a 100644 --- a/tests/plugin-unit-tests/test_operators_r3.py +++ b/tests/plugin-unit-tests/test_operators_r3.py @@ -37,7 +37,7 @@ from file_paths import ( from operators.process_manifest_r3 import ProcessManifestOperatorR3, SchemaValidator, \ ManifestProcessor from operators.search_record_id import SearchRecordIdOperator -from hooks.http_hooks import workflow_hook +from libs.file_source_check import FileSourceChecker, GoogleStorageFileSourceChecker from operators.update_status import UpdateStatusOperator from mock_responses import MockSearchResponse, MockWorkflowResponse @@ -72,6 +72,8 @@ class TestOperators(object): monkeypatch.setattr(SchemaValidator, "validate_manifest", lambda obj: None) monkeypatch.setattr(ManifestProcessor, "save_record", lambda obj, headers, request_data: MockStorageResponse()) + monkeypatch.setattr(GoogleStorageFileSourceChecker, "_does_file_exist_in_bucket", + lambda *args, **kwargs: True) task, context = 
self._create_task(ProcessManifestOperatorR3) task.pre_execute(context) task.execute(context)