Commit e62dc4ee authored by Siarhei Khaletski (EPAM)

Merge branch 'GONRG-3789_VDS_Metadata' into 'master'

GONRG-3789: Open VDS Metadata

See merge request !31
parents 0b115bdb ed1819aa
3 merge requests: !41 Draft: Merge latest oVDS changes from 'master' to OSDU-SEGY-VDS-Azure-M8-Master, !40 Draft: Merge Master latest changes to Azure, !31 GONRG-3789: Open VDS Metadata
Pipeline #82053 passed with warnings
...@@ -187,9 +187,15 @@ python sdutil unlock sd://<tenant>/<subproject>/<path>/<file_name> --idtoken=$I ...@@ -187,9 +187,15 @@ python sdutil unlock sd://<tenant>/<subproject>/<path>/<file_name> --idtoken=$I
`sd://<tenant>/<subproject>/<some_path>/<file_name>` is URI to address your dataset. With this URI you will work in the next steps. `sd://<tenant>/<subproject>/<some_path>/<file_name>` is URI to address your dataset. With this URI you will work in the next steps.
### Ingest the WorkProduct of the Segy-file
Ingest the corresponding Manifest using Manifest-based ingestion. The `sd-path` must be set as the value of `data.DatasetsProperties.FileCollectionPath` in the dataset--FileCollection.SEGY record.
Then you can use the IDs of the File and WorkProduct records for the conversion.
### Start Segy -> OpenVDS conversion workflow ### Start Segy -> OpenVDS conversion workflow
After you uploaded the file on Seismic Store, you can start the conversion workflow: After you uploaded the file on Seismic Store and created the metadata of the file, you can start the conversion workflow:
```shell ```shell
curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversion_sdms/workflowRun' \ curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversion_sdms/workflowRun' \
...@@ -205,21 +211,26 @@ curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversi ...@@ -205,21 +211,26 @@ curl --location --request POST '<workflow_host>/v1/workflow/Segy_to_vds_conversi
"vds_url": "sd://<tenant>/<subproject>/<path>", "vds_url": "sd://<tenant>/<subproject>/<path>",
"persistent_id": "<unique name of vds conversion>", "persistent_id": "<unique name of vds conversion>",
"id_token": <id_token> "id_token": <id_token>
"segy_url": "sd://<tenant>/<subproject>/<path>/<segy_name>" "work_product_id": "<work-product-id>"
"file_record_id": <vds-file-record-id>
} }
``` ```
After the conversion, a new OpenVDS FileRecord is created that contains the `sd-path` to the OpenVDS collection. The SeismicTraceData record is also updated with an `Artefacts` field that references the OpenVDS file.
The following fields: The following fields:
- `vds_url` - the part of the OpenVDS dataset Seismic Store URI consisting of `tenant`, `subproject`, and `path`; - `vds_url` - the part of the OpenVDS dataset Seismic Store URI consisting of `tenant`, `subproject`, and `path`;
- `persistent_id` - unique ID of the dataset, can be considered the dataset's name; - `persistent_id` - unique ID of the dataset, can be considered the dataset's name;
- `segy_url` - Segy-file Seismic Store URI. - `file_record_id` - Segy-file metadata with Seismic Store URI.
- `work_product_id` - ID of the WorkProduct whose WPC references the File Record.
The full Seismic Store URI of OpenVDS dataset will look like `sd://<tenant>/<subproject>/<path>/<persistent_id>`. The full Seismic Store URI of OpenVDS dataset will look like `sd://<tenant>/<subproject>/<path>/<persistent_id>`.
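As a rough illustration of how these fields relate, the sketch below assembles them in Python and derives the full dataset URI from `vds_url` and `persistent_id`. The helper names and all sample values are hypothetical, and the enclosing request body is not fully shown in this diff, so only the listed fields are built here.

```python
import json


def build_conversion_fields(tenant, subproject, path, persistent_id,
                            work_product_id, file_record_id, id_token):
    """Collect the documented fields in a plain dict (hypothetical helper)."""
    return {
        # vds_url holds only the tenant, subproject and path parts.
        "vds_url": f"sd://{tenant}/{subproject}/{path}",
        "persistent_id": persistent_id,
        "id_token": id_token,
        "work_product_id": work_product_id,
        "file_record_id": file_record_id,
    }


def full_vds_uri(fields):
    """Join vds_url and persistent_id into the full Seismic Store URI."""
    return f"{fields['vds_url'].rstrip('/')}/{fields['persistent_id']}"


# All values below are placeholders chosen for illustration only.
fields = build_conversion_fields(
    tenant="demo-tenant",
    subproject="demo-subproject",
    path="vds/out",
    persistent_id="my-vds",
    work_product_id="<work-product-id>",
    file_record_id="<vds-file-record-id>",
    id_token="<id_token>",
)

print(json.dumps(fields, indent=2))
print(full_vds_uri(fields))
```

The URI printed on the last line is the one to pass to `sdutil stat` when checking the result.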
To verify that OpenVDS was created successfully: To verify that OpenVDS collection was created successfully:
```shell ```shell
python sdutil stat sd://<tenant>/<subproject>/<path>/<persistent_id> --idtoken=$ID_TOKEN python sdutil stat sd://<tenant>/<subproject>/<path>/<persistent_id> --idtoken=$ID_TOKEN
......
# These packages need to be installed in AF Env. # These packages need to be installed in AF Env.
--extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple --extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.12.0 osdu-api~=0.13.0.dev1
--extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple/ --extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple/
osdu-airflow~=0.12.0 osdu-airflow~=0.13.0.dev1
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple --extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.12.0 osdu-ingestion~=0.13.0.dev1
...@@ -19,10 +19,13 @@ from datetime import timedelta ...@@ -19,10 +19,13 @@ from datetime import timedelta
import airflow import airflow
from airflow import DAG from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
KubernetesPodOperator
from osdu_airflow.backward_compatibility.default_args import \ from osdu_airflow.backward_compatibility.default_args import \
update_default_args update_default_args
from osdu_airflow.operators.process_manifest_r3 import \
ProcessManifestOperatorR3
from osdu_airflow.operators.segy_open_vds_conversion import \
KubernetesPodSegyToOpenVDSOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator from osdu_airflow.operators.update_status import UpdateStatusOperator
default_args = { default_args = {
...@@ -42,11 +45,6 @@ if not K8S_POD_KWARGS: ...@@ -42,11 +45,6 @@ if not K8S_POD_KWARGS:
K8S_POD_KWARGS = {} K8S_POD_KWARGS = {}
seismic_store_url = "{| SEISMIC_STORE_URL |}" seismic_store_url = "{| SEISMIC_STORE_URL |}"
persistent_id = "{{ dag_run.conf['execution_context']['persistent_id'] }}"
vds_url = "{{ dag_run.conf['execution_context']['vds_url'] }}"
segy_url = "{{ dag_run.conf['execution_context']['segy_url'] }}"
id_token = "{{ dag_run.conf['execution_context']['id_token'] }}"
dag = DAG( dag = DAG(
dag_name, dag_name,
default_args=default_args, default_args=default_args,
...@@ -60,7 +58,7 @@ with dag: ...@@ -60,7 +58,7 @@ with dag:
task_id="update_status_running", task_id="update_status_running",
) )
segy_to_vds = KubernetesPodOperator( segy_to_vds = KubernetesPodSegyToOpenVDSOperator(
task_id='segy_to_vds_ssdms_conversion', task_id='segy_to_vds_ssdms_conversion',
name='segy_vds_conversion', name='segy_vds_conversion',
env_vars={ env_vars={
...@@ -68,22 +66,22 @@ with dag: ...@@ -68,22 +66,22 @@ with dag:
"SD_SVC_API_KEY": "test" "SD_SVC_API_KEY": "test"
}, },
cmds=['SEGYImport'], cmds=['SEGYImport'],
arguments=[
f'--url', f'{vds_url}',
f'--url-connection', f'sdtoken={id_token}',
f'--persistentID', f'{persistent_id}',
f'--input-connection', f'sdtoken={id_token}',
f"{segy_url}"
],
namespace=k8s_namespace, namespace=k8s_namespace,
image=docker_image, image=docker_image,
is_delete_operator_pod=True, is_delete_operator_pod=True,
trigger_rule="none_failed_or_skipped",
**K8S_POD_KWARGS **K8S_POD_KWARGS
) )
process_single_manifest_file = ProcessManifestOperatorR3(
task_id="process_single_manifest_file_task",
previous_task_id=segy_to_vds.task_id,
trigger_rule="none_failed_or_skipped"
)
update_status_finished = UpdateStatusOperator( update_status_finished = UpdateStatusOperator(
task_id="update_status_finished", task_id="update_status_finished",
trigger_rule="all_done" trigger_rule="all_done"
) )
update_status_running >> segy_to_vds >> update_status_finished # pylint: disable=pointless-statement update_status_running >> segy_to_vds >> process_single_manifest_file >> update_status_finished # pylint: disable=pointless-statement