Commit 2a62d8fa authored by Yan Sushchynski (EPAM)
Browse files

GONRG-3789: Open VDS Metadata

parent 02b56829
Pipeline #79436 passed with stages
in 2 minutes and 33 seconds
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from uuid import uuid4
from osdu_api.auth.authorization import TokenRefresher
from osdu_api.clients.storage.record_client import RecordClient
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import FileSourceError
from osdu_ingestion.libs.utils import split_id
class SegyConversion:
    """Build the manifest metadata that describes a SEG-Y -> OpenVDS conversion.

    Existing records are read through the Storage ``RecordClient``. The class
    creates a new OpenVDS dataset record (identified by a surrogate key),
    links it to the work-product-components that reference the source SEG-Y
    dataset, and wraps everything into a Manifest dict.
    """

    def __init__(
        self,
        token_refresher: TokenRefresher,
        payload_context: Context
    ) -> None:
        """Create the converter.

        :param token_refresher: Token refresher handed to the ``RecordClient``
        :type token_refresher: TokenRefresher
        :param payload_context: Payload context; its ``data_partition_id`` is
            used when building artefact references
        :type payload_context: Context
        """
        self._record_client = RecordClient(token_refresher=token_refresher)
        self._payload_context = payload_context

    @staticmethod
    def _populate_openvds_file_record(
        surrogate_key_id: str,
        file_collection_path: str,
        ancestry_file_record: dict
    ) -> dict:
        """Populate a new OpenVDS record.

        The ``acl`` and ``legal`` sections are taken from the ancestry record,
        which is also registered as the only parent (``<id>:<version>``).

        :param surrogate_key_id: surrogate-key used as the new record id
        :type surrogate_key_id: str
        :param file_collection_path: FileCollection path
        :type file_collection_path: str
        :param ancestry_file_record: Ancestry FileRecord; must contain
            ``id``, ``version``, ``acl`` and ``legal``
        :type ancestry_file_record: dict
        :raises KeyError: if the ancestry record lacks one of those fields
        :return: OpenVDS record
        :rtype: dict
        """
        return {
            "kind": "osdu:wks:dataset--FileCollection.Bluware.OpenVDS:1.0.0",
            "id": surrogate_key_id,
            "acl": ancestry_file_record["acl"],
            "legal": ancestry_file_record["legal"],
            "ancestry": {
                "parents": [
                    f'{ancestry_file_record["id"]}:{ancestry_file_record["version"]}'
                ]
            },
            "data": {
                "DatasetProperties": {
                    "FileCollectionPath": file_collection_path
                }
            }
        }

    def _populate_open_vds_artefacts_field(
        self,
        seismic_trace_data_wpc: dict,
        file_record_id: str
    ) -> dict:
        """Append an OpenVDS artefact entry to a work-product-component.

        The record is modified in place and also returned. A missing ``data``
        section or a missing/``None`` ``Artefacts`` list is created on demand
        instead of raising.

        :param seismic_trace_data_wpc: WPC record to extend
        :type seismic_trace_data_wpc: dict
        :param file_record_id: Id of the OpenVDS dataset record
        :type file_record_id: str
        :return: The same WPC record with the new artefact appended
        :rtype: dict
        """
        data_partition_id = self._payload_context.data_partition_id
        openvds_artefact = {
            "ResourceID": f"{file_record_id}",
            "ResourceKind": f"{data_partition_id}:wks:dataset--FileCollection.Bluware.OpenVDS:1.0.0",
            "RoleId": f"{data_partition_id}:reference-data--ArtefactRole:ConvertedContent:"
        }
        # setdefault keeps the read and the write on the same dict, so a WPC
        # without "data" no longer fails with KeyError on assignment.
        data = seismic_trace_data_wpc.setdefault("data", {})
        artefacts = data.get("Artefacts")
        if artefacts is None:
            artefacts = []
        artefacts.append(openvds_artefact)
        data["Artefacts"] = artefacts
        return seismic_trace_data_wpc

    def _get_record_by_id(self, record_id) -> dict:
        """Fetch a record from Storage.

        The id is parsed with ``split_id``; when it carries a version that
        exact version is requested, otherwise the latest one.

        :param record_id: Record id, optionally with a version
        :type record_id: str
        :return: The decoded JSON body of the response
        :rtype: dict
        """
        id_with_version = split_id(record_id)
        if id_with_version.version:
            record_response = self._record_client.get_specific_record(
                id_with_version.id, id_with_version.version)
        else:
            record_response = self._record_client.get_latest_record(id_with_version.id)
        # Fail on an error status before trying to decode the body.
        record_response.raise_for_status()
        return record_response.json()

    def get_file_source(self, file_record_id: str) -> str:
        """Return the ``FileSource`` of the first ``FileSourceInfos`` entry.

        :param file_record_id: Id of the file (dataset) record
        :type file_record_id: str
        :raises FileSourceError: if the record has no usable
            ``data.DatasetProperties.FileSourceInfos[0].FileSource``
        :return: FileSource value
        :rtype: str
        """
        file_record = self._get_record_by_id(file_record_id)
        try:
            file_source = file_record["data"]["DatasetProperties"]["FileSourceInfos"][0]["FileSource"]
        except (KeyError, IndexError, TypeError) as err:
            # TypeError covers a None/non-container value along the path.
            raise FileSourceError("Can't get FileSource info.") from err
        return file_source

    def _get_components_ids(self, work_product_id: str) -> List[str]:
        """Return the ``data.Components`` list of a WorkProduct record.

        :param work_product_id: WorkProduct id
        :type work_product_id: str
        :raises ValueError: if the record has no ``data.Components`` field
        :return: Ids of the work-product-components
        :rtype: List[str]
        """
        work_product_record = self._get_record_by_id(work_product_id)
        try:
            components = work_product_record["data"]["Components"]
        except (KeyError, TypeError) as err:
            raise ValueError("WorkProduct doesn't contain Components field.") from err
        return components

    def _get_wpc_by_file_record(self, wpc_ids: List[str], file_record_id: str) -> List[dict]:
        """Fetch the given WPC records and keep those referencing the file.

        A WPC is kept when ``file_record_id`` is contained in its
        ``data.Datasets``; records without a usable ``Datasets`` field are
        skipped.

        :param wpc_ids: Ids of the work-product-components to inspect
        :type wpc_ids: List[str]
        :param file_record_id: Id of the file (dataset) record
        :type file_record_id: str
        :return: WPC records that list the file record among their datasets
        :rtype: List[dict]
        """
        wpc_records_with_file_record = []
        for wpc_id in wpc_ids:
            wpc_record = self._get_record_by_id(wpc_id)
            try:
                if file_record_id in wpc_record["data"]["Datasets"]:
                    wpc_records_with_file_record.append(wpc_record)
            except (IndexError, KeyError, TypeError):
                # Best effort: a WPC without datasets simply does not match.
                continue
        return wpc_records_with_file_record

    def _create_manifest(
        self,
        reference_data: List[dict] = None,
        master_data: List[dict] = None,
        work_product: dict = None,
        work_product_components: List[dict] = None,
        datasets: List[dict] = None
    ) -> dict:
        """Create new Manifest to pass further.

        Every omitted (or falsy) section is emitted as an empty list/dict.

        :param reference_data: reference-data, defaults to None
        :type reference_data: List[dict], optional
        :param master_data: master-data, defaults to None
        :type master_data: List[dict], optional
        :param work_product: work-product, defaults to None
        :type work_product: dict, optional
        :param work_product_components: work-product-components, defaults to None
        :type work_product_components: List[dict], optional
        :param datasets: datasets, defaults to None
        :type datasets: List[dict], optional
        :return: Manifest
        :rtype: dict
        """
        return {
            "kind": "osdu:wks:Manifest:1.0.0",
            "ReferenceData": reference_data or [],
            "MasterData": master_data or [],
            "Data": {
                "WorkProduct": work_product or {},
                "WorkProductComponents": work_product_components or [],
                "Datasets": datasets or []
            }
        }

    @staticmethod
    def _generate_surrogate_key() -> str:
        """Generate surrogate-key.

        :return: surrogate-key of the form ``surrogate-key:record-<uuid4>``
        :rtype: str
        """
        return f"surrogate-key:record-{uuid4()}"

    def create_openvds_metadata(
        self,
        vds_sd_path: str,
        work_product_id: str,
        file_record_id: str
    ) -> dict:
        """Populate OpenVDS FileRecord and WPC's Artefacts field. And return a new Manifest.

        :param vds_sd_path: OpenVDS SSDMS path
        :type vds_sd_path: str
        :param work_product_id: WP Id
        :type work_product_id: str
        :param file_record_id: FileRecord Id
        :type file_record_id: str
        :return: Manifest
        :rtype: dict
        """
        segy_file_record = self._get_record_by_id(file_record_id)
        wpc_ids = self._get_components_ids(work_product_id)
        wpc_records = self._get_wpc_by_file_record(wpc_ids, file_record_id)
        # The new dataset has no real id yet, so a surrogate key ties the
        # dataset record and the artefact references together.
        vds_record_id = self._generate_surrogate_key()
        vds_record = self._populate_openvds_file_record(vds_record_id, vds_sd_path, segy_file_record)
        wpc_records_with_artefacts = [
            self._populate_open_vds_artefacts_field(wpc, vds_record_id) for wpc in wpc_records
        ]
        return self._create_manifest(
            work_product_components=wpc_records_with_artefacts,
            datasets=[vds_record]
        )
{
"id": "osdu:wks:dataset--FileCollection.SEGY:test",
"version": 12345678,
"kind": "osdu:wks:dataset--FileCollection.SEGY:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"TotalSize": "895367300",
"Endian": "BIG",
"DatasetProperties": {
"FileSourceInfos": [
{
"FileSource": "sd://test/test",
"PreloadFilePath": "s3://osdu-seismic-test-data/volve/seismic/st0202/stacks/ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"FileSize": "895367300"
}
],
"Checksum": "783884f1a920831400d3d06766953503"
},
"VectorHeaderMapping": [
{
"KeyName": "osdu:reference-data--HeaderKeyName:Inline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 189
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:Crossline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 193
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPX:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 181,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPY:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 185,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
}
],
"SEGYRevision": "Rev 1.0"
}
}
\ No newline at end of file
{
"id": "osdu:work-product--WorkProduct:test",
"kind": "osdu:wks:work-product--WorkProduct:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"Components": [
"osdu:work-product-component--SeismicTraceData:ST0202R08-depth-volume:"
]
}
}
\ No newline at end of file
......@@ -85,3 +85,7 @@ SURROGATE_MANIFEST_SEISMIC_NO_REFS_PATH= f"{DATA_PATH_PREFIX}/surrogate/manifest
# Generic File dataset fixtures under "datasets/".
# NOTE(review): judging by the file names, a record with an invalid date-time
# value and the matching schema - confirm against the fixture files.
FILE_GENERIC_WRONG_DATE_TIME = f"{DATA_PATH_PREFIX}/datasets/File.Generic.1.0.0_wrong_date_time.json"
SCHEMA_FILE_GENERIC = f"{DATA_PATH_PREFIX}/datasets/schema_File.Generic.1.0.0.json"
# SEG-Y record fixtures under "segy_records/": the file-collection (dataset)
# record, the work-product-component record and the work-product record.
SEGY_FILE_RECORD = f"{DATA_PATH_PREFIX}/segy_records/file_collection.json"
SEGY_WPC_RECORD = f"{DATA_PATH_PREFIX}/segy_records/wpc.json"
SEGY_WP_RECORD = f"{DATA_PATH_PREFIX}/segy_records/work-product.json"
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
from subprocess import call
from typing import List
import mock
import pytest
from osdu_api.clients.storage.record_client import RecordClient
from requests import Response
from responses import assert_call_count
from file_paths import SEGY_FILE_RECORD, SEGY_WP_RECORD, SEGY_WPC_RECORD
from mock_providers import get_test_credentials
from mock_responses import MockSearchResponseForRecords
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
from osdu_ingestion.libs.segy_conversion import SegyConversion
from osdu_ingestion.libs.utils import (EntityId, is_surrogate_key, split_id,
split_into_batches)
# Load the SEG-Y fixture records once at import time. The encoding is given
# explicitly so decoding does not depend on the platform's default locale.
with open(SEGY_WP_RECORD, encoding="utf-8") as f:
    SEGY_WP_DATA = json.load(f)

