ADR: Workflow Service - Support for custom operators

Context

Composability and re-usability are important aspects of workflow design. This ADR covers the APIs needed to achieve composability through re-usable components, with Airflow as the workflow engine.

  • All the Python libraries necessary for operators and DAGs (Directed Acyclic Graphs) are pre-installed in Airflow.

One of the mechanisms to achieve composability and re-usability is Airflow custom operators. An API will be provided in the workflow service to register a custom operator in Airflow. Custom operators are one of the building blocks for composability and re-usability: once an operator is registered, it can be used in any DAG.
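
For illustration only, a registration call might look like the sketch below. The base URL, the /customOperator endpoint, the payload fields (name, description, content), and the headers are assumptions; the authoritative contract is the API specification referenced in the Scope section.

import requests

# Hypothetical base URL and endpoint; the real path comes from the API specification.
WORKFLOW_SERVICE_URL = "https://<workflow-service-host>/api/workflow/v1"

# Read the operator source to be registered (file name is illustrative).
with open("print_record_operator.py") as source_file:
    operator_source = source_file.read()

payload = {
    "name": "PrintRecord",                              # operator class name
    "description": "Prints a record pulled from XCom",  # free-text description
    "content": operator_source,                         # operator source code
}

response = requests.post(
    WORKFLOW_SERVICE_URL + "/customOperator",
    json=payload,
    headers={
        "Authorization": "Bearer <access-token>",
        "data-partition-id": "<partition-id>",
    },
)
response.raise_for_status()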

Scope

The API specification can be found here.

[Image: API specification]

Out of Scope

  • Custom libraries at run time are not supported.
  • Validation of operators is not supported.
  • Updating and deleting operators is not supported.
  • Custom Hooks and Sensors are not supported.
  • Packaged DAGs (containing DAGs and custom operators) are not supported.

Decision

  • The decision is to add the above-mentioned APIs to the workflow service.
  • The registration of custom operators is expected to be aligned with Airflow-recommended practices.

Rationale

These APIs allow a customer to register custom operators at run time; the registered operators can then be used in multiple DAGs, enabling operator re-use across workflows.

Consequences

Cloud service providers (CSPs) need to implement these new APIs to support custom operators.

High level Architecture

[Image: High-level architecture diagram]

Intermediate Data Sharing

One of the key requirements for running workflows is sharing intermediate data between tasks. For example, when ingesting a 100 MB CSV file, each step in the workflow needs to pass its transformed version of the file to the next step. Airflow XCom can be used to share metadata between tasks, but it is not sufficient for sharing large files because XCom limits the amount of data that can be shared. To address this, a new API is provided on the workflow service that returns a signed URL. Using this signed URL, multiple files can be created, and information about these files can be shared through XCom. The steps below give an overview of how this signed-URL functionality is intended to be used (a code sketch follows the list).

  • Every workflow will request a signed URL at the start of the workflow.
  • Each task will use the same signed URL to save intermediate data.
  • Tasks will communicate information about the intermediate data through XCom.
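
A minimal sketch of this pattern is shown below as Airflow Python callables (usable with PythonOperator and provide_context=True). The endpoint that returns the signed URL and the way file names are combined with that URL are assumptions for illustration.

import requests

def get_signed_url(**kwargs):
    """Request a signed URL once at the start of the workflow."""
    response = requests.get(
        "https://<workflow-service-host>/api/workflow/v1/signedUrl",  # assumed endpoint
        headers={"Authorization": kwargs['dag_run'].conf['authToken']},
    )
    response.raise_for_status()
    # Make the signed URL available to downstream tasks through XCom.
    kwargs['ti'].xcom_push(key='signed_url', value=response.json()['url'])

def transform_step(**kwargs):
    """Save intermediate output under the shared signed URL and publish its name."""
    ti = kwargs['ti']
    signed_url = ti.xcom_pull(task_ids='get_signed_url', key='signed_url')
    transformed_csv = b"id,value\n1,42\n"  # placeholder for the real transformed data
    file_name = 'step_1_output.csv'
    # Assumes the signed URL acts as a writable prefix under which files can be created.
    requests.put(signed_url + '/' + file_name, data=transformed_csv).raise_for_status()
    # Only the small file reference goes through XCom; the data itself goes to storage.
    ti.xcom_push(key='intermediate_file', value=file_name)

Downstream tasks can then pull the intermediate_file reference from XCom and read the file through the same signed URL.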

Examples

Sample Custom Operator

from airflow.operators.python_operator import PythonOperator


class PrintRecord(PythonOperator):
    """Custom operator that prints a record pulled from an upstream task via XCom."""

    def __init__(self, *args, **kwargs) -> None:
        # Bind the operator's own callable so callers do not need to pass python_callable.
        super().__init__(*args, python_callable=self.my_callable, **kwargs)

    def my_callable(self, **kwargs):
        print("Printing data")
        # Pull the response that the 'get_record' task pushed to XCom.
        ti = kwargs['ti']
        pulled_value = ti.xcom_pull(task_ids='get_record')
        print(pulled_value)

Sample DAG Using the Custom Operator and the HTTP Operator

from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from operators.print_record_operator import PrintRecord  # the registered custom operator
from airflow.operators.http_operator import SimpleHttpOperator

# Values supplied by the workflow service through dag_run.conf (resolved at run time).
token = "{{ dag_run.conf['authToken'] }}"
data_partition_id = "{{ dag_run.conf['runConfiguration']['dataPartitionId'] }}"
kind = "{{ dag_run.conf['runConfiguration']['kind'] }}"
record_id = "{{ dag_run.conf['runConfiguration']['id'] }}"
storage_host = "{{ dag_run.conf['additionalProperties']['storageHost'] }}"
storage_endpoint = "api/storage/v2/records/" + record_id

# Following are defaults which can be overridden later on
default_args = {
    'start_date': days_ago(0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# The DAG is triggered externally with a run configuration, so no schedule is set.
dag = DAG('Print_Storage_Record', default_args=default_args, schedule_interval=None)

# Fetch the record from the storage service and push the response to XCom.
t1 = SimpleHttpOperator(
    http_conn_id='http_storage',
    task_id='get_record',
    endpoint=storage_endpoint,
    method='GET',
    headers={
        "Authorization": token,
        "data-partition-id": data_partition_id,
        "Accept": "application/json",
    },
    xcom_push=True,
    log_response=True,
    dag=dag
)

# Custom operator that prints the record pulled from XCom.
t2 = PrintRecord(task_id='custom-task', provide_context=True, dag=dag)

t1 >> t2
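
For illustration, the templated fields above expect the triggering dag_run.conf to carry a payload of roughly the following shape; the placeholder values are not real.

# Illustrative shape of the dag_run.conf payload expected by the templates above.
run_conf = {
    "authToken": "Bearer <access-token>",
    "runConfiguration": {
        "dataPartitionId": "<partition-id>",
        "kind": "<record-kind>",
        "id": "<record-id>",
    },
    "additionalProperties": {
        "storageHost": "https://<storage-service-host>",
    },
}

With such a payload, the get_record task calls api/storage/v2/records/<record-id> over the http_storage connection and pushes the response to XCom, which the custom-task step then prints.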

CSV DAG built using the custom operator.

[Image: CSV DAG]
