From e0146e3dcb2639de955369b4c4eea8035a9e54b4 Mon Sep 17 00:00:00 2001
From: yan <yan_sushchynski@epam.com>
Date: Tue, 22 Dec 2020 09:57:31 +0300
Subject: [PATCH] GONRG-1457: Stopped replacing fileSource

---
 src/dags/libs/process_manifest_r3.py                  | 10 ++++++++--
 src/plugins/operators/process_manifest_r3.py          |  3 +++
 tests/plugin-unit-tests/test_manifest_processor_r3.py |  7 +++++--
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/src/dags/libs/process_manifest_r3.py b/src/dags/libs/process_manifest_r3.py
index bc24dca..3e0cf51 100644
--- a/src/dags/libs/process_manifest_r3.py
+++ b/src/dags/libs/process_manifest_r3.py
@@ -50,11 +50,13 @@ class ManifestProcessor(HeadersMixin):
         storage_url: str,
         dagrun_conf: dict,
         file_uploader: FileUploader,
+        source_file_checker: SourceFileChecker,
         token_refresher: TokenRefresher,
         context: Context
     ):
         super().__init__(context)
         self.file_uploader = file_uploader
+        self.source_file_checker = source_file_checker
         self.storage_url = storage_url
         self.data_object = copy.deepcopy(dagrun_conf)
         self.context = context
@@ -160,8 +162,12 @@ class ManifestProcessor(HeadersMixin):
         """
         records = []
         for file in manifest["Files"]:
-            record = self.upload_source_file(file)
-            record = self.populate_manifest_storage_record(record)
+            if not file["data"]["FileSource"]:
+                file = self.upload_source_file(file)
+            else:
+                self.source_file_checker.does_file_exist(file["data"]["FileSource"])
+
+            record = self.populate_manifest_storage_record(file)
             records.append(record)
         return records
 
diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py
index f2ceb67..694d9e4 100644
--- a/src/plugins/operators/process_manifest_r3.py
+++ b/src/plugins/operators/process_manifest_r3.py
@@ -18,6 +18,7 @@ import logging
 from airflow.utils import apply_defaults
 from airflow.models import BaseOperator, Variable
 from libs.context import Context
+from libs.source_file_check import GCSSourceFileChecker
 from libs.upload_file import GCSFileUploader
 from libs.refresh_token import AirflowTokenRefresher
 from libs.process_manifest_r3 import ManifestProcessor
@@ -45,6 +46,7 @@ class ProcessManifestOperatorR3(BaseOperator):
         token_refresher = AirflowTokenRefresher()
         file_uploader = GCSFileUploader(self.file_service_url, token_refresher,
                                                          payload_context)
+        source_file_checker = GCSSourceFileChecker()
 
         validator = SchemaValidator(
             self.schema_service_url,
@@ -56,6 +58,7 @@ class ProcessManifestOperatorR3(BaseOperator):
             self.storage_url,
             context["dag_run"].conf,
             file_uploader,
+            source_file_checker,
             token_refresher,
             payload_context,
         )
diff --git a/tests/plugin-unit-tests/test_manifest_processor_r3.py b/tests/plugin-unit-tests/test_manifest_processor_r3.py
index 9bc83af..589c0e8 100644
--- a/tests/plugin-unit-tests/test_manifest_processor_r3.py
+++ b/tests/plugin-unit-tests/test_manifest_processor_r3.py
@@ -24,6 +24,7 @@ sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
 
 from libs.context import Context
 from libs.upload_file import GCSFileUploader
+from libs.source_file_check import GCSSourceFileChecker
 from libs.refresh_token import AirflowTokenRefresher
 from libs.exceptions import EmptyManifestError
 from deepdiff import DeepDiff
@@ -87,7 +88,8 @@ class TestManifestProcessor:
             dagrun_conf=conf,
             token_refresher=token_refresher,
             context=context,
-            file_uploader = file_uploader
+            file_uploader = file_uploader,
+            source_file_checker=GCSSourceFileChecker(),
         )
 
         monkeypatch.setattr(manifest_processor, "generate_id", lambda manifest: "test_id")
@@ -251,7 +253,8 @@ class TestManifestProcessor:
             dagrun_conf=conf,
             token_refresher=AirflowTokenRefresher(),
             context=context,
-            file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context)
+            file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context),
+            source_file_checker=GCSSourceFileChecker()
         )
         for manifest_part in manifest_processor.data_object["manifest"]:
             group_type = manifest_part["groupType"]
-- 
GitLab