ADR: Airflow DAG run output to Workflow service
Introduction
The DAG consists of several steps. On each step, the manifest’s entities (Master-data records, Reference-data records, etc.) that don’t pass validation (e.g., they don’t follow the proper schemas, contain inconsistent data, or can’t be stored in Storage due to a wrong ACL) can be marked as skipped and extracted from the original manifest.
Currently, information about each step’s skipped entities is stored in XComs. The valid manifest without the skipped records is also passed to the downstream step in the `return_value` field.
The aggregated result, grouped by each previous task, is stored in the `update_status_finished_task` XCom.
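For illustration, the current pattern in a task’s Python callable looks roughly like this (a minimal sketch assuming a `PythonOperator`-style callable; the task name, validation helper, and manifest shape are hypothetical, not the real operator code):

```python
# Sketch only: illustrates the "push skipped, return valid" XCom pattern.

def passes_validation(entity: dict) -> bool:
    # Stand-in for the real schema/consistency/ACL checks described above.
    return "id" in entity

def validate_manifest_task(ti, **context):
    manifest = ti.xcom_pull(task_ids="previous_task")  # manifest from the upstream step
    valid_entities, skipped = [], []
    for entity in manifest.get("ReferenceData", []):
        if passes_validation(entity):
            valid_entities.append(entity)
        else:
            skipped.append({"id": entity.get("id"), "reason": "failed validation"})
    # Skipped info goes to a dedicated XCom key ...
    ti.xcom_push(key="skipped_ids", value=skipped)
    manifest["ReferenceData"] = valid_entities
    # ... while the cleaned manifest is returned, i.e., stored under return_value.
    return manifest
```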
This approach lets us quickly see what is skipped on each step of the DAG. However, to see the skipped IDs, the user must visit the Airflow UI and must know the exact `dag_run` ID of each workflow. It also becomes difficult to keep track of skipped records if the manifest contains hundreds or even thousands of records. The most prominent example is ingesting LogCurveType Reference-data (https://gitlab.opengroup.org/osdu/subcommittees/data-def/work-products/reference-values/-/blob/master/Manifests/reference-data/LOCAL/LogCurveType.1.0.0.json) with tens of thousands of records.
As the end user must interact with Workflow Service only and must not need to be aware of the Airflow Ingestion Engine, a mechanism for notifying Workflow Service about the current status of each task must be developed.
In any case, we must extend the existing Workflow Service API.
Relates to osdu/platform/system/home#87
Proposals
Get XCom outputs through Workflow Service using Stable API
This is the most straightforward way and doesn’t require changing the DAGs’ implementations. The Stable API lets the user get XCom entries for concrete `dag_run`s and `task_id`s.
So, when the workflow run status is requested, Workflow Service can fetch the XCom entry that contains the aggregated report from the `update_status_finished_task` step.
The method for getting an XCom entry is described here: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_xcom_entry
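A rough sketch of how Workflow Service could call this endpoint (the webserver address, the auth scheme, and the assumption that the report sits under the `return_value` key are illustrative):

```python
import requests

AIRFLOW_URL = "http://airflow-webserver:8080"  # assumption: reachable webserver address

def get_aggregated_report(dag_id: str, dag_run_id: str) -> str:
    """Fetch the aggregated report XCom entry via the Airflow Stable REST API."""
    url = (
        f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
        "/taskInstances/update_status_finished_task/xcomEntries/return_value"
    )
    response = requests.get(url, auth=("user", "password"))  # auth depends on deployment
    response.raise_for_status()
    # The Stable API serializes the XCom value as a string in the "value" field.
    return response.json()["value"]
```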
Possible response on `/v1/workflow/{workflow_name}/workflowRun/{runId}`:
```json
{
  "workflowId": "string",
  "runId": "string",
  "startTimestamp": 0,
  "endTimestamp": 0,
  "status": "FINISHED",
  "report": {
    "saved_record_ids": {},
    "skipped_ids": {
      "process_single_manifest_file_task": [
        {
          "id": "<data-partition-id>:work-product-component--SeismicBinGrid:Auto_Test_999493329875",
          "kind": "work-product-component--SeismicBinGrid",
          "reason": "400 Client Error: Bad Request for url."
        }
      ]
    }
  },
  "submittedBy": "string"
}
```
Pros:
- Easy to implement.
- Doesn’t require changes in Airflow Operators.
- Minimal changes to Workflow Service.
Cons:
- XComs can be huge, which can cause problems with sending such payloads via HTTP and reading them.
Save status in Dataset/File Service as files
Another approach is not to pass skipped IDs via XComs but to save the skipped-entities report as a file using Dataset Service.
We can add new endpoints to Workflow Service for updating the current workflow run status, so that Airflow sends a report to it on each task. At the same time, Workflow Service will create a new dataset by uploading the report as a file to Dataset Service.
After the workflow is finished, Workflow Service will show the IDs of the datasets containing the report’s `FileSource` in the workflow run. Then, it will be possible to get a `signedURL` for the report files using the retrieval instructions of Dataset/File Service.
Alternatively, saving reports as files in Dataset Service can be implemented on the Airflow side; then Workflow Service will only receive the dataset IDs.
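As a rough sketch, the upload flow could look like this (the endpoint paths follow the Dataset service’s storage/retrieval-instructions API, but the response field names and the registry payload are simplified assumptions and vary by CSP):

```python
import json
import requests

DATASET_URL = "https://osdu-host/api/dataset/v1"  # assumption: standard Dataset service base path

def save_report_as_dataset(report: dict, headers: dict) -> str:
    """Upload a skipped-entities report as a file and return its dataset ID (sketch)."""
    # 1. Ask Dataset Service for a CSP-specific signed upload location.
    instructions = requests.get(
        f"{DATASET_URL}/getStorageInstructions",
        params={"kindSubType": "dataset--File.Generic"},
        headers=headers,
    ).json()
    signed_url = instructions["storageLocation"]["signedUrl"]    # field names vary by CSP
    file_source = instructions["storageLocation"]["fileSource"]

    # 2. Upload the report to the signed location.
    requests.put(signed_url, data=json.dumps(report))

    # 3. Register the dataset so the workflow run can reference its ID and FileSource.
    registry = {
        "kind": "osdu:wks:dataset--File.Generic:1.0.0",  # a dedicated report kind is an open question
        "acl": {}, "legal": {},                          # required in practice; omitted here
        "data": {"DatasetProperties": {"FileSourceInfo": {"FileSource": file_source}}},
    }
    response = requests.put(
        f"{DATASET_URL}/registerDataset",
        json={"datasetRegistries": [registry]},
        headers=headers,
    ).json()
    return response["datasetRegistries"][0]["id"]
```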
Possible response on `/v1/workflow/{workflow_name}/workflowRun/{runId}`:
```json
{
  "workflowId": "string",
  "runId": "string",
  "startTimestamp": 0,
  "endTimestamp": 0,
  "status": "FINISHED",
  "report": {"task_id": "<dataset-id-of-report>"},
  "submittedBy": "string"
}
```
Pros:
- Large reports are saved in Dataset Service, not in XComs.
- Follows the “Manifest as a reference” approach.
- Using Dataset Service is not CSP-specific.
- The user can get the required report as a file from Dataset Service using only the dataset ID.
Cons:
- Requires extra changes in Workflow Service.
- We need to decide how to display the reports if they are stored as files in Dataset Service.
- All records in Dataset Service are indexed.
- We need to consider creating a specific kind for reports.
Save status in Cloud Storage with signed URL
Another approach is to use signed URLs from Cloud Storage. Airflow Operators can request a new file location and a signed URL in some Landing Zone (bucket) from Workflow Service(?). The report of skipped IDs can then be stored via this signed URL.
Every task will request a signed URL and the “real” location of the Storage object (e.g., `gs://…`, `s3://…`) at the start of its execution. Each task will use the signed URL to save intermediate data. We can’t pass signed URLs between tasks because they have an expiration date; that’s why the tasks’ XCom content will consist of Storage paths. The final task will collect all XComs from the upstream tasks and provide an aggregated report grouped by upstream task.
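A minimal sketch of the per-task flow (the Workflow Service endpoint that hands out locations is hypothetical, matching the “(?)” above):

```python
import json
import requests

def save_task_report(ti, report: dict, **context):
    """Upload an intermediate report via a signed URL and push only the permanent path."""
    # 1. At task start, request a signed URL plus the "real" object location.
    location = requests.post(
        "https://osdu-host/api/workflow/v1/reportLocation",  # hypothetical endpoint
        headers={"Authorization": "Bearer <token>"},
    ).json()

    # 2. Upload the intermediate report via the short-lived signed URL.
    requests.put(location["signedUrl"], data=json.dumps(report))

    # 3. Only the permanent path (e.g., gs://...) goes to XCom, since the signed URL
    #    would expire before downstream tasks or users read it.
    ti.xcom_push(key="report_path", value=location["storagePath"])
```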
When the user asks for the `workflow_run` status, Workflow Service must be able to convert the cloud-storage paths into signed URLs to make the reports accessible to users.
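For example, on GCP this conversion could be a thin wrapper over the storage client library (deliberately cloud-specific, which is one of the cons listed below):

```python
from datetime import timedelta
from google.cloud import storage

def to_signed_url(gs_path: str) -> str:
    """Convert a gs://bucket/object path into a short-lived signed URL (GCP-only sketch)."""
    bucket_name, blob_name = gs_path.removeprefix("gs://").split("/", 1)
    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    return blob.generate_signed_url(version="v4", expiration=timedelta(hours=1))
```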
Possible response on `/v1/workflow/{workflow_name}/workflowRun/{runId}`:
```json
{
  "workflowId": "string",
  "runId": "string",
  "startTimestamp": 0,
  "endTimestamp": 0,
  "status": "FINISHED",
  "report": {"task_id": "<signed-url-to-report>"},
  "submittedBy": "string"
}
```
Pros:
- We use a separate Landing Zone instead of Dataset Service so as not to litter the latter with transient data.
- Reduces XComs’ content.
Cons:
- Requires significant changes in Workflow Service and adds extra responsibilities to it.
- Working with the Landing Zone is cloud-specific.
- We’ll probably need to introduce a new service to save reports to the Landing Zone and retrieve them.
Open OSDU Architectural questions
- We need to discuss the structure of the report:
  - What fields of records must be present in the report?
  - What kind of information must be in this report?
  - How can we make this report readable for users?
- For now, it is hard to identify a record if it doesn’t have a system-generated or unique ID.
- Also, we need to think about access to the workflow results depending on the `ACL` of records. That means the user is allowed to get a report only about skipped records with a corresponding `ACL`. There is also the situation where the same report contains records with different `ACL`s.