Commit 81eea841 authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-3789: Open VDS Metadata

parent 74bf1b4c
Pipeline #79748 passed with stages
in 1 minute and 44 seconds
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher
from osdu_ingestion.libs.segy_conversion import SegyConversion
logger = logging.getLogger()
class KubernetesPodSegyOpenVDSOperator(KubernetesPodOperator):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.token_refresher = AirflowTokenRefresher()
def pre_execute(self, context):
conf = context["dag_run"].conf
execution_context = conf["execution_context"]
payload_context = Context.populate(execution_context)
self.segy_conversion = SegyConversion(self.token_refresher, payload_context)
persistent_id = execution_context['persistent_id']
id_token = execution_context['id_token']
vds_url = execution_context['vds_url']
segy_url = self.segy_conversion.get_file_collection_path(execution_context['file_record_id'])
self.arguments = [
f'--url', f'{vds_url}',
f'--url-connection', f'sdtoken={id_token}',
f'--persistentID', f'{persistent_id}',
f'--input-connection', f'sdtoken={id_token}',
f"{segy_url}"
]
def execute(self, context):
super().execute(context)
logging.debug("SEGY converted to VDS.")
conf = context["dag_run"].conf
execution_context = conf["execution_context"]
persistent_id = execution_context['persistent_id']
vds_url = execution_context['vds_url']
work_product_id = execution_context['work_product_id']
file_record_id = execution_context['file_record_id']
open_vds_manifest = self.segy_conversion.create_openvds_metadata(
f"{vds_url}/{persistent_id}",
work_product_id,
file_record_id
)
logging.debug(f"New manifest is {open_vds_manifest}")
context["ti"].xcom_push(key="return_value", value={"manifest": open_vds_manifest})
{
"id": "osdu:wks:dataset--FileCollection.SEGY:test",
"version": 12345678,
"kind": "osdu:wks:dataset--FileCollection.SEGY:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"TotalSize": "895367300",
"Endian": "BIG",
"DatasetProperties": {
"FileSourceInfos": [
{
"FileSource": "sd://test/test",
"PreloadFilePath": "s3://osdu-seismic-test-data/volve/seismic/st0202/stacks/ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534.segy",
"FileSize": "895367300"
}
],
"Checksum": "783884f1a920831400d3d06766953503"
},
"VectorHeaderMapping": [
{
"KeyName": "osdu:reference-data--HeaderKeyName:Inline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 189
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:Crossline:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 193
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPX:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 181,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
},
{
"KeyName": "osdu:reference-data--HeaderKeyName:CMPY:",
"WordFormat": "osdu:reference-data--WordFormatType:INT:",
"WordWidth": 4,
"Position": 185,
"UoM": "osdu:reference-data--UnitOfMeasure:m:",
"ScalarIndicator": "OVERRIDE",
"ScalarOverride": -100.0
}
],
"SEGYRevision": "Rev 1.0"
}
}
\ No newline at end of file
{
"id": "osdu:work-product--WorkProduct:test",
"kind": "osdu:wks:work-product--WorkProduct:1.0.0",
"acl": {
"owners": [
"test"
],
"viewers": [
"test"
]
},
"legal": {
"legaltags": [
"ExampleLegalTag"
],
"otherRelevantDataCountries": [
"NO",
"US"
]
},
"data": {
"ResourceSecurityClassification": "osdu:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Name": "ST0202R08-PS_PSDM_FULL_OFFSET_DEPTH.MIG_FIN.POST_Stack.3D.JS-017534",
"Description": "Volve - Seismic Trace Data - FINAL PS PSDM Stack FULL OFFSET IN DEPTH",
"Components": [
"osdu:work-product-component--SeismicTraceData:ST0202R08-depth-volume:"
]
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ from osdu_ingestion.libs.handle_file import FileHandler
import mock_providers
import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import TaskInstance
from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_GENERIC_SCHEMA_PATH,
MANIFEST_WELLBORE_VALID_PATH)
......@@ -34,8 +35,10 @@ from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestInteg
from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin
from osdu_airflow.operators.process_manifest_r3 import (ManifestProcessor,
ProcessManifestOperatorR3, SchemaValidator)
from osdu_airflow.operators.segy_open_vds_conversion import KubernetesPodSegyOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator
from osdu_ingestion.libs.segy_conversion import SegyConversion
CustomOperator = TypeVar("CustomOperator")
......@@ -175,3 +178,38 @@ class TestOperators(object):
task, context = self._create_task(EnsureManifestIntegrityOperator)
task.pre_execute(context)
task.execute(context)
def test_segy_vds_conversion(self, monkeypatch):
conf = {
"execution_context": {
"Payload": {
"AppKey": "test-app",
"data-partition-id": "{{data-partition-id}}"
},
"vds_url": "{{test_vds_url}}",
"file_record_id": "{{file-collection-id}}:",
"work_product_id": "{{work-product-id}}:",
"persistent_id": "{{vds_id}}",
"id_token": "{{id_token}}"
}
}
monkeypatch.setattr(KubernetesPodOperator, "execute",
lambda *args, **kwargs: "test")
monkeypatch.setattr(SegyConversion, "get_file_collection_path", lambda *args, **kwargs: "sd://test/test")
monkeypatch.setattr(SegyConversion, "create_openvds_metadata", lambda *args, **kwargs: {"test": "test"})
dag = DAG(dag_id='Segy_vds_conversion', start_date=datetime.now())
task: CustomOperator = KubernetesPodSegyOpenVDSOperator(
dag=dag,
image="test",
name="test",
task_id='anytask'
)
ti = TaskInstance(task=task, execution_date=datetime.now())
context = ti.get_template_context()
context["dag_run"] = MockDagRun(conf)
task.pre_execute(context)
task.execute(context)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment