Commit 3d23030e authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) 🚩
Browse files

Merge branch 'GONRG-3789_VDS_Metadata' into 'master'

GONRG-3789: Open VDS Metadata

See merge request !16
parents b5a2a06a 3097b2bf
Pipeline #82040 passed with stages
in 15 minutes and 23 seconds
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from osdu_api.clients.storage.record_client import RecordClient
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.refresh_token import AirflowTokenRefresher
from osdu_ingestion.libs.segy_conversion_metadata.open_vds import OpenVDSMetadata
logger = logging.getLogger()
class KubernetesPodSegyToOpenVDSOperator(KubernetesPodOperator):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.token_refresher = AirflowTokenRefresher()
def pre_execute(self, context):
conf = context["dag_run"].conf
execution_context = conf["execution_context"]
payload_context = Context.populate(execution_context)
record_client = RecordClient(token_refresher=self.token_refresher)
self.segy_vds_metadata = OpenVDSMetadata(record_client, payload_context)
persistent_id = execution_context['persistent_id']
id_token = execution_context['id_token']
vds_url = execution_context['vds_url']
segy_url = self.segy_vds_metadata.get_file_collection_path(execution_context['file_record_id'])
self.arguments = [
f'--url', f'{vds_url}',
f'--url-connection', f'sdtoken={id_token}',
f'--persistentID', f'{persistent_id}',
f'--input-connection', f'sdtoken={id_token}',
f"{segy_url}"
]
def execute(self, context):
super().execute(context)
logging.debug("SEGY converted to VDS.")
conf = context["dag_run"].conf
execution_context = conf["execution_context"]
persistent_id = execution_context['persistent_id']
vds_url = execution_context['vds_url']
work_product_id = execution_context['work_product_id']
file_record_id = execution_context['file_record_id']
open_vds_manifest = self.segy_vds_metadata.create_metadata(
f"{vds_url}/{persistent_id}",
work_product_id,
file_record_id
)
logging.debug(f"New manifest is {open_vds_manifest}")
context["ti"].xcom_push(key="return_value", value={"manifest": open_vds_manifest})
......@@ -26,6 +26,7 @@ from osdu_ingestion.libs.handle_file import FileHandler
import mock_providers
import requests
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import TaskInstance
from file_paths import (MANIFEST_BATCH_WELLBORE_VALID_PATH, MANIFEST_GENERIC_SCHEMA_PATH,
MANIFEST_WELLBORE_VALID_PATH)
......@@ -34,8 +35,11 @@ from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestInteg
from osdu_airflow.operators.mixins.ReceivingContextMixin import ReceivingContextMixin
from osdu_airflow.operators.process_manifest_r3 import (ManifestProcessor,
ProcessManifestOperatorR3, SchemaValidator)
from osdu_airflow.operators.segy_open_vds_conversion import KubernetesPodSegyToOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator
from osdu_api.clients.storage.record_client import RecordClient
from osdu_ingestion.libs.segy_conversion_metadata.open_vds import OpenVDSMetadata
CustomOperator = TypeVar("CustomOperator")
......@@ -175,3 +179,38 @@ class TestOperators(object):
task, context = self._create_task(EnsureManifestIntegrityOperator)
task.pre_execute(context)
task.execute(context)
def test_segy_vds_conversion(self, monkeypatch):
conf = {
"execution_context": {
"Payload": {
"AppKey": "test-app",
"data-partition-id": "{{data-partition-id}}"
},
"vds_url": "{{test_vds_url}}",
"file_record_id": "{{file-collection-id}}:",
"work_product_id": "{{work-product-id}}:",
"persistent_id": "{{vds_id}}",
"id_token": "{{id_token}}"
}
}
monkeypatch.setattr(KubernetesPodOperator, "execute",
lambda *args, **kwargs: "test")
monkeypatch.setattr(OpenVDSMetadata, "get_file_collection_path", lambda *args, **kwargs: "sd://test/test")
monkeypatch.setattr(OpenVDSMetadata, "create_metadata", lambda *args, **kwargs: {"test": "test"})
dag = DAG(dag_id='Segy_vds_conversion', start_date=datetime.now())
task: CustomOperator = KubernetesPodSegyToOpenVDSOperator(
dag=dag,
image="test",
name="test",
task_id='anytask'
)
ti = TaskInstance(task=task, execution_date=datetime.now())
context = ti.get_template_context()
context["dag_run"] = MockDagRun(conf)
task.pre_execute(context)
task.execute(context)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment