ADR: Workflow Service - Support for custom operators (issue [#73](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/73), Kishore Battula)

# Context
Composability and re-usability are important aspects when designing workflows. This ADR covers the APIs needed to achieve composability through re-usable components, with Airflow as the workflow engine.
- All the necessary python libraries for operators and DAGs (Directed Acyclic Graphs) are pre-installed in Airflow.
One of the mechanisms to achieve composability and re-usability is Airflow custom operators. The Workflow Service will provide an API to register a custom operator in Airflow. A custom operator is one of the building blocks for composability and re-usability: once registered, it can be used in a DAG (Directed Acyclic Graph).
# Scope
The API specification can be found [here](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/custom-operators-spec/docs/api/openapi.yaml).
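As an illustration of what a registration request might carry, the sketch below builds a JSON payload for a custom operator. The field names (`name`, `className`, `description`, `content`) are assumptions for illustration only; the authoritative request shape is defined in the linked OpenAPI specification.

```python
import json

def build_operator_registration(name: str, class_name: str,
                                description: str, source: str) -> str:
    """Build a hypothetical registration payload for a custom operator.

    Field names are illustrative assumptions, not the documented API shape.
    """
    payload = {
        "name": name,                # unique operator name
        "className": class_name,     # Python class implementing the operator
        "description": description,
        "content": source,           # operator source code for the Airflow plugins folder
    }
    return json.dumps(payload)

body = build_operator_registration(
    "print_record_operator",
    "PrintRecord",
    "Prints a storage record pulled from XCom",
    "class PrintRecord: ...",
)
```

After registration, the operator would be importable from the Airflow plugins folder and usable in any DAG, as shown in the examples further below.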
![image](/uploads/6bedf852fa6fe47b36782d63d4e79b96/image.png)
# Out of Scope
- Custom libraries at run time are not supported.
- Validation of Operators is not supported.
- Update and delete operators are not supported.
- Custom Hooks and Sensors are not supported.
- Packaged DAGs (DAGs bundled together with custom operators) are not supported.
# Decision
- The decision is to add the above-mentioned APIs to the workflow service.
- The registration of custom operators is expected to be aligned with Airflow recommended practices.
# Rationale
These APIs allow a customer to register custom operators at run time and use them in multiple DAGs. This helps to re-use operators across DAGs.
# Consequences
CSPs need to implement these new APIs to support custom operators.
# High level Architecture
![image](/uploads/d6ced893addfc249d77434f9b851fb4b/image.png)
## Intermediate Data Sharing
One of the key components in running workflows is sharing intermediate data between tasks. For example, when ingesting a 100 MB CSV file, each step in the workflow needs to pass the transformed version of the file to the next step. Airflow XCom can be used to share metadata between tasks, but it limits the amount of data that can be shared and is therefore not sufficient for large files. To facilitate this, a new API is provided on the Workflow Service which returns a signed URL. Using this signed URL, multiple files can be created, and information about these files can be shared between tasks through XCom. The steps below give an overview of how this signed-URL functionality will be used.
- Every workflow will request a signed URL at the start of the workflow.
- Each task will use the same signed URL to save intermediate data.
- Tasks will communicate the information about intermediate data through xcom.
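The steps above can be sketched as follows. The signed-URL value and the layout of the metadata record are assumptions for illustration; the real URL comes from the Workflow Service API, and tasks would upload file contents to it rather than just build references.

```python
# Hypothetical signed URL returned once per workflow run by the Workflow Service.
SIGNED_URL = "https://storage.example.com/workflow-run-123?sig=abc"

def save_intermediate(signed_url: str, file_name: str) -> dict:
    """Record where a task stored its intermediate output.

    Each task writes its (possibly large) output under the run's signed URL
    and returns only a small metadata record, which fits comfortably within
    XCom's size limits.
    """
    return {
        "fileName": file_name,
        "location": f"{signed_url}&file={file_name}",
    }

# Task A saves a transformed CSV and pushes only the metadata through XCom;
# the next task pulls the metadata and reads the file from the shared location.
xcom_value = save_intermediate(SIGNED_URL, "transformed.csv")
```

The design keeps XCom in its intended role (small control-plane messages) while the signed URL carries the data-plane payloads.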
## Examples
### Sample Custom Operator
```python
from airflow.operators.python_operator import PythonOperator

class PrintRecord(PythonOperator):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(python_callable=self.my_callable, *args, **kwargs)
        self.args = args
        self.kwargs = kwargs

    def my_callable(self, **kwargs):
        print("Printing data")
        ti = kwargs['ti']
        pulled_value = ti.xcom_pull(task_ids='get_record')
        print(pulled_value)
```
### Sample DAG Using Custom Operator and Http operator
```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from operators.print_record_operator import PrintRecord
from airflow.operators.http_operator import SimpleHttpOperator

# Values passed in at trigger time through dag_run.conf (Jinja-templated).
token = "{{ dag_run.conf['authToken'] }}"
data_partition_id = "{{ dag_run.conf['runConfiguration']['dataPartitionId'] }}"
kind = "{{ dag_run.conf['runConfiguration']['kind'] }}"
record_id = "{{ dag_run.conf['runConfiguration']['id'] }}"
storage_host = "{{ dag_run.conf['additionalProperties']['storageHost'] }}"
storage_endpoint = "api/storage/v2/records/" + record_id

# Following are defaults which can be overridden later on
default_args = {
    'start_date': days_ago(0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('Print_Storage_Record', default_args=default_args)

t1 = SimpleHttpOperator(
    http_conn_id='http_storage',
    task_id='get_record',
    endpoint=storage_endpoint,
    method='get',
    headers={
        "Authorization": token,
        "data-partition-id": data_partition_id,
        "Accept": "application/json",
    },
    xcom_push=True,
    log_response=True,
    dag=dag,
)

t2 = PrintRecord(task_id='custom-task', provide_context=True, dag=dag)

t1 >> t2
```
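To trigger a run of this DAG, the caller must supply the values referenced by the Jinja templates above. A minimal `dag_run.conf` sketch follows; the token, kind, record id, and host values are placeholders, not real OSDU identifiers.

```python
# Minimal dag_run.conf matching the templated fields used in the DAG above.
# All concrete values below are placeholders.
dag_run_conf = {
    "authToken": "Bearer <access-token>",
    "runConfiguration": {
        "dataPartitionId": "opendes",
        "kind": "opendes:wks:example--Record:1.0.0",
        "id": "opendes:example--Record:12345",
    },
    "additionalProperties": {
        "storageHost": "https://osdu.example.com",
    },
}

# The SimpleHttpOperator endpoint template then resolves to:
endpoint = "api/storage/v2/records/" + dag_run_conf["runConfiguration"]["id"]
```

Missing keys in `dag_run.conf` would surface only at template-render time, so validating the payload before triggering the run is advisable.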
### CSV DAG Built Using a Custom Operator
![image](/uploads/cdb536e671528f9edd9a6c02f99d540f/image.png)

System Dags Implementation for AWS, GCP and IBM (issue [#128](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/128), Aalekh Jain)

Link to the ADR: #118
Link to the MR: !146

In order to support system DAGs, the following changes are required for AWS, GCP and IBM:
1. `IWorkflowSystemMetadataRepository` (Link to azure implementation for reference: [here](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/bc469fb101f27c24670bf03bc92760ad7303d747/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowSystemMetadataRepository.java))
2. `IAdminAuthorizationService` (Link to azure implementation for reference: [here](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/bc469fb101f27c24670bf03bc92760ad7303d747/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/service/AdminAuthorizationServiceImpl.java))
Once these SPIs are implemented, the corresponding ITs can be extended for each cloud provider by extending the base abstract classes `DeleteSystemWorkflowV3IntegrationTests` and `PostCreateSystemWorkflowV3IntegrationTests`. For reference, this is how it's done for Azure:
1. Extending ITs for delete system workflow - [TestDeleteSystemWorkflowV3Integration.java](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/bc469fb101f27c24670bf03bc92760ad7303d747/testing/workflow-test-azure/src/test/java/org/opengroup/osdu/azure/workflow/workflow/TestDeleteSystemWorkflowV3Integration.java)
2. Extending ITs for create system workflow - [TestPostCreateSystemWorkflowV3Integration.java](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/bc469fb101f27c24670bf03bc92760ad7303d747/testing/workflow-test-azure/src/test/java/org/opengroup/osdu/azure/workflow/workflow/TestPostCreateSystemWorkflowV3Integration.java)
The expected behaviour of system workflows is presented in the ADR.