diff --git a/src/dags/libs/process_manifest_r3.py b/src/dags/libs/process_manifest_r3.py index bc24dcaab367ea0e6bce22efa1fd9fde823641a9..3e0cf5157777e199282191f264b3a54d7632646d 100644 --- a/src/dags/libs/process_manifest_r3.py +++ b/src/dags/libs/process_manifest_r3.py @@ -50,11 +50,13 @@ class ManifestProcessor(HeadersMixin): storage_url: str, dagrun_conf: dict, file_uploader: FileUploader, + source_file_checker: SourceFileChecker, token_refresher: TokenRefresher, context: Context ): super().__init__(context) self.file_uploader = file_uploader + self.source_file_checker = source_file_checker self.storage_url = storage_url self.data_object = copy.deepcopy(dagrun_conf) self.context = context @@ -160,8 +162,12 @@ class ManifestProcessor(HeadersMixin): """ records = [] for file in manifest["Files"]: - record = self.upload_source_file(file) - record = self.populate_manifest_storage_record(record) + if not file["data"]["FileSource"]: + file = self.upload_source_file(file) + else: + self.source_file_checker.does_file_exist(file["data"]["FileSource"]) + + record = self.populate_manifest_storage_record(file) records.append(record) return records diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py index f2ceb67d527fe4c60764e39f090273837bb1f6ef..694d9e4b93b277c984647c413dfa24f837965439 100644 --- a/src/plugins/operators/process_manifest_r3.py +++ b/src/plugins/operators/process_manifest_r3.py @@ -18,6 +18,7 @@ import logging from airflow.utils import apply_defaults from airflow.models import BaseOperator, Variable from libs.context import Context +from libs.source_file_check import GCSSourceFileChecker from libs.upload_file import GCSFileUploader from libs.refresh_token import AirflowTokenRefresher from libs.process_manifest_r3 import ManifestProcessor @@ -45,6 +46,7 @@ class ProcessManifestOperatorR3(BaseOperator): token_refresher = AirflowTokenRefresher() file_uploader = GCSFileUploader(self.file_service_url, token_refresher, payload_context) + source_file_checker = GCSSourceFileChecker() validator = SchemaValidator( self.schema_service_url, @@ -56,6 +58,7 @@ class ProcessManifestOperatorR3(BaseOperator): self.storage_url, context["dag_run"].conf, file_uploader, + source_file_checker, token_refresher, payload_context, ) diff --git a/tests/plugin-unit-tests/test_manifest_processor_r3.py b/tests/plugin-unit-tests/test_manifest_processor_r3.py index 9bc83af9bcb1f8e5aa865ba194e56fcffdb68c8e..589c0e8b84fe62e199c40b42a2be6261f3df8f42 100644 --- a/tests/plugin-unit-tests/test_manifest_processor_r3.py +++ b/tests/plugin-unit-tests/test_manifest_processor_r3.py @@ -24,6 +24,7 @@ sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags") from libs.context import Context from libs.upload_file import GCSFileUploader +from libs.source_file_check import GCSSourceFileChecker from libs.refresh_token import AirflowTokenRefresher from libs.exceptions import EmptyManifestError from deepdiff import DeepDiff @@ -87,7 +88,8 @@ class TestManifestProcessor: dagrun_conf=conf, token_refresher=token_refresher, context=context, - file_uploader = file_uploader + file_uploader = file_uploader, + source_file_checker=GCSSourceFileChecker(), ) monkeypatch.setattr(manifest_processor, "generate_id", lambda manifest: "test_id") @@ -251,7 +253,8 @@ class TestManifestProcessor: dagrun_conf=conf, token_refresher=AirflowTokenRefresher(), context=context, - file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context) + file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context), + source_file_checker=GCSSourceFileChecker() ) for manifest_part in manifest_processor.data_object["manifest"]: group_type = manifest_part["groupType"]