#  Copyright 2020 Google LLC
#  Copyright 2020 EPAM Systems
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""DAG for R3 ingestion."""

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from libs.exceptions import NotOSDUSchemaFormatError
from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from operators.process_manifest_r3 import ProcessManifestOperatorR3
from operators.update_status import UpdateStatusOperator
from operators.validate_manifest_schema import ValidateManifestSchemaOperator

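# Number of parallel batch-processing tasks, read from the Airflow Variable
# "core.batch_count" (defaults to 3 when the Variable is not set).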
BATCH_NUMBER = int(Variable.get("core.batch_count", "3"))
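# Task ids used when declaring the operators below and as targets of the branch operator.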
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"

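# Defaults applied to every task; "none_failed" allows a task to run even when
# some of its upstream tasks were skipped by the branch operator.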
default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}

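# DAG id under which this workflow is registered in Airflow.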
workflow_name = "Osdu_ingest"


def is_batch(**context):
    """
    :param context: Dag context
    :return: SubDag to be executed next depending on Manifest type
    """
    manifest = context["dag_run"].conf["execution_context"].get("manifest")

    if isinstance(manifest, dict):
        next_task = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    elif isinstance(manifest, list):
        next_task = PROCESS_BATCH_MANIFEST_FILE
    else:
        raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. "
                                       f"Got {manifest}.")
    return next_task


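# schedule_interval=None: the DAG never runs on a schedule and is only executed
# when triggered externally.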
with DAG(
    workflow_name,
    default_args=default_args,
    description="R3 manifest processing with providing integrity",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60)
) as dag:
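    # Report the workflow run as running before any processing starts.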
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
        dag=dag
    )

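    # Route the run to either the single-manifest chain or the batch fan-out
    # depending on the payload type (see is_batch above).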
    branch_is_batch_op = BranchPythonOperator(
        task_id="check_payload_type",
        dag=dag,
        provide_context=True,
        python_callable=is_batch
    )

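    # Final status update; "all_done" makes it run regardless of whether the
    # upstream tasks succeeded, failed, or were skipped.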
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

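    # Validate the incoming manifest against its schema.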
    validate_schema_operator = ValidateManifestSchemaOperator(
        task_id="validate_manifest_schema_task",
        provide_context=True,
        dag=dag
    )

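    # Check manifest integrity, reading the validated manifest produced by the
    # task referenced in previous_task_id.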
    ensure_integrity_op = EnsureManifestIntegrityOperator(
        task_id=ENSURE_INTEGRITY_TASK,
        provide_context=True,
        previous_task_id=validate_schema_operator.task_id,
        dag=dag
    )

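    # Process the single manifest produced by the integrity task.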
    process_single_manifest_file = ProcessManifestOperatorR3(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        provide_context=True,
        dag=dag,
        previous_task_id=ensure_integrity_op.task_id
    )

    # Dummy operator serving as the entry point into the parallel batch-upload tasks
    batch_upload = DummyOperator(
        dag=dag,
        task_id=PROCESS_BATCH_MANIFEST_FILE
    )

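    # Fan out into BATCH_NUMBER parallel processing tasks; each one reads its
    # input from a task named "provide_manifest_integrity_task_<n>" (via
    # previous_task_id) and joins back at update_status_finished_op.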
    for batch in range(BATCH_NUMBER):
        batch_upload >> ProcessManifestOperatorR3(
            task_id=f"process_manifest_task_{batch + 1}",
            provide_context=True,
            dag=dag,
            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped",
        ) >> update_status_finished_op

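# Wire the graph: the status update runs first, the branch then selects either
# the single-manifest chain (validate -> integrity -> process) or the batch
# fan-out; both paths end at update_status_finished_op.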
update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
branch_is_batch_op >> validate_schema_operator >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement
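
# A minimal sketch of triggering this DAG from the Airflow 1.10 CLI (payload
# shape is illustrative only):
#   airflow trigger_dag Osdu_ingest \
#       --conf '{"execution_context": {"manifest": {...}}}'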