Skip to content
Snippets Groups Projects
Commit e0146e3d authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM) Committed by Siarhei Khaletski (EPAM)
Browse files

GONRG-1457: Stopped replacing fileSource

parent e3da8d52
No related branches found
No related tags found
1 merge request!7R3 ingestion updates
......@@ -50,11 +50,13 @@ class ManifestProcessor(HeadersMixin):
storage_url: str,
dagrun_conf: dict,
file_uploader: FileUploader,
source_file_checker: SourceFileChecker,
token_refresher: TokenRefresher,
context: Context
):
super().__init__(context)
self.file_uploader = file_uploader
self.source_file_checker = source_file_checker
self.storage_url = storage_url
self.data_object = copy.deepcopy(dagrun_conf)
self.context = context
......@@ -160,8 +162,12 @@ class ManifestProcessor(HeadersMixin):
"""
records = []
for file in manifest["Files"]:
record = self.upload_source_file(file)
record = self.populate_manifest_storage_record(record)
if not file["data"]["FileSource"]:
file = self.upload_source_file(file)
else:
self.source_file_checker.does_file_exist(file["data"]["FileSource"])
record = self.populate_manifest_storage_record(file)
records.append(record)
return records
......
......@@ -18,6 +18,7 @@ import logging
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from libs.context import Context
from libs.source_file_check import GCSSourceFileChecker
from libs.upload_file import GCSFileUploader
from libs.refresh_token import AirflowTokenRefresher
from libs.process_manifest_r3 import ManifestProcessor
......@@ -45,6 +46,7 @@ class ProcessManifestOperatorR3(BaseOperator):
token_refresher = AirflowTokenRefresher()
file_uploader = GCSFileUploader(self.file_service_url, token_refresher,
payload_context)
source_file_checker = GCSSourceFileChecker()
validator = SchemaValidator(
self.schema_service_url,
......@@ -56,6 +58,7 @@ class ProcessManifestOperatorR3(BaseOperator):
self.storage_url,
context["dag_run"].conf,
file_uploader,
source_file_checker,
token_refresher,
payload_context,
)
......
......@@ -24,6 +24,7 @@ sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
from libs.context import Context
from libs.upload_file import GCSFileUploader
from libs.source_file_check import GCSSourceFileChecker
from libs.refresh_token import AirflowTokenRefresher
from libs.exceptions import EmptyManifestError
from deepdiff import DeepDiff
......@@ -87,7 +88,8 @@ class TestManifestProcessor:
dagrun_conf=conf,
token_refresher=token_refresher,
context=context,
file_uploader = file_uploader
file_uploader = file_uploader,
source_file_checker=GCSSourceFileChecker(),
)
monkeypatch.setattr(manifest_processor, "generate_id", lambda manifest: "test_id")
......@@ -251,7 +253,8 @@ class TestManifestProcessor:
dagrun_conf=conf,
token_refresher=AirflowTokenRefresher(),
context=context,
file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context)
file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context),
source_file_checker=GCSSourceFileChecker()
)
for manifest_part in manifest_processor.data_object["manifest"]:
group_type = manifest_part["groupType"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment