# witsml_parser_dag.py
#  Copyright 2020 Google LLC
#  Copyright 2020 EPAM Systems
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""Energistics XML (WITSML) ingest DAG."""

18
import os
19
20
21
from datetime import timedelta

import airflow
22
import yaml
23
from airflow import DAG
24
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
25
from osdu_airflow.backward_compatibility.default_args import update_default_args
26
27
28
29
from osdu_airflow.operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from osdu_airflow.operators.process_manifest_r3 import ProcessManifestOperatorR3
from osdu_airflow.operators.validate_manifest_schema import ValidateManifestSchemaOperator
from osdu_airflow.operators.update_status import UpdateStatusOperator
30

31
32
33
34
35
# Task-id constants for the manifest-processing steps of this DAG.
# NOTE: PROCESS_BATCH_MANIFEST_FILE has no matching task in this DAG.
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"

# Pod settings for the WITSML parser are read from the YAML file whose path is
# given by the OSDU_API_CONFIG environment variable (KeyError if it is unset).
# safe_load is used so the config file cannot instantiate arbitrary objects.
with open(os.environ["OSDU_API_CONFIG"], encoding="utf-8") as f:
    k8s_configs = yaml.safe_load(f)

# Extra KubernetesPodOperator keyword arguments for the parser task.
energistics_k8s_configs = k8s_configs["energistics"]["energistics_witsml_parser_k8s_operator"]

# Default arguments applied to the operators of this DAG.
default_args = {
    "name": "Energistics_xml_ingest",
    "owner": "energistics",
    "start_date": airflow.utils.dates.days_ago(0),
    # Retries are disabled; retry_delay only takes effect if "retries" is
    # raised above 0.
    "retries": 0,
    "retry_delay": timedelta(minutes=50),
    # Used by every task that does not set its own trigger_rule.
    "trigger_rule": "none_failed",
}

# NOTE(review): update_default_args comes from osdu_airflow's
# backward_compatibility package; presumably it adapts these arguments to the
# installed Airflow version — confirm before relying on specific keys.
default_args = update_default_args(default_args)

# The DAG has no schedule (schedule_interval=None), so runs are only created
# by explicit triggers. A run is stopped once it exceeds 60 minutes.
dag = DAG(
    "Energistics_xml_ingest",
    default_args=default_args,
    description="witsml parser dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
)

# Status-update tasks placed at the start and at the end of the pipeline.
update_status_running_op = UpdateStatusOperator(
    task_id="update_status_running_task",
    dag=dag,
)

# "all_done" runs this task once every upstream task has finished, whatever
# its final state. Presumably this guarantees the final status is recorded
# even when ingestion fails — confirm against UpdateStatusOperator.
update_status_finished_op = UpdateStatusOperator(
    task_id="update_status_finished_task",
    dag=dag,
    trigger_rule="all_done",
)

# Runs the WITSML parser (main.py) inside a Kubernetes pod. The container
# writes its result to /airflow/xcom/return.json; with do_xcom_push=True the
# KubernetesPodOperator reads that file and pushes it to XCom.
#
# dag_run.conf["execution_context"] is rendered through Jinja's `tojson`
# filter, which escapes single quotes as \u0027. The rendered JSON therefore
# cannot terminate the surrounding single-quoted shell argument.
process_energistics_op = KubernetesPodOperator(
    namespace="default",
    task_id="witsml_parser_task",
    name="witsml_parser_task",
    do_xcom_push=True,
    cmds=[
        "sh",
        "-c",
        "mkdir -p /airflow/xcom/; "
        "python main.py"
        " --context '{{ dag_run.conf['execution_context'] | tojson }}'"
        " --file_service {{ var.value.core__service__file__host }}"
        " --out /airflow/xcom/return.json",
    ],
    is_delete_operator_pod=True,
    image_pull_policy="Always",
    get_logs=True,
    startup_timeout_seconds=300,
    dag=dag,
    # Remaining pod settings come from the OSDU_API_CONFIG file (presumably the
    # image and similar; confirm). A key that repeats one of the arguments
    # above raises TypeError (duplicate keyword argument).
    **energistics_k8s_configs
)

# Manifest pipeline. Each operator receives its predecessor's task_id via
# previous_task_id (presumably to pull that task's XCom output — confirm in
# osdu_airflow). "none_failed_or_skipped" runs a task only when no upstream
# task failed and at least one upstream task succeeded.
validate_schema_operator = ValidateManifestSchemaOperator(
    task_id=SINGLE_MANIFEST_FILE_FIRST_OPERATOR,
    dag=dag,
    previous_task_id=process_energistics_op.task_id,
    trigger_rule="none_failed_or_skipped",
)

ensure_integrity_op = EnsureManifestIntegrityOperator(
    task_id=ENSURE_INTEGRITY_TASK,
    dag=dag,
    previous_task_id=validate_schema_operator.task_id,
    trigger_rule="none_failed_or_skipped",
)

process_single_manifest_file = ProcessManifestOperatorR3(
    task_id=PROCESS_SINGLE_MANIFEST_FILE,
    dag=dag,
    previous_task_id=ensure_integrity_op.task_id,
    trigger_rule="none_failed_or_skipped",
)
# Linear execution order: mark running -> parse WITSML -> validate schema ->
# ensure integrity -> process manifest -> mark finished.
(
    update_status_running_op
    >> process_energistics_op
    >> validate_schema_operator
    >> ensure_integrity_op
    >> process_single_manifest_file
    >> update_status_finished_op
)