ADR - Checkpointing mechanism for Airflow Tasks
Status
- Proposed
- Trialing
- Under review
- Approved
- Retired
Introduction
Resiliency in DAGs
As Airflow is a distributed environment and tasks can execute on different host machines, there is a possibility of tasks getting killed or hitting unexpected crashes/failures. To mitigate and handle such failure scenarios, there are general guidelines for Airflow and DAGs such as:
- Making task logic idempotent so that tasks can be retried safely without ending up in an inconsistent state
- Making tasks atomic, i.e. each task should be responsible for one operation that can be re-run independently
- Enabling retries at the DAG task level in case of failure
All of the above resiliency guidelines are largely followed in the community ingestion DAGs like manifest ingestion and CSV ingestion. However, even though the ingestion tasks are safe to retry, a retry can result in data duplication or additional versions of the same records, because a blanket retry might re-ingest the same data records into the system. In addition, in the future we might onboard very compute-intensive workloads orchestrated by Airflow on big data systems like Spark, where a generic framework for checkpointing and state preservation would significantly improve system reliability and performance.
Storage API Behavior
The ingestion tasks in Airflow invoke Storage APIs to ingest data records into the system. The Storage service PUT records API is idempotent in nature: if a record with the same id is ingested again, the call returns 200 and a new version of the data record is created. If a record id is not provided, the Storage service generates random ids and assigns them to the data records (a call sketch is shown below).
The Storage service also does not perform any checksum operations on record data fields to prevent data duplication, because of performance concerns such as:
- Computing checksums for each record can be a very costly operation
- Even if checksums are present, on-the-fly checksum retrieval and comparison is very tricky to handle
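For illustration, a minimal sketch of how an ingestion task might invoke the Storage PUT records API described above; the host placeholder and helper name are assumptions, authentication handling is simplified, and the response fields shown are indicative only.

import requests

STORAGE_RECORDS_URL = "https://<osdu-host>/api/storage/v2/records"  # placeholder host for the Storage records endpoint

def put_records(records, token, data_partition_id):
    # Ingest a batch of records; re-sending the same record ids succeeds and
    # creates new versions rather than failing, which is why a blind retry
    # produces duplicate versions.
    response = requests.put(
        STORAGE_RECORDS_URL,
        json=records,  # list of record dicts; records without an "id" get generated ids
        headers={
            "Authorization": f"Bearer {token}",
            "data-partition-id": data_partition_id,
            "Content-Type": "application/json",
        },
    )
    response.raise_for_status()
    # The response carries the ids/versions assigned to the ingested records
    return response.json().get("recordIds", [])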
Problem Statement
A DAG retry will result in re-ingestion, and end users will see multiple versions of the same records they ingested. In use cases where even a single DAG run can ingest hundreds of thousands of records, a plain re-ingestion of these records costs:
- Additional resource consumption in terms of storage, e.g. blob storage, database, Elastic etc.
- Additional resource consumption in terms of computation, as the records will get re-indexed through the search and indexing pipeline
Also, in the future we might onboard very compute-intensive workloads orchestrated by Airflow via big data systems like Spark, where a generic framework for checkpointing and state preservation would significantly improve system reliability and performance; this ADR, however, focuses mainly on ingestion tasks.
From the above commentary, there is a requirement to preserve the state of an ingestion task using some markers. These markers can be record ids, batch ids etc. depending on the DAG logic, so that in case of failures and subsequent retries the tasks can resume from the last checkpointed state and avoid re-ingestion scenarios.
Below are some mechanisms/options to store and retrieve this state. We also discuss some potential markers, but these markers will be very DAG-specific and their implementation can be left to DAG authors.
Proposals
Approach 1
Leverage Airflow XComs for storing state
XCom is a built-in Airflow feature that allows tasks to exchange task metadata or store small amounts of data. XCom uses the Airflow metadata database (PostgreSQL/MySQL) to persist the information. XComs are defined by a key, value, and timestamp.
XComs can be "pushed" (sent by a task) or "pulled" (received by a task). When an XCom is pushed, it is stored in the Airflow metadata database and made available to all other tasks. A minimal push/pull sketch is shown below.
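The following sketch only illustrates the basic XCom push/pull mechanics between two PythonOperator tasks; the DAG id, task ids, and the marker content are arbitrary examples.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def save_state(**context):
    # Push a small state marker; Airflow persists it in the metadata database
    context["ti"].xcom_push(key="ingestion_state", value={"last_batch": 3})

def read_state(**context):
    # Pull the marker pushed by the upstream task (returns None if nothing was pushed)
    state = context["ti"].xcom_pull(task_ids="save_state", key="ingestion_state")
    print(f"Resumed state: {state}")

with DAG("xcom_state_example", start_date=datetime(2024, 1, 1), schedule_interval=None, catchup=False) as dag:
    push = PythonOperator(task_id="save_state", python_callable=save_state)
    pull = PythonOperator(task_id="read_state", python_callable=read_state)
    push >> pull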
Solution Details
We can use the out-of-the-box XCom support provided by Airflow to save state markers for the records ingested in each DAG run. Airflow exposes APIs to perform read/write operations with XCom and persist the data in the metadata database.
1. DAGs with Python Operators
Flow Details
- A workflow like manifest ingestion is triggered by the user by invoking the Workflow service; the Workflow service calls Airflow to initiate the DAG run
- The tasks (Airflow operators) in the DAG begin ingestion. XCom is queried to check whether any state was persisted for the run: if state is found, the task resumes from the last ingested batch of records; otherwise all records are ingested. Records are ingested by invoking the Storage service PUT records API. The task also keeps saving the record ids or relevant state markers like batch ids to XCom as key-value pairs, with the XCom values stored in PostgreSQL. The state object does not have a fixed schema, to keep the mechanism generic; the XCom entries are grouped at the run_id and task_id level, for example:
{
  "key": "RunId+TaskId",
  "value": {
    "state": {
      "fieldA": "",
      "fieldB": ""
    }
  }
}
- The XCom values can be queried based on run_id and task_id by the task run instance, and payloads will be skipped/ingested appropriately to prevent duplication of data (see the sketch below)
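A minimal sketch of such an operator callable, assuming the state marker pushed by the task in an earlier try remains retrievable for the same run_id and task_id as described above; put_records is an illustrative placeholder for the Storage PUT records call and the batch size mirrors the Storage service limit.

def ingest_with_checkpoint(records, batch_size=500, **context):
    ti = context["ti"]
    # Look up any checkpoint persisted for this run_id/task_id; None means a fresh run
    state = ti.xcom_pull(task_ids=ti.task_id, key="checkpoint") or {"last_batch": -1}

    batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
    for batch_no, batch in enumerate(batches):
        if batch_no <= state["last_batch"]:
            continue  # already ingested in an earlier try; skip to avoid new record versions
        put_records(batch)  # illustrative placeholder for the Storage PUT records call
        # Persist the marker after every successful batch so a retry resumes from here
        ti.xcom_push(key="checkpoint", value={"last_batch": batch_no})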
2. DAGs with K8s Pod Operators
Python DAGs can directly leverage Airflow modules to perform XCom operations, but non-Python tasks and K8s pod operators need an HTTP endpoint to interact with XCom.
Details on the flow
- For non-Python DAGs and DAGs involving the K8s pod operator, REST endpoints will be required to access XCom
- New APIs will be onboarded in the Workflow service to facilitate XCom interaction
- The executing tasks will call the Workflow service APIs for Airflow XCom interactions (a sketch of this interaction is shown after this list)
- The rest of the flow remains the same as in the previous section
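The endpoint paths, environment variables, and payload shape below are hypothetical placeholders for the new Workflow service APIs; the sketch only illustrates how a containerized task could read and write its checkpoint over HTTP.

import os

import requests

WORKFLOW_URL = os.environ["WORKFLOW_SERVICE_URL"]  # assumed to be injected into the pod spec
RUN_ID = os.environ["AIRFLOW_RUN_ID"]              # assumed to be passed by the DAG to the pod
TASK_ID = os.environ["AIRFLOW_TASK_ID"]

def read_checkpoint(token):
    # Hypothetical Workflow service endpoint wrapping an Airflow XCom read
    resp = requests.get(
        f"{WORKFLOW_URL}/v1/checkpoint/{RUN_ID}/{TASK_ID}",
        headers={"Authorization": f"Bearer {token}"},
    )
    return resp.json().get("state") if resp.status_code == 200 else None

def write_checkpoint(token, state):
    # Hypothetical Workflow service endpoint wrapping an Airflow XCom write
    requests.put(
        f"{WORKFLOW_URL}/v1/checkpoint/{RUN_ID}/{TASK_ID}",
        json={"state": state},
        headers={"Authorization": f"Bearer {token}"},
    ).raise_for_status()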
Pros
- Easy to implement, as the solution utilizes native Airflow capabilities to persist data
- Minimal changes required in the Workflow service; only two new APIs need to be exposed to perform reads/writes with XCom using the Airflow APIs
Cons
- No out-of-the-box REST API support for write operations with Airflow XCom
Airflow only exposes APIs to list and get XCom entries, not to create or update them. It is feasible to write custom APIs in Airflow using plugins, but this requires additional effort and POC validation, and some managed Airflow offerings might not support it.
- XCom size constraints and usability
XCom comes with a significant size limitation: it should not be used for passing large data sets between tasks. The size limit of an XCom is determined by the metadata database in use, which makes the solution platform dependent. Supported sizes:
- Postgres: 1 GB
- SQLite: 2 GB
- MySQL: 64 KB
The above limits are good enough for a Postgres backend, but the recommendation from the Airflow community is to avoid XComs for bulky data even if it fits within the maximum allowable limit, as large XComs can significantly degrade task execution times and UI responsiveness.
- XComs are not recommended in the implementation of other ADRs as well
Due to the inherent constraints with XCom, we introduced the notion of manifest-by-reference to avoid passing large manifests between Airflow tasks; the same issue can surface in the case of large markers.
Approach 2
Using external storage for persisting record state
We can leverage external storage like Azure Blob Storage or AWS S3 to persist the record id information in the system.
Details
- To facilitate access to external storage by the DAGs, new endpoints can be added in the Workflow service to abstract direct infrastructure access
- The new APIs are intended to be internal; new entitlements can be introduced for restricted access
- DAGs are expected to save only small amounts of data as state markers, so there are no storage constraints as such. Even if record ids are persisted as state markers, given that the maximum batch size supported by the Storage service is 500, the typical payload size will be around 25-50 KB (a sketch of one possible backend is shown after this list)
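A minimal sketch of one possible CSP-side backend using Azure Blob Storage behind such Workflow service endpoints; the BlobCheckpointStore class, container name, and blob key layout are assumptions for illustration, and other CSPs would substitute S3, GCS, etc.

import json

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient

class BlobCheckpointStore:
    # Persists one small JSON blob per run_id/task_id (assumed key layout)

    def __init__(self, connection_string, container="dag-checkpoints"):
        service = BlobServiceClient.from_connection_string(connection_string)
        self.container = service.get_container_client(container)

    def _blob_name(self, run_id, task_id):
        return f"{run_id}/{task_id}.json"

    def save(self, run_id, task_id, state):
        # Later checkpoints overwrite earlier ones for the same run/task
        self.container.upload_blob(
            name=self._blob_name(run_id, task_id),
            data=json.dumps({"state": state}),
            overwrite=True,
        )

    def load(self, run_id, task_id):
        try:
            blob = self.container.download_blob(self._blob_name(run_id, task_id))
            return json.loads(blob.readall())["state"]
        except ResourceNotFoundError:
            return None  # no checkpoint persisted yet for this run/task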
Pros
- No inherent constraints like those posed by XCom
- Generic REST interface which can be used by all DAGs
- Each CSP can extend and use its own backend infrastructure
Cons
- Minor: additional CSP implementation required, along with onboarding new APIs in the Workflow service
Checkpointing markers
These markers are essentially metadata that can be saved and retrieved by the DAG to resume processing in case of retries. The exact markers will depend on the DAG logic and implementation, hence the framework is generic and schema-free and can store arbitrary objects.
For typical OSDU DAGs like manifest ingestion or the CSV parser, the usual tasks performed during ingestion are schema validations, file metadata validations, CRS and unit validations, referential checks by invoking the Search query API, etc. Apart from the actual data record ingestion, all other steps are read-only operations and do not change the state of the system in any way. Record ingestion is the only write operation that changes the state of the system, hence markers that capture which records have already been ingested are a good candidate. For instance, these state markers can be:
- Record ids of the data records ingested
- Batch number of the records ingested (the batching logic should be predictable to resume safely, as sketched below)
- Unique, monotonically increasing integer values assigned to each data record
All of the above can be reasonable marker choices to implement and demonstrate the utility of this framework for OSDU DAGs like CSV ingestion.
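For the batch-number marker in particular, the batching itself must be deterministic across retries; a minimal sketch assuming records can be ordered by a stable field such as their id.

def deterministic_batches(records, batch_size=500):
    # Order records on a stable attribute (assumed here to be the record id) so
    # that a saved batch number refers to the same records on every retry.
    ordered = sorted(records, key=lambda record: record["id"])
    for start in range(0, len(ordered), batch_size):
        yield start // batch_size, ordered[start:start + batch_size]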
References
- ingestion-dags#67
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
- https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#tag/XCom
- https://docs.astronomer.io/learn/airflow-passing-data-between-tasks
- https://docs.astronomer.io/learn/custom-xcom-backends