with open(SEGY_WPC_RECORD, encoding="utf-8") as f:
    SEGY_WPC_DATA = json.load(f)

with open(SEGY_FILE_RECORD, encoding="utf-8") as f:
    SEGY_FILE_DATA = json.load(f)
class TestSegyConversion:
    """Unit tests for ``SegyConversion`` with all Storage access mocked out."""

    @staticmethod
    def _mock_return_record(path):
        """Load and return the JSON record stored at ``path``."""
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    @pytest.fixture
    def segy_converter(self):
        """Provide a ``SegyConversion`` built with a token refresher and an "osdu" context."""
        token_refresher = BaseTokenRefresher()
        context = Context("osdu", "osdu")
        return SegyConversion(token_refresher, context)

    def test_generate_surrogate_key(self, segy_converter):
        """The generated id must be recognised as a surrogate key."""
        assert is_surrogate_key(segy_converter._generate_surrogate_key())

    # Decorators are applied bottom-up, so the first mock argument belongs to
    # "get_latest_record" and the second one to "get_specific_record".
    @mock.patch.object(RecordClient, "get_specific_record", return_value=MockSearchResponseForRecords([]))
    @mock.patch.object(RecordClient, "get_latest_record", return_value=MockSearchResponseForRecords([]))
    def test_get_record_by_id_latest_version(
        self,
        mock_get_last_record,
        mock_get_version_record,
        segy_converter: SegyConversion
    ):
        """An id without a version must be resolved via the latest-record call."""
        record_id = "test:work-product:test:"
        segy_converter._get_record_by_id(record_id)
        mock_get_last_record.assert_called()
        mock_get_version_record.assert_not_called()

    @mock.patch.object(RecordClient, "get_specific_record", return_value=MockSearchResponseForRecords([]))
    @mock.patch.object(RecordClient, "get_latest_record", return_value=MockSearchResponseForRecords([]))
    def test_get_record_by_id_specific_version(
        self,
        mock_get_last_record,
        mock_get_version_record,
        segy_converter: SegyConversion
    ):
        """An id with a version must be resolved via the specific-record call."""
        record_id = "test:work-product:test:123456"
        segy_converter._get_record_by_id(record_id)
        mock_get_last_record.assert_not_called()
        mock_get_version_record.assert_called()

    @mock.patch.object(
        SegyConversion,
        "_get_record_by_id",
        return_value=SEGY_FILE_DATA
    )
    def test_get_filesource(self, mock_get_record, segy_converter: SegyConversion):
        """``get_file_source`` returns the FileSource of the first FileSourceInfos entry."""
        file_source = segy_converter.get_file_source("test")
        assert file_source == SEGY_FILE_DATA["data"]["DatasetProperties"]["FileSourceInfos"][0]["FileSource"]

    # Deep copies keep the module-level fixtures pristine: the code under test
    # appends artefacts to the WPC records it receives.
    @mock.patch.object(
        SegyConversion,
        "_get_record_by_id",
        side_effect=[
            copy.deepcopy(SEGY_FILE_DATA),
            copy.deepcopy(SEGY_WP_DATA),
            copy.deepcopy(SEGY_WPC_DATA),
        ]
    )
    def test_create_openvds_metadata(self, mock_get_record, segy_converter: SegyConversion):
        """The manifest holds one OpenVDS dataset descending from the SEG-Y record."""
        manifest = segy_converter.create_openvds_metadata(
            "test",
            "osdu:work-product--WorkProduct:test",
            "osdu:wks:dataset--FileCollection.SEGY:test"
        )
        # One lookup each for the file record, the work product and its single component.
        assert mock_get_record.call_count == 3

        assert manifest["kind"] == "osdu:wks:Manifest:1.0.0"
        datasets = manifest["Data"]["Datasets"]
        assert len(datasets) == 1
        vds_record = datasets[0]
        assert is_surrogate_key(vds_record["id"])
        assert vds_record["data"]["DatasetProperties"]["FileCollectionPath"] == "test"
        assert vds_record["ancestry"]["parents"] == [
            f'{SEGY_FILE_DATA["id"]}:{SEGY_FILE_DATA["version"]}'
        ]
        # Every returned WPC must reference the new dataset as its last artefact.
        for wpc in manifest["Data"]["WorkProductComponents"]:
            assert wpc["data"]["Artefacts"][-1]["ResourceID"] == vds_record["id"]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment