Code owners
Assign users and groups as approvers for specific file changes. Learn more.
osdu-ingest-r3.py 4.33 KiB
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for R3 ingestion."""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from libs.exceptions import NotOSDUSchemaFormatError
from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from operators.process_manifest_r3 import ProcessManifestOperatorR3
from operators.update_status import UpdateStatusOperator
from operators.validate_manifest_schema import ValidateManifestSchemaOperator
# How many parallel batch-processing tasks the DAG fans out to. Evaluated
# when this module is imported (i.e. at DAG-parse time) from the Airflow
# Variable "core.batch_count"; the string "3" is used when it is not set.
BATCH_NUMBER = int(Variable.get("core.batch_count", default_var="3"))

# Task ids referenced both by the branching callable and by the task
# definitions further down, kept in one place so they cannot drift apart.
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"

# Defaults inherited by every task in the DAG unless a task overrides them.
default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    # Failed tasks are not retried; retry_delay only matters if this is raised.
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    # Run once no upstream task has failed (skipped upstreams are tolerated).
    "trigger_rule": "none_failed",
}

workflow_name = "Osdu_ingest"
def is_batch(**context):
    """Pick the branch of the DAG to follow based on the manifest's shape.

    Intended as the ``python_callable`` of a ``BranchPythonOperator``: the
    returned string is the task id of the task that should run next.

    :param context: Airflow task context; ``context["dag_run"].conf`` must
        hold an ``"execution_context"`` mapping that may contain ``"manifest"``.
    :return: ``PROCESS_BATCH_MANIFEST_FILE`` when the manifest is a list,
        ``SINGLE_MANIFEST_FILE_FIRST_OPERATOR`` when it is a dict.
    :raises NotOSDUSchemaFormatError: if the manifest is neither a dict nor a
        list (this includes a missing ``"manifest"`` key, which yields None).
    """
    execution_context = context["dag_run"].conf["execution_context"]
    manifest = execution_context.get("manifest")

    # A list of manifests goes to the parallel batch path.
    if isinstance(manifest, list):
        return PROCESS_BATCH_MANIFEST_FILE
    # A single manifest goes to the validate -> integrity -> process path.
    if isinstance(manifest, dict):
        return SINGLE_MANIFEST_FILE_FIRST_OPERATOR

    raise NotOSDUSchemaFormatError(
        f"Manifest must be either 'dict' or 'list'. Got {manifest}."
    )
# Build the "Osdu_ingest" DAG. Task flow as wired at the bottom of this block:
#   update_status_running_task -> check_payload_type (branch on manifest type)
#     dict manifest: validate_manifest_schema_task
#                    -> provide_manifest_integrity_task
#                    -> process_single_manifest_file_task
#     list manifest: batch_upload
#                    -> process_manifest_task_1 .. process_manifest_task_<BATCH_NUMBER>
#   -> update_status_finished_task
with DAG(
    workflow_name,
    default_args=default_args,
    description="R3 manifest processing with providing integrity",
    # No schedule: the DAG only runs when triggered (is_batch reads the
    # trigger's dag_run.conf).
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60)
) as dag:
    # First task of every run; judging by the task id it records a "running"
    # status (UpdateStatusOperator is defined elsewhere — confirm).
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
        dag=dag
    )

    # Runs is_batch and follows only the task id it returns; the other
    # directly-downstream task (single-manifest or batch entry) is skipped.
    branch_is_batch_op = BranchPythonOperator(
        task_id="check_payload_type",
        dag=dag,
        provide_context=True,
        python_callable=is_batch
    )

    # Final status update. "all_done" overrides the DAG-wide "none_failed" so
    # this task runs once all upstreams finish, whether they succeeded or failed.
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

    # --- Single-manifest path -------------------------------------------
    # This task id must equal SINGLE_MANIFEST_FILE_FIRST_OPERATOR, which is
    # what is_batch returns for a dict manifest.
    validate_schema_operator = ValidateManifestSchemaOperator(
        task_id="validate_manifest_schema_task",
        provide_context=True,
        dag=dag
    )

    # previous_task_id tells the operator which upstream task precedes it
    # (presumably to pull that task's output via XCom — the operators are
    # defined elsewhere; confirm).
    ensure_integrity_op = EnsureManifestIntegrityOperator(
        task_id=ENSURE_INTEGRITY_TASK,
        provide_context=True,
        previous_task_id=validate_schema_operator.task_id,
        dag=dag
    )

    process_single_manifest_file = ProcessManifestOperatorR3(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        provide_context=True,
        dag=dag,
        previous_task_id=ensure_integrity_op.task_id
    )

    # --- Batch path -----------------------------------------------------
    # Dummy operator as entry point into parallel task of batch upload
    batch_upload = DummyOperator(
        dag=dag,
        task_id=PROCESS_BATCH_MANIFEST_FILE
    )

    # Fan out to BATCH_NUMBER parallel processing tasks, numbered from 1, each
    # feeding into the final status update.
    # NOTE(review): previous_task_id points at
    # "provide_manifest_integrity_task_<n>", but no task with that id is
    # defined in this DAG (the only integrity task is ENSURE_INTEGRITY_TASK on
    # the single-manifest path). Confirm how ProcessManifestOperatorR3 behaves
    # when that upstream task/XCom does not exist.
    # "none_failed_or_skipped" presumably keeps these tasks from running when
    # the branch skipped batch_upload — verify against the Airflow version used.
    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessManifestOperatorR3(
            task_id=f"process_manifest_task_{batch + 1}",
            provide_context=True,
            dag=dag,
            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped",
        ) >> update_status_finished_op

    # --- Dependency wiring ----------------------------------------------
    update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
    branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
    branch_is_batch_op >> validate_schema_operator >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement