# Data Ingestion issues

Issues from https://community.opengroup.org/groups/osdu/platform/data-flow/ingestion/-/issues

## [[Draft] [WIP] Integration Tests [core]](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/106)
*Aalekh Jain, 2021-03-23*

Integration tests in core do not have sufficient coverage. This issue aims to add sufficient integration tests in core.

## [Need common DAG deployment framework](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/46)
*Alan Henson, 2021-06-14*

At present, we lack a standard framework for deploying DAGs. There are multiple perspectives to consider:
- The deployment of code, which must also consider dependency conflicts
- The registration of DAGs. Presently, this is facilitated via the latest Workflow APIs
Note some approaches in use today:
- CI/CD pipelines used to deploy code for security purposes
- IBM uses a Git Sync process for deploying DAGs
Considerations:
- Standardizing DAG operator deployment with things like the Kubernetes Pod Operator (a minimal sketch follows)
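A minimal sketch of what a standardized pod-based workstep could look like, using the Airflow 1.x `KubernetesPodOperator`; the namespace and container image below are hypothetical placeholders, not part of any agreed framework:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

dag = DAG('pod_operator_example', start_date=days_ago(0), schedule_interval=None)

# Each workstep runs in its own container image, isolating its dependencies
# from the Airflow workers and from other DAGs.
parse_step = KubernetesPodOperator(
    task_id='parse-csv',
    name='parse-csv',
    namespace='airflow',                          # assumed namespace
    image='registry.example.com/csv-parser:1.0',  # hypothetical image
    cmds=['python', '-m', 'parser.main'],
    get_logs=True,
    dag=dag,
)
```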
## [AirflowConfig should not be in Core, it should be a provider specific implementation](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/91)
*Matt Wise, 2022-08-23*

We do not use user/password authentication to hit the Airflow API. In fact, AWS's Managed Airflow does not even expose direct API access, only CLI via a protected API. We need the flexibility to choose how we interface with Airflow, so this requirement to create a config with an auth mechanism for Airflow should be moved into provider logic. Any other logic regarding direct requests to Airflow should also be provider implemented.

https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/master/workflow-core/src/main/java/org/opengroup/osdu/workflow/config/AirflowConfig.java

## [integration test issue](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/90)
*Bhushan Rade, 2021-07-14*

The following 3 test cases are failing for IBM.
- Test 1 - [shouldReturn400WhenGetDetailsForSpecificWorkflowRunInstance](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/master/testing/workflow-test-core/src/main/java/org/opengroup/osdu/workflow/workflow/v3/WorkflowRunV3IntegrationTests.java#L78) - expected: <400> but was: <404>
  This test case expects a 400 status code, but in our understanding the code should always return 404 when the workflow run instance does not exist.
  Proposed change to the [assertion](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/master/testing/workflow-test-core/src/main/java/org/opengroup/osdu/workflow/workflow/v3/WorkflowRunV3IntegrationTests.java#L91) statement:
```java
assertEquals(HttpStatus.NOT_FOUND, getResponse.getStatus());
```
- Test 2 - [shouldReturnInternalServerErrorWhenIncorrectWorkflowNameWorkflowCreate](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/master/testing/workflow-test-core/src/main/java/org/opengroup/osdu/workflow/workflow/v3/WorkflowV3IntegrationTests.java#L63) - expected: <500> but was: <200>/<409>
  Workflow name validation is missing at the controller level; as of now it accepts a name in any format.
- Test 3 - [shouldReturnBadRequestWhenInvalidDagNameWorkflowCreate](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/master/testing/workflow-test-core/src/main/java/org/opengroup/osdu/workflow/workflow/v3/WorkflowV3IntegrationTests.java#L51) - expected: <400> but was: <200>/<409>
  Validation is missing; this issue is similar to Test 2.
- As per our understanding, the _workflow name_ is unique, and all test cases create a workflow and workflow run with the same name without deleting the old one, which affects subsequent tests (409 conflict errors). Also, at the CSP level we tried to delete the created workflow, but it did not allow us to delete immediately (it threw a 412 error, because the integration test tried to delete the workflow immediately; the test DAG generally takes some time to update its status from **SUBMITTED** to **FINISHED** in the DB). Please give us clarity on this. One possible polling workaround is sketched below.
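A minimal sketch of polling a workflow run's status until it leaves a non-terminal state before attempting the delete, to avoid the 412 described above; the host, headers, and exact endpoint paths are illustrative assumptions, not the authoritative API contract:

```python
import time
import requests

def wait_then_delete(host, headers, workflow_name, run_id, timeout_s=300):
    """Poll the run status, then delete the workflow once the run is terminal."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        run = requests.get(
            f"{host}/v1/workflow/{workflow_name}/workflowRun/{run_id}",
            headers=headers,
        ).json()
        # Wait while the test DAG moves from SUBMITTED to FINISHED in the DB.
        if run.get("status") not in ("SUBMITTED", "RUNNING"):
            break
        time.sleep(10)
    # Deleting only now avoids the 412 seen when deleting immediately.
    return requests.delete(f"{host}/v1/workflow/{workflow_name}", headers=headers)
```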
## [The workflow service should provide outcomes and error messages produced by workflow instances](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/88)
*Alan Henson, 2022-08-08*

The Workflow Service is capable of launching registered workflows. Those workflows presently run inside Airflow, likely implemented using containers. As these workflows run, they log information to the Airflow log files. They might also implement a logging strategy that sends logs to the underlying CSP's logging framework. However, there is no programmatic way to fetch the outcomes of a workflow.

We need a uniform way to allow external processes to query information about a workflow beyond its execution status. Things that could be fetched via a run id, or some other mechanism, might include (a purely hypothetical response shape is sketched after this list):
- Log messages
- Error messages (including partial failures)
- Workflow status (beyond the execution status - workflows might have their own managed state)
- Workflow results (e.g., records processed or stored [including record IDs], activities performed, etc.)
- Other details
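Purely illustrative: one possible shape for such an outcome payload, covering the items above. None of these field names come from the current API; they are assumptions to make the requirement concrete:

```python
# Hypothetical outcome payload for a single workflow run (all field names
# are assumptions, not part of the existing Workflow Service contract).
outcome = {
    "runId": "example-run-id",
    "executionStatus": "FINISHED",
    "workflowStatus": "PARTIAL_SUCCESS",   # workflow-managed state
    "logs": ["step 1 ok", "step 2 retried once"],
    "errors": [{"step": "enrich", "message": "3 records skipped"}],
    "results": {"recordsStored": 97, "recordIds": ["opendes:wellbore:123"]},
}
```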
This solution might require a broader perspective at the core service level, or it might be a solution unique to the Workflow Service. Either way, we need to move beyond Airflow logs for a better user experience.
Additionally, as part of this story, the Workflow Service should consider statuses that indicate partial failure, meaning part of the job succeeded but part of it failed (even if it failed gracefully).

## [Cancel running workflow](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/79)
*Alan Henson, 2021-03-03*

On an Admin UI call for R3, a suggestion was made that workflows might be cancellable. This story should explore the value of providing APIs that could stop (or cancel) a specific workflow instance while it is running.

## [Notification Service: assess capabilities to support workflow events](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/75)
*Alan Henson, 2021-03-03*

Assess with the core services team whether the Notification Service design and implementation for R3 is suitable for workflow-triggered events.
Example workflow notification use cases:
- Publish workflow state with supporting payloads to trigger async downstream processing
- Indicate that errors occurred

## [ADR: Workflow Service - Support for custom operators](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/73)
*Kishore Battula, 2023-07-05*

### Context
Composability and re-usability are important aspects when designing workflows. This ADR covers the APIs needed to achieve composability through re-usable components, using Airflow as the workflow engine.
- All the necessary Python libraries for operators and DAGs (Directed Acyclic Graphs) are pre-installed in Airflow.

One of the mechanisms to achieve composability and re-usability is Airflow custom operators. An API will be provided in the Workflow Service to register a custom operator in Airflow. This custom operator is one of the building blocks for composability and re-usability; once the operator is registered, it can be used in a DAG (Directed Acyclic Graph). A hedged sketch of such a registration call is shown below.
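A sketch of what registering a custom operator through the proposed API might look like; the endpoint path, payload fields, and host are assumptions based on this ADR, not the final specification (see the OpenAPI link under Scope):

```python
import requests

# Read the operator source that should become available to DAG authors.
with open("print_record_operator.py") as f:
    operator_source = f.read()

resp = requests.post(
    "https://workflow-host.example.com/v1/customOperator",  # assumed endpoint
    headers={
        "Authorization": "Bearer <token>",
        "data-partition-id": "opendes",
    },
    json={
        "name": "PrintRecord",              # operator class name
        "description": "Prints a record pulled from xcom",
        "content": operator_source,         # operator source code
    },
)
resp.raise_for_status()
```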
### Scope
API specification can be found [here](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/custom-operators-spec/docs/api/openapi.yaml)
![image](/uploads/6bedf852fa6fe47b36782d63d4e79b96/image.png)
### Out of Scope
- Custom libraries at run time are not supported.
- Validation of operators is not supported.
- Updating and deleting operators are not supported.
- Custom hooks and sensors are not supported.
- Packaged DAGs that bundle DAGs with custom operators are not supported.
### Decision
- The decision is to add the above-mentioned APIs to the workflow service.
- The registration of custom operators is expected to align with Airflow recommended practices.

### Rationale
These APIs allow a customer to register custom operators at run time and use them in multiple DAGs, enabling re-use of operators across DAGs.

### Consequences
CSPs need to implement these new APIs to support custom operators.

### High-level Architecture
![image](/uploads/d6ced893addfc249d77434f9b851fb4b/image.png)
#### Intermediate Data Sharing
One of the key components in running workflows is sharing intermediate data between tasks. For example, when ingesting a 100 MB CSV file, each step in the workflow needs to pass the transformed version of this file to the next step. Airflow xcom can be used to share metadata between tasks, but it is not sufficient for sharing large files, since xcom limits the amount of data that can be shared. To facilitate this, a new API is provided on the Workflow Service which returns a signed URL. Using this signed URL, multiple files can be created, and information about these files can be shared via Airflow xcom. The steps below outline how this signed URL functionality is to be used; a sketch follows the list.
- Every workflow will request a signed URL at the start of the workflow.
- Each task will use the same signed URL to save intermediate data.
- Tasks will communicate information about the intermediate data through xcom.
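A minimal sketch of this pattern, assuming a hypothetical Workflow Service endpoint that returns the signed URL; the endpoint path and response field name are illustrative assumptions:

```python
import requests

def get_signed_url(workflow_host, run_id, headers):
    # Assumed API: the workflow requests one signed URL when it starts.
    resp = requests.get(
        f"{workflow_host}/v1/workflowRun/{run_id}/signedUrl", headers=headers)
    resp.raise_for_status()
    return resp.json()["signedUrl"]

def save_intermediate(signed_url, file_name, data):
    # Upload one of possibly many intermediate files under the signed URL.
    requests.put(f"{signed_url}/{file_name}", data=data).raise_for_status()
    # Only the file name (not the data itself) is then shared via xcom,
    # e.g. ti.xcom_push(key="intermediate_file", value=file_name).
    return file_name
```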
#### Examples

##### Sample Custom Operator
```python
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowException


class PrintRecord(PythonOperator):
    def __init__(self, *args, **kwargs) -> None:
        # Wire the operator's own callable into the underlying PythonOperator.
        super().__init__(python_callable=self.my_callable, *args, **kwargs)
        self.args = args
        self.kwargs = kwargs

    def my_callable(self, **kwargs):
        print("Printing data")
        # Pull the record fetched by the upstream 'get_record' task via xcom.
        ti = kwargs['ti']
        pulled_value = ti.xcom_pull(task_ids='get_record')
        print(pulled_value)
```
##### Sample DAG Using Custom Operator and HTTP Operator
```python
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.http_operator import SimpleHttpOperator
from operators.print_record_operator import PrintRecord

# Runtime parameters passed in via the workflow run configuration.
token = "{{ dag_run.conf['authToken'] }}"
data_partition_id = "{{ dag_run.conf['runConfiguration']['dataPartitionId'] }}"
kind = "{{ dag_run.conf['runConfiguration']['kind'] }}"
record_id = "{{ dag_run.conf['runConfiguration']['id'] }}"
storage_host = "{{ dag_run.conf['additionalProperties']['storageHost'] }}"
storage_endpoint = "api/storage/v2/records/" + record_id

# Following are defaults which can be overridden later on
default_args = {
    'start_date': days_ago(0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('Print_Storage_Record', default_args=default_args)

# Fetch the record from the Storage service and push the response to xcom.
t1 = SimpleHttpOperator(
    http_conn_id='http_storage',
    task_id='get_record',
    endpoint=storage_endpoint,
    method='get',
    headers={"Authorization": token,
             "data-partition-id": "opendes",
             "Accept": "application/json"},
    xcom_push=True,
    log_response=True,
    dag=dag,
)

# Print the record using the custom operator defined above.
t2 = PrintRecord(task_id='custom-task', provide_context=True, dag=dag)

t1 >> t2
```
##### CSV DAG Built Using a Custom Operator
![image](/uploads/cdb536e671528f9edd9a6c02f99d540f/image.png)

## [Ingestion Job Notification](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/33)
*Kateryna Kurach (EPAM), 2021-03-10*

This user story covers the following business scenarios:
- Ability to produce a notification regarding the status of an Ingestion Job execution and/or its parts
- Ability to notify a user via email, text, etc. - requirements have to be created here.
This functionality can be achieved by implementing a solution using the Notification Service described in https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/16, or by implementing a totally different approach. A rough sketch of the first option follows.
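A rough sketch of publishing an ingestion-job status event that a notification mechanism could fan out to subscribers; the topic name, payload shape, and publish endpoint are all hypothetical, since the actual Notification Service contract would come out of the linked issue:

```python
import requests

def publish_job_status(host, headers, run_id, status, details=None):
    # Hypothetical event payload; field names are illustrative only.
    event = {
        "kind": "ingestion-job-status",
        "runId": run_id,
        "status": status,            # e.g. SUBMITTED / FINISHED / FAILED
        "details": details or {},
    }
    # Assumed publish endpoint exposed to workflow tasks.
    return requests.post(
        f"{host}/v1/topics/ingestion-status/publish",
        headers=headers, json=event)
```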
## [Please explore Apache Airflow limitations to finalize design for R3](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/2)
*Raj Kannan, 2021-03-02*

A few concerns have been raised about Airflow as a generic DAG/orchestration solution for data flow in OSDU. It would be good to capture these issues here and to respond with observations/solutions so the decision can be properly captured.
1. Airflow is cloud-native only in GCP, which can make it cumbersome to host in other CSPs, where management of the infrastructure becomes a platform/operator responsibility, unlike PaaS solutions.
2. With Airflow, it is quite hard to isolate workflows, as they share the same execution environment. As OSDU approaches "OSDU SaaS" and OSDU for smaller operators, where it may be hosted by an SI or CSP, this can make multi-tenant deployments challenging.
3. Airflow DAGs are Python only, while some parsers and libraries can be Java or C++. As a comparison, something like Argo, which is Kubernetes based, could allow worksteps in different languages/environments, since each workstep becomes a separate container instance rather than a Python script.
4. Airflow apparently has an execution delay between tasks - it is unclear whether this is a framework limitation or the experience of a specific setup, but it is worth capturing and analyzing.
5. Similarly, there are concerns about temporary state/data and an intermediary persistence layer to hold it across DAG worksteps. Beyond what can be held in memory, does Airflow provide a persistable temporary cache for such state?
6. Is the Airflow DSL cumbersome to author for ingestion/enrichment workflow providers (ISVs, SIs, operators)? In comparison to YAML or other alternatives, is it a good choice?
Once the elaboration work is complete, kindly capture this as an LADR for the Data Flow project. Thanks for the advice on these issues.

## [Missing clear test strategy and test framework on Airflow DAG development](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/1)
*Wei Sun, 2021-06-14*

We need clarity on a DAG test framework (unit tests and integration tests) to ensure code quality and that no DAGs are broken.
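A minimal sketch of one common DAG integrity check using pytest and Airflow's `DagBag`, assuming DAG files live under a local `dags/` folder; this illustrates the kind of unit test such a framework could standardize, not an OSDU-prescribed approach:

```python
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    # Load all DAG files once for the test session.
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    # Any syntax error or missing dependency in a DAG file surfaces here.
    assert dag_bag.import_errors == {}

def test_every_dag_has_tasks(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tasks, f"DAG {dag_id} has no tasks"
```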