Commit ed1819aa authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM)
Browse files

GONRG-3789: Open VDS Metadata

parent 0b115bdb
Pipeline #82049 failed with stages
in 4 minutes and 42 seconds
......@@ -187,9 +187,15 @@ python sdutil unlock sd://<tenant>/<subproject>/<path>/<file_name> --idtoken=$I
`sd://<tenant>/<subproject>/<some_path>/<file_name>` is URI to address your dataset. With this URI you will work in the next steps.
### Ingest the WorkProduct of the Segy-file
Ingest the corresponding Manifest with using Manifest-based-ingestion. The `sd-path` must be set as a value of `data.DatasetsProperties.FileCollectionPath` of the dataset--FileCollection.SEGY record.
Then, you can use the Ids of the File and WorkProduct records for the further conversion.
### Start Segy -> OpenVDS conversion workflow
After you uploaded the file on Seismic Store, you can start the conversion workflow:
After you uploaded the file on Seismic Store and created the metadata of the file, you can start the conversion workflow:
```shell
curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversion_sdms/workflowRun' \
......@@ -205,21 +211,26 @@ curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversi
"vds_url": "sd://<tenant>/<subproject>/<path>",
"persistent_id": "<unique name of vds conversion>",
"id_token": <id_token>
"segy_url": "sd://<tenant>/<subproject>/<path>/<segy_name>"
"work_product_id": "<work-product-id>"
"file_record_id": <vds-file-record-id>
}
```
After the conversion, a new OpenVDS FileRecord will be created with the `sd-path` to the OpenVDS collection in it. Also, the SeismicTraceData record will be updated with `Artefacts` field with the reference to the OpenVDS file.
The following fields:
- `vds_url` - the part of the OpenVDS dataset Seismic Store URI consisting of `tenant`, `subproject`, and `path`;
- `persistent_id` - unique ID of the dataset, can be considered the dataset's name;
- `segy_url` - Segy-file Seismic Store URI.
- `file_record_id` - Segy-file metadata with Seismic Store URI.
- `work_product_id` - Work product id with WPC that have references to the FIle Record
The full Seismic Store URI of OpenVDS dataset will look like `sd://<tenant>/<subproject>/<path>/<persistent_id>`.
To verify that OpenVDS was created successfully:
To verify that OpenVDS collection was created successfully:
```shell
python sdutil stat sd://<tenant>/<subproject>/<path>/<persistent_id> --idtoken=$ID_TOKEN
......
# These packages need to be installed in AF Env.
--extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.12.0
osdu-api~=0.13.0.dev1
--extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple/
osdu-airflow~=0.12.0
osdu-airflow~=0.13.0.dev1
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.12.0
osdu-ingestion~=0.13.0.dev1
......@@ -19,10 +19,13 @@ from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
KubernetesPodOperator
from osdu_airflow.backward_compatibility.default_args import \
update_default_args
from osdu_airflow.operators.process_manifest_r3 import \
ProcessManifestOperatorR3
from osdu_airflow.operators.segy_open_vds_conversion import \
KubernetesPodSegyToOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
default_args = {
......@@ -42,11 +45,6 @@ if not K8S_POD_KWARGS:
K8S_POD_KWARGS = {}
seismic_store_url = "{| SEISMIC_STORE_URL |}"
persistent_id = "{{ dag_run.conf['execution_context']['persistent_id'] }}"
vds_url = "{{ dag_run.conf['execution_context']['vds_url'] }}"
segy_url = "{{ dag_run.conf['execution_context']['segy_url'] }}"
id_token = "{{ dag_run.conf['execution_context']['id_token'] }}"
dag = DAG(
dag_name,
default_args=default_args,
......@@ -60,7 +58,7 @@ with dag:
task_id="update_status_running",
)
segy_to_vds = KubernetesPodOperator(
segy_to_vds = KubernetesPodSegyToOpenVDSOperator(
task_id='segy_to_vds_ssdms_conversion',
name='segy_vds_conversion',
env_vars={
......@@ -68,22 +66,22 @@ with dag:
"SD_SVC_API_KEY": "test"
},
cmds=['SEGYImport'],
arguments=[
f'--url', f'{vds_url}',
f'--url-connection', f'sdtoken={id_token}',
f'--persistentID', f'{persistent_id}',
f'--input-connection', f'sdtoken={id_token}',
f"{segy_url}"
],
namespace=k8s_namespace,
image=docker_image,
is_delete_operator_pod=True,
trigger_rule="none_failed_or_skipped",
**K8S_POD_KWARGS
)
process_single_manifest_file = ProcessManifestOperatorR3(
task_id="process_single_manifest_file_task",
previous_task_id=segy_to_vds.task_id,
trigger_rule="none_failed_or_skipped"
)
update_status_finished = UpdateStatusOperator(
task_id="update_status_finished",
trigger_rule="all_done"
)
update_status_running >> segy_to_vds >> update_status_finished # pylint: disable=pointless-statement
update_status_running >> segy_to_vds >> process_single_manifest_file >> update_status_finished # pylint: disable=pointless-statement
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment