
Segy to Vds Conversion

Introduction

Airflow DAG for transformation from SEGY to OpenVDS

GCP

The dags/segy_to_vds_ssdms_conversion_dag.py DAG stores its output in Seismic DDMS.

The dags/segy_to_vds_conversion_dag.py DAG stores its output in a GCS bucket.

DAG File Compilation

GCP

The dags/segy_to_vds_ssdms_conversion_dag.py DAG file contains a number of placeholders, delimited by {| and |}, that must be replaced with proper values, e.g.

DAG_NAME = "{| DAG_NAME |}"
DOCKER_IMAGE = "{| DOCKER_IMAGE |}"
NAMESPACE = "{| K8S_NAMESPACE |}"
...

These values have to be populated before the DAG is deployed. Each cloud provider supplies its own bootstrap script that fills in the placeholders.
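The substitution step can be pictured with a minimal sketch. This is not the provider bootstrap script; the function name `render_placeholders` and the sample values are hypothetical, and only the `{| NAME |}` delimiter syntax is taken from the excerpt above.

```python
import re

# Matches "{| NAME |}" and captures NAME; the delimiters follow the excerpt above.
PLACEHOLDER = re.compile(r"\{\|\s*(\w+)\s*\|\}")


def render_placeholders(template, values):
    """Replace every {| NAME |} in `template` with values[NAME]."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            # Fail loudly so a half-rendered DAG file is never deployed.
            raise KeyError(f"no value supplied for placeholder {name!r}")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


template = 'DAG_NAME = "{| DAG_NAME |}"\nNAMESPACE = "{| K8S_NAMESPACE |}"\n'
rendered = render_placeholders(
    template,
    {"DAG_NAME": "example_dag", "K8S_NAMESPACE": "example-namespace"},  # made-up values
)
print(rendered)
```

Raising on an unknown placeholder is a design choice of this sketch: it surfaces a missing value at bootstrap time rather than when Airflow parses the DAG.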

DAG Release and Distribution

GCP

The GCP version of the DAG is distributed as a generic package. During an OSDU release, the bootstrapped DAG file is published to the GitLab Generic Registry.

DAG Deployment

GCP

For GCP, the bootstrapped DAG relies only on Airflow variables, which supply its configuration values at runtime.

Before deploying this DAG, make sure the following Airflow variables are set:

Name                                 Default   Description
core__service__seismic__url          -         OSDU Seismic DDMS URL
gcp__data_partition_id               -         Data partition ID
gcp__image__segy_to_vds_converter    -         SEGY to VDS Converter Image
gcp__vds_ingestion_request_memory    1Gi       Request Memory
gcp__vds_ingestion_request_cpu       200m      Request CPU
gcp__vds_ingestion_limit_memory      1Gi       Memory Limit
gcp__vds_ingestion_limit_cpu         1000m     CPU Limit

Note: Environment variables can be used as well. Please see the Airflow documentation.
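When environment variables are used, Airflow looks up a variable named foo under the environment variable AIRFLOW_VAR_FOO. The sketch below is a hypothetical pre-deployment check built on that convention. It assumes that the variables without a default in the table are mandatory, and it inspects only the environment, not variables stored in the Airflow metadata database.

```python
import os

# Variables from the table above; None marks a variable with no default.
VARIABLES = {
    "core__service__seismic__url": None,
    "gcp__data_partition_id": None,
    "gcp__image__segy_to_vds_converter": None,
    "gcp__vds_ingestion_request_memory": "1Gi",
    "gcp__vds_ingestion_request_cpu": "200m",
    "gcp__vds_ingestion_limit_memory": "1Gi",
    "gcp__vds_ingestion_limit_cpu": "1000m",
}


def env_name(variable):
    """Return the environment variable Airflow reads for `variable`."""
    return "AIRFLOW_VAR_" + variable.upper()


def missing_variables(environ=None):
    """List variables that have no default and are absent from the environment."""
    environ = os.environ if environ is None else environ
    return [
        name
        for name, default in VARIABLES.items()
        if default is None and env_name(name) not in environ
    ]


if __name__ == "__main__":
    for name in missing_variables():
        print(f"not set: {env_name(name)}")
```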

Requirements

The Airflow DAG depends on the osdu-airflow-lib package for common operators and backward compatibility.

Install it in the Airflow environment:

pip install 'osdu-airflow' --extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple

Testing

Registering a Workflow

curl --location --request POST 'https://<base_url>/api/workflow/v1/workflow' \
--header 'Content-Type: application/json' \
--header 'data-partition-id: opendes' \
--header 'Authorization: <Bearer Token>' \
--data-raw '{
    "description": "SegY To OpenVDS Conversion",
    "registrationInstructions": {
        "dagName": "openvds_import"
    },
    "workflowName": "openvds_import"
}'

Note: The workflowName should match the DAG name registered with Airflow.

Expected Output

{
    "workflowId": "opendes:openvds_import",
    "workflowName": "openvds_import",
    "description": "SegY To OpenVDS Conversion",
    "createdBy": "admin@testing.com",
    "creationTimestamp": 1617297515622,
    "version": 1617297515622
}

Triggering Workflow

Configure the input file, the connection strings, and the target location in the executionContext of the request:

curl --location --request POST 'https://<base_url>/api/workflow/v1/workflow/openvds_import/workflowRun' \
--header 'Content-Type: application/json' \
--header 'data-partition-id: opendes' \
--header 'Authorization: <Bearer Token>' \
--data-raw '{
    "executionContext": {
        "url_connection":"Region=us-east-1;AccessKeyId=XXX;SecretKey=XXX;SessionToken=XXX",
        "input_connection":"Region=us-east-1;AccessKeyId=XXX;SecretKey=XXX;SessionToken=XXX",
        "segy_file":"s3://aws-osdu-sample-data/sample-data/seismic/st0202/stacks/ST0202R08_PS_PSDM_RAW_PP_TIME.MIG_RAW.POST_STACK.3D.JS-017534.segy",
        "url":"s3://aws-osdu-sample-data/"
    }
}'

Expected Output

{
    "workflowId": "opendes:openvds_import",
    "runId": "3e73eb98-69d3-48c9-bf1e-ab967d2dba91",
    "startTimeStamp": 1617297632023,
    "status": "submitted",
    "submittedBy": "admin@testing.com"
}
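For scripted testing, the same trigger request can be assembled in Python. The following is a sketch using only the standard library: it builds the request but does not send it. The helper name `build_trigger_request` and the host osdu.example.com are hypothetical; the path, headers, and executionContext keys mirror the curl call above.

```python
import json
import urllib.request


def build_trigger_request(base_url, workflow_name, partition_id, authorization, execution_context):
    """Build (but do not send) the POST request that starts a workflow run."""
    url = f"{base_url}/api/workflow/v1/workflow/{workflow_name}/workflowRun"
    body = json.dumps({"executionContext": execution_context}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "data-partition-id": partition_id,
        "Authorization": authorization,
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


request = build_trigger_request(
    base_url="https://osdu.example.com",  # hypothetical host, replace with <base_url>
    workflow_name="openvds_import",
    partition_id="opendes",
    authorization="<Bearer Token>",  # placeholder, as in the curl example
    execution_context={
        "url_connection": "Region=us-east-1;AccessKeyId=XXX;SecretKey=XXX;SessionToken=XXX",
        "input_connection": "Region=us-east-1;AccessKeyId=XXX;SecretKey=XXX;SessionToken=XXX",
        "segy_file": "s3://aws-osdu-sample-data/sample-data/seismic/st0202/stacks/ST0202R08_PS_PSDM_RAW_PP_TIME.MIG_RAW.POST_STACK.3D.JS-017534.segy",
        "url": "s3://aws-osdu-sample-data/",
    },
)
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it; that call is left out so the sketch can be run without a live endpoint or credentials.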

Example of a Successful DAG Run

*** Reading remote log from s3://osdu-wanzhiji-ingest-s3airflowbucketdev-11h61ldwb6zv2/logs/openvds_import/OPENVDS/2021-04-01T16:47:25.432823+00:00/1.log.
[2021-04-01 16:47:33,245] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:47:25.432823+00:00 [queued]>
[2021-04-01 16:47:33,268] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:47:25.432823+00:00 [queued]>
[2021-04-01 16:47:33,268] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 16:47:33,268] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2021-04-01 16:47:33,268] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 16:47:33,284] {taskinstance.py:901} INFO - Executing <Task(KubernetesPodOperator): OPENVDS> on 2021-04-01T16:47:25.432823+00:00
[2021-04-01 16:47:33,287] {standard_task_runner.py:54} INFO - Started process 216 to run task
[2021-04-01 16:47:33,319] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'openvds_import', 'OPENVDS', '2021-04-01T16:47:25.432823+00:00', '--job_id', '117', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/openvds/openvds.py', '--cfg_path', '/tmp/tmprpvc0je2']
[2021-04-01 16:47:33,320] {standard_task_runner.py:78} INFO - Job 117: Subtask OPENVDS
[2021-04-01 16:47:33,383] {logging_mixin.py:112} INFO - Running <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:47:25.432823+00:00 [running]> on host 67371a44eba7
[2021-04-01 16:47:34,205] {logging_mixin.py:112} WARNING - /usr/local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py:309: DeprecationWarning: Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod`.
  dummy_pod = Pod(
[2021-04-01 16:47:34,205] {logging_mixin.py:112} WARNING - /usr/local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py:77: DeprecationWarning: Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod` instead.
  pod = self._mutate_pod_backcompat(pod)
[2021-04-01 16:47:34,272] {pod_launcher.py:171} INFO - Event: openvds-9d1ddefd4b5f4268b50af564cde10795 had an event of type Pending
[2021-04-01 16:47:34,272] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-9d1ddefd4b5f4268b50af564cde10795
[2021-04-01 16:47:35,283] {pod_launcher.py:171} INFO - Event: openvds-9d1ddefd4b5f4268b50af564cde10795 had an event of type Pending
[2021-04-01 16:47:35,284] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-9d1ddefd4b5f4268b50af564cde10795
[2021-04-01 16:47:36,298] {pod_launcher.py:171} INFO - Event: openvds-9d1ddefd4b5f4268b50af564cde10795 had an event of type Running
[2021-04-01 16:47:48,547] {pod_launcher.py:156} INFO - b'\n'
[2021-04-01 16:47:48,547] {pod_launcher.py:156} INFO - b'Importing into: s3://aws-osdu-sample-data/515D714B13377CAD\n'
[2021-04-01 16:47:48,547] {pod_launcher.py:156} INFO - b'\n'
[2021-04-01 16:47:48,547] {pod_launcher.py:156} INFO - b'\r100% done processing s3://aws-osdu-sample-data/515D714B13377CAD.\n'
[2021-04-01 16:47:49,573] {pod_launcher.py:171} INFO - Event: openvds-9d1ddefd4b5f4268b50af564cde10795 had an event of type Succeeded
[2021-04-01 16:47:49,573] {pod_launcher.py:287} INFO - Event with job id openvds-9d1ddefd4b5f4268b50af564cde10795 Succeeded
[2021-04-01 16:47:49,584] {pod_launcher.py:171} INFO - Event: openvds-9d1ddefd4b5f4268b50af564cde10795 had an event of type Succeeded
[2021-04-01 16:47:49,584] {pod_launcher.py:287} INFO - Event with job id openvds-9d1ddefd4b5f4268b50af564cde10795 Succeeded
[2021-04-01 16:47:49,625] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=openvds_import, task_id=OPENVDS, execution_date=20210401T164725, start_date=20210401T164733, end_date=20210401T164749
[2021-04-01 16:47:53,332] {local_task_job.py:102} INFO - Task exited with return code 0

Example of a Failed DAG Run

*** Reading remote log from s3://osdu-wanzhiji-ingest-s3airflowbucketdev-11h61ldwb6zv2/logs/openvds_import/OPENVDS/2021-04-01T16:33:10.782063+00:00/1.log.
[2021-04-01 16:33:16,145] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:33:10.782063+00:00 [queued]>
[2021-04-01 16:33:16,169] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:33:10.782063+00:00 [queued]>
[2021-04-01 16:33:16,169] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 16:33:16,169] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2021-04-01 16:33:16,169] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2021-04-01 16:33:16,184] {taskinstance.py:901} INFO - Executing <Task(KubernetesPodOperator): OPENVDS> on 2021-04-01T16:33:10.782063+00:00
[2021-04-01 16:33:16,187] {standard_task_runner.py:54} INFO - Started process 178 to run task
[2021-04-01 16:33:16,219] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'openvds_import', 'OPENVDS', '2021-04-01T16:33:10.782063+00:00', '--job_id', '112', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/openvds/openvds.py', '--cfg_path', '/tmp/tmpl63qcsu6']
[2021-04-01 16:33:16,219] {standard_task_runner.py:78} INFO - Job 112: Subtask OPENVDS
[2021-04-01 16:33:16,280] {logging_mixin.py:112} INFO - Running <TaskInstance: openvds_import.OPENVDS 2021-04-01T16:33:10.782063+00:00 [running]> on host 67371a44eba7
[2021-04-01 16:33:17,260] {logging_mixin.py:112} WARNING - /usr/local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py:309: DeprecationWarning: Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod`.
  dummy_pod = Pod(
[2021-04-01 16:33:17,261] {logging_mixin.py:112} WARNING - /usr/local/lib/python3.8/site-packages/airflow/kubernetes/pod_launcher.py:77: DeprecationWarning: Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod` instead.
  pod = self._mutate_pod_backcompat(pod)
[2021-04-01 16:33:17,522] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:17,522] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:18,534] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:18,535] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:19,545] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:19,546] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:20,555] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:20,555] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:21,565] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:21,565] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:22,575] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:22,575] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:23,586] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Pending
[2021-04-01 16:33:23,586] {pod_launcher.py:139} WARNING - Pod not yet started: openvds-b46f116bd78346fdb88c86444edd448d
[2021-04-01 16:33:24,597] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Failed
[2021-04-01 16:33:24,597] {pod_launcher.py:284} INFO - Event with job id openvds-b46f116bd78346fdb88c86444edd448d Failed
[2021-04-01 16:33:24,631] {pod_launcher.py:156} INFO - b'Could not open:  - File::open \x00No such file or directory\n'
[2021-04-01 16:33:24,655] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Failed
[2021-04-01 16:33:24,655] {pod_launcher.py:284} INFO - Event with job id openvds-b46f116bd78346fdb88c86444edd448d Failed
[2021-04-01 16:33:24,664] {pod_launcher.py:171} INFO - Event: openvds-b46f116bd78346fdb88c86444edd448d had an event of type Failed
[2021-04-01 16:33:24,665] {pod_launcher.py:284} INFO - Event with job id openvds-b46f116bd78346fdb88c86444edd448d Failed
[2021-04-01 16:33:24,704] {taskinstance.py:1150} ERROR - Pod Launching failed: Pod returned a failure: failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 308, in execute
    raise AirflowException(
airflow.exceptions.AirflowException: Pod returned a failure: failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 312, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod returned a failure: failed
[2021-04-01 16:33:24,709] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=openvds_import, task_id=OPENVDS, execution_date=20210401T163310, start_date=20210401T163316, end_date=20210401T163324
[2021-04-01 16:33:26,147] {local_task_job.py:102} INFO - Task exited with return code 1
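In both logs above, the converter's own output appears as b'...' entries emitted by pod_launcher.py, and in the failed run that is where the actual cause ("No such file or directory") is reported. A small sketch for pulling those entries out of a task log follows. The function name `pod_output` is hypothetical, and the pattern assumes the single-quoted log layout shown in these examples, which may differ in other Airflow versions.

```python
import ast
import re

# Matches the single-quoted bytes repr at the end of a pod_launcher log line.
POD_OUTPUT = re.compile(r"\{pod_launcher\.py:\d+\} INFO - (b'.*')\s*$")


def pod_output(log_text):
    """Return the non-empty lines written by the converter pod, decoded to text."""
    lines = []
    for log_line in log_text.splitlines():
        match = POD_OUTPUT.search(log_line)
        if match is None:
            continue
        raw = ast.literal_eval(match.group(1))  # turn the "b'...'" text back into bytes
        text = raw.decode("utf-8", errors="replace").replace("\x00", "").strip()
        if text:
            lines.append(text)
    return lines
```

Applied to the failed run above, this leaves only the converter's error message, which is usually quicker to read than the Airflow traceback that follows it.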

End-to-End Testing

Once deployed, the dags/segy_to_vds_ssdms_conversion_dag.py and dags/segy_to_vds_conversion_dag.py DAGs can be tested with the e2e testing collection.