Commit 717fcdfc authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) 🚩
Browse files

Merge branch 'fix/GONRG_3041_frame_of_reference_issue_fix' into 'master'

GONRG-3041 frame of reference issue fix

Closes osdu/platform/data-flow/ingestion/ingestion-dags#77

See merge request !27
parents 987cff77 dba76bdc
Pipeline #59913 passed with stages
in 1 minute and 51 seconds
......@@ -15,7 +15,6 @@
"""Provides Manifest Processor."""
import copy
import json
import logging
from typing import List
......@@ -46,13 +45,6 @@ TIMEOUT = 1
class ManifestProcessor(HeadersMixin):
"""Class to process WP, Master and Reference data."""
RECORD_TEMPLATE = {
"legal": {},
"acl": {},
"kind": "",
"data": {}
}
def __init__(
self,
storage_url: str,
......@@ -103,49 +95,11 @@ class ManifestProcessor(HeadersMixin):
logger.error(f"Unhandled exception while uploading {file_path}: {e}")
return file_record
def populate_manifest_storage_record(self, manifest: dict) -> dict:
"""Create a record from manifest to store it in Storage service.
:param manifest: The manifest to populate from
:type manifest: dict
:return: The populated record
:rtype: dict
"""
record = copy.deepcopy(self.RECORD_TEMPLATE)
manifest = self._delete_surrogate_key(manifest)
if manifest.get("id"):
record["id"] = manifest["id"]
record["kind"] = manifest.pop("kind")
record["legal"] = manifest.pop("legal")
record["acl"] = manifest.pop("acl")
record["data"] = manifest.pop("data")
return record
def _delete_surrogate_key(self, entity: dict) -> dict:
if "surrogate-key:" in entity.get("id", ""):
del entity["id"]
return entity
def _populate_file_storage_record(self, manifest: dict) -> dict:
"""Create a record from file manifest to store it via File service.
:param manifest: file manifest
:type manifest: dict
:return: file record
:rtype: dict
"""
record = copy.deepcopy(self.RECORD_TEMPLATE)
manifest = self._delete_surrogate_key(manifest)
if manifest.get("id"):
record["id"] = manifest["id"]
record["kind"] = manifest.pop("kind")
record["legal"] = manifest.pop("legal")
record["acl"] = manifest.pop("acl")
record["data"] = manifest.pop("data")
# TODO(python-team) verify that no data goes under data.
# then update just -> record["data"].update(manifest)
return record
@staticmethod
def _validate_storage_response(response_dict: dict):
"""Validate Storage service response."""
......@@ -206,7 +160,7 @@ class ManifestProcessor(HeadersMixin):
file_location = self.file_handler.get_file_staging_location(file_source)
self.source_file_checker.does_file_exist(file_location)
record = self._populate_file_storage_record(file_record)
record = self._delete_surrogate_key(file_record)
records.append(record)
return records
......@@ -224,7 +178,7 @@ class ManifestProcessor(HeadersMixin):
raise EmptyManifestError
for manifest_record in manifest_records:
populated_manifest_records.append(
self.populate_manifest_storage_record(manifest_record.entity))
self._delete_surrogate_key(manifest_record.entity))
save_manifests_response = self.save_record_to_storage(
self.request_headers, request_data=populated_manifest_records)
record_ids.extend(save_manifests_response.json()["recordIds"])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment