Commit 158a0c7d authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-3789: Open VDS Metadata

parent 02b56829
Pipeline #80442 passed with stages
in 2 minutes and 14 seconds
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from typing import List
from osdu_api.clients.storage.record_client import RecordClient
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import FileSourceError
from osdu_ingestion.libs.utils import get_record_data_by_id, split_id
class BaseSegyConversionMetadata(abc.ABC):
def __init__(
self,
record_client: RecordClient,
payload_context: Context,
converted_file_kind: str,
acl: dict = None,
legal: dict = None,
artefact_role_id: str = None
) -> None:
"""
:param token_refresher: Record client
:type token_refresher: RecordClient
:param payload_context: Payload Context
:type payload_context: Context
:param converted_file_kind: Kind of converted file, defaults to None
:type converted_file_kind: str, optional
:param acl: ACL, defaults to None
:type acl: dict, optional
:param legal: Legal, defaults to None
:type legal: dict, optional
:param artefact_role_id: RoleID, defaults to None
:type artefact_role_id: str, optional
"""
self._record_client = record_client
self._payload_context = payload_context
self._converted_file_kind = converted_file_kind
self._acl = acl or {}
self._legal = legal or {}
self._artefact_role_id = self._populate_artefact_role_id(
artefact_role_id)
def _populate_artefact_role_id(self, artefact_role_id: str = None) -> str:
"""Populate Artefact's RoleID.
:param artefact_role_id: RoleID, defaults to None
:type artefact_role_id: str, optional
:return: Artefact's RoleID
:rtype: str
"""
default_artefact_role_id = f"{self._payload_context.data_partition_id}:reference-data--ArtefactRole:ConvertedContent:"
artefact_role_id = artefact_role_id or default_artefact_role_id
# RoleId must end with ":" if it doesn't contain a version
if not split_id(artefact_role_id).version and not artefact_role_id.endswith(":"):
return f"{artefact_role_id}:"
return artefact_role_id
def _populate_converted_file_record(
self,
file_record_id: str,
file_collection_path: str,
ancestry_file_record: dict
) -> dict:
"""Populate a new File record of the converted file
:param file_record_id: File record id
:type surrogate_key_id: str
:param file_collection_path: FileCollection path
:type file_collection_path: str
:param ancestry_file_record: ancestry FileReocrd
:type ancestry_file_record: dict
:return: record of the converted file
:rtype: dict
"""
return {
"kind": self._converted_file_kind,
"id": file_record_id,
"acl": self._acl or ancestry_file_record["acl"],
"legal": self._legal or ancestry_file_record["legal"],
"ancestry": {
"parents": [
f'{ancestry_file_record["id"]}:{ancestry_file_record["version"]}'
]
},
"data": {
"DatasetProperties": {
"FileCollectionPath": file_collection_path
}
}
}
def _populate_artefacts_field(
self,
work_product_component: dict,
file_record_id: str
) -> dict:
"""Extends Artefacts field with a new Generated File Record
:param seismic_trace_data_wpc: Work Product Component
:type seismic_trace_data_wpc: dict
:param file_record_id: File Record ID
:type file_record_id: str
:return: WPC with extended Artefacts field
:rtype: dict
"""
artefact = {
"ResourceID": f"{file_record_id}",
"ResourceKind": self._converted_file_kind,
"RoleId": self._artefact_role_id
}
artefacts = work_product_component.get("data", {}).get("Artefacts", [])
artefacts.append(artefact)
work_product_component["data"]["Artefacts"] = artefacts
return work_product_component
def get_file_collection_path(self, file_record_id: dict) -> str:
"""Get FileCollectionPath by FileRecord Id.
:param file_record_id: FileRecord Id
:type file_record_id: dict
:raises FileSourceError: Error
:return: FileCollectionPath
:rtype: str
"""
file_record = get_record_data_by_id(self._record_client, file_record_id)
try:
file_collection_path = file_record["data"]["DatasetProperties"]["FileCollectionPath"]
except (KeyError, IndexError):
raise FileSourceError("Can't get FileSource info.")
return file_collection_path
def _get_components_ids(self, work_product_id: str) -> List[str]:
"""Get WP's components
:param work_product_id: Work Product
:type work_product_id: str
:raises ValueError: Error
:return: Components Ids
:rtype: List[str]
"""
work_product_record = get_record_data_by_id(self._record_client, work_product_id)
try:
components = work_product_record["data"]["Components"]
except KeyError:
raise ValueError("WorkProduct doesn't contain Components field.")
return components
def _get_wpc_by_file_record(self, wpc_ids: List[str], file_record_id: str) -> List[dict]:
"""Filter WPC by their datasets.
:param wpc_ids: List of WPCs' Ids
:type wpc_ids: List[str]
:param file_record_id: FileRecord Id
:type file_record_id: str
:return: List of WPCs
:rtype: List[dict]
"""
wpc_records_with_file_record = []
wpc_records = [get_record_data_by_id(self._record_client, wpc_id) for wpc_id in wpc_ids]
file_record_id_with_version = split_id(file_record_id)
for wpc_record in wpc_records:
try:
if file_record_id_with_version in map(split_id, wpc_record["data"]["Datasets"]):
wpc_records_with_file_record.append(wpc_record)
except (IndexError, KeyError):
continue
return wpc_records_with_file_record
@abc.abstractmethod
def create_metadata(self) -> dict:
"""Populate conversion result FileRecord and WPC's Artefacts field.
Then, return a new Manifest with these records.
:return: Manifest
:rtype: dict
"""
raise NotImplemented
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osdu_api.clients.storage.record_client import RecordClient
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.segy_conversion_metadata.base_metadata import \
BaseSegyConversionMetadata
from osdu_ingestion.libs.utils import (create_manifest, generate_surrogate_key,
get_record_data_by_id)
class OpenVDSMetadata(BaseSegyConversionMetadata):
"""
Generate metadata for converted Segy->OpenVDS files.
"""
DEFAULT_VDS_FILE_KIND = "osdu:wks:dataset--FileCollection.Bluware.OpenVDS:1.0.0"
def __init__(
self,
record_client: RecordClient,
payload_context: Context,
converted_file_kind: str = None,
acl: dict = None,
legal: dict = None,
artefact_role_id: str = None
) -> None:
"""
:param token_refresher: Token Refresher
:type token_refresher: TokenRefresher
:param payload_context: Payload Context
:type payload_context: Context
:param converted_file_kind: Kind of converted file, defaults to None
:type converted_file_kind: str, optional
:param acl: ACL, defaults to None
:type acl: dict, optional
:param legal: Legal, defaults to None
:type legal: dict, optional
:param artefact_role_id: RoleID, defaults to None
:type artefact_role_id: str, optional
"""
converted_file_kind = converted_file_kind or self.DEFAULT_VDS_FILE_KIND
super().__init__(
record_client=record_client,
payload_context=payload_context,
converted_file_kind=converted_file_kind,
acl=acl,
legal=legal,
artefact_role_id=artefact_role_id
)
def create_metadata(
self,
vds_sd_path: str,
work_product_id: str,
file_record_id: str
) -> dict:
"""Populate OpenVDS FileRecord and WPC's Artefacts field. And return a new Manifest.
:param vds_sd_path: OpenVDS SDMS path
:type vds_sd_path: str
:param work_product_id: WP Id
:type work_product_id: str
:param file_record_id: FileRecord Id
:type file_record_id: str
:return: Manifest
:rtype: dict
"""
segy_file_record = get_record_data_by_id(self._record_client, file_record_id)
wpc_ids = self._get_components_ids(work_product_id)
wpc_records = self._get_wpc_by_file_record(wpc_ids, file_record_id)
vds_record_id = generate_surrogate_key()
vds_record = self._populate_converted_file_record(
vds_record_id, vds_sd_path, segy_file_record)
wpc_records_with_artefacts = [
self._populate_artefacts_field(wpc, vds_record_id) for wpc in wpc_records]
manifest = create_manifest(
work_product_components=wpc_records_with_artefacts, datasets=[vds_record])
return manifest
......@@ -18,6 +18,9 @@
import dataclasses
from itertools import islice
from typing import Any, Generator, Iterable, List, TypeVar
from uuid import uuid4
from osdu_api.clients.storage.record_client import RecordClient
BatchElement = TypeVar("BatchElement")
......@@ -100,7 +103,7 @@ def split_into_batches(
"""
if not isinstance(element_sequence, Iterable):
raise TypeError(
f"Element sequence '{element_sequence}' is '{type(element_sequence)}'. "
f"Element sequence '{element_sequence}' is '{type(element_sequence)}'. "
"It must be either 'list' or 'tuple'."
)
......@@ -114,6 +117,7 @@ def split_into_batches(
yield batch
def is_surrogate_key(entity_id: str):
"""
Check if the entity's id is surrogate.
......@@ -127,3 +131,70 @@ def is_surrogate_key(entity_id: str):
else:
return False
def create_manifest(
reference_data: List[dict] = None,
master_data: List[dict] = None,
work_product: dict = None,
work_product_components: List[dict] = None,
datasets: List[dict] = None
) -> dict:
"""Create a new Manifest.
:param reference_data: reference-data, defaults to None
:type reference_data: List[dict], optional
:param master_data: master-data, defaults to None
:type master_data: List[dict], optional
:param work_product: work-product, defaults to None
:type work_product: dict, optional
:param work_product_components: work-product-components, defaults to None
:type work_product_components: List[dict], optional
:param datasets: datasets, defaults to None
:type datasets: List[dict], optional
:return: Manifest
:rtype: dict
"""
reference_data = reference_data or []
master_data = master_data or []
work_product = work_product or {}
work_product_components = work_product_components or []
datasets = datasets or []
return {
"kind": "osdu:wks:Manifest:1.0.0",
"ReferenceData": reference_data,
"MasterData": master_data,
"Data": {
"WorkProduct": work_product,
"WorkProductComponents": work_product_components,
"Datasets": datasets
}
}
def get_record_data_by_id(record_client: RecordClient, record_id: str) -> dict:
"""Get a record by its id
:param record_id: record client
:type record_id: RecordClient
:param record_id: record Id
:type record_id: str
:return: record data
:rtype: dict
"""
id_with_version = split_id(record_id)
if id_with_version.version:
record_response = record_client.get_specific_record(
id_with_version.id, id_with_version.version)
else:
record_response = record_client.get_latest_record(
id_with_version.id)
record = record_response.json()
return record
def generate_surrogate_key() -> str:
"""Generate surrogate-key
:return: surrogate-key
:rtype: str
"""
return f"surrogate-key:record-{str(uuid4())}"
{
"id": "osdu:wks:dataset--FileCollection.SEGY:test",
"version": 12345678,
"kind": "osdu:wks:dataset--FileCollection.SEGY:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"TotalSize": "895367300",
"Endian": "BIG",
"DatasetProperties": {
"FileSourceInfos": [
{
"FileSource": "sd://test/test",
"PreloadFilePath": "s3://osdu-seismic-test-data/volve/seismic/st0202/stacks/ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"FileSize": "895367300"
}
],
"FileCollectionPath": "sd://test/test",
"Checksum": "783884f1a920831400d3d06766953503"
},
"VectorHeaderMapping": [
{
"KeyName": "osdu:reference-data--HeaderKeyName:Inline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 189
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:Crossline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 193
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPX:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 181,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPY:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 185,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
}
],
"SEGYRevision": "Rev 1.0"
}
}
\ No newline at end of file
{
"id": "osdu:work-product--WorkProduct:test",
"kind": "osdu:wks:work-product--WorkProduct:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"Components": [
"osdu:work-product-component--SeismicTraceData:ST0202R08-depth-volume:"
]
}
}
\ No newline at end of file
......@@ -85,3 +85,7 @@ SURROGATE_MANIFEST_SEISMIC_NO_REFS_PATH= f"{DATA_PATH_PREFIX}/surrogate/manifest
FILE_GENERIC_WRONG_DATE_TIME = f"{DATA_PATH_PREFIX}/datasets/File.Generic.1.0.0_wrong_date_time.json"
SCHEMA_FILE_GENERIC = f"{DATA_PATH_PREFIX}/datasets/schema_File.Generic.1.0.0.json"
SEGY_FILE_RECORD = f"{DATA_PATH_PREFIX}/segy_records/file_collection.json"
SEGY_WPC_RECORD = f"{DATA_PATH_PREFIX}/segy_records/wpc.json"
SEGY_WP_RECORD = f"{DATA_PATH_PREFIX}/segy_records/work-product.json"
......@@ -27,12 +27,16 @@ class MockResponse(requests.Response):
)
"""
def __init__(self, status_code: http.HTTPStatus):
def __init__(self, status_code: http.HTTPStatus, json: dict = None):
super(MockResponse, self).__init__()
self._json = json
self.status_code = status_code
self.url = "Test"
self.reason = "Test"
def json(self) -> dict:
return self._json
@property
def text(self):
return None
......
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
from subprocess import call
from typing import List
import mock
import pytest
from mock.mock import Mock
from osdu_api.clients.storage.record_client import RecordClient
from requests.cookies import MockResponse
from file_paths import SEGY_FILE_RECORD, SEGY_WP_RECORD, SEGY_WPC_RECORD
from mock_providers import get_test_credentials
from mock_responses import MockResponse
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
from osdu_ingestion.libs.segy_conversion_metadata.base_metadata import \
BaseSegyConversionMetadata
from osdu_ingestion.libs.segy_conversion_metadata.open_vds import \
OpenVDSMetadata
from osdu_ingestion.libs.utils import generate_surrogate_key, split_id
with open(SEGY_WP_RECORD) as f: