**Ingestion Workflow issues**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues

**Airflow Performance / Load testing**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/77 · Kateryna Kurach (EPAM) · updated 2021-03-23T11:38:38Z

In conversations with Data Loading (Michaël, Ash, and others), we identified a need to develop an approach to determining performance requirements for the workflow service. Concerns have been raised, based on implementation experience, that Airflow will not scale properly under the anticipated data loading demands.
I've expanded this issue to include representation from Data Loading, all 4 CSPs, and @Jane from EA. We should begin addressing this for M5 or shortly thereafter.
Initial discussions have identified two potential areas for improvement:
- Configure Airflow within the infrastructure as always-on vs. spin-up-on-demand. This approach increases cost but improves performance as it minimizes the delay in initiating a workflow.
- Introduce a throttling mechanism for workflow run requests to ensure Airflow is not overwhelmed to the point of failure by large numbers of requests (a hedged sketch of one possible approach follows this list).
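For illustration only, a minimal sketch of what such a throttle could look like inside the Workflow service, using a plain `java.util.concurrent.Semaphore`. The class name, limit, and timeout are invented for this sketch; any real mechanism (queueing, HTTP 429 responses, per-partition limits) would need to be agreed with the CSPs and informed by the load testing this issue asks for.

```
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: cap the number of workflow-run requests forwarded to Airflow
// at any one time, instead of letting a burst of requests overwhelm it.
public class WorkflowRunThrottle {

  // Illustrative limit; a real value would come from the load testing described above.
  private final Semaphore permits = new Semaphore(20);

  public <T> T submit(Supplier<T> airflowTriggerCall) throws InterruptedException {
    // Wait briefly for a free slot; reject (retryable) if Airflow is already saturated.
    if (!permits.tryAcquire(500, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException(
          "Too many concurrent workflow run requests; retry later (would map to HTTP 429)");
    }
    try {
      return airflowTriggerCall.get();
    } finally {
      permits.release();
    }
  }
}
```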
There are likely other performance improvements to consider. We will update this description as those are discussed.

**Seek information pertaining to workflow, DAG, DAG Operator, and runtime environments from all CSPs and data workflow teams**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/72 · Alan Henson · updated 2021-01-20T19:46:19Z

Request made of CSP and data workflow development teams (CSV, EDS, Energistics/WITSML, Manifest):
Per today’s daily dev standup discussion, I’m requesting information about the environment for your workflow service, your Airflow implementation, DAGs, and DAG Operators. Please fill out this table and send it back to me. I will aggregate and share with the group. We will use this as a baseline to address the next steps in unifying workflow environments to ensure the DAGs you and your teams are writing will run across all four CSP platforms. This effort will also drive discussions for standardization.
Given some teams are on holiday through Monday of next week, please target getting this to me by next Wednesday, Jan 20th. I will remind you in the daily dev standups. If you have follow-up questions, please let me know.
For the CSV, EDS, and Energistics/WITSML teams, please disregard the questions on Airflow as I know you depend on the CSP implementation for that answer. Please address the DAG Operator and container questions where possible.

**Workflow Service: Update Python version**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/78 · Alan Henson · updated 2021-01-26T20:12:09Z

Per this [ADR](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/74), all Airflow and Python runtime environments across the 4 CSPs should be uniform.
AWS is on Python 3.8, but needs to revert to 3.6.x.

**ADR: Workflow Service - R3 Improvements**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/71 · Dmitriy Rudko · updated 2021-04-15T12:58:17Z

## Context
During work with different streams, we identified several critical design issues with the Workflow service that need to be addressed in R3:
* The Workflow service is not just an `abstraction` over the orchestration engine (Airflow) but also contains OSDU-specific logic (`DataType`, `WorkflowType`, `UserType`). This logic should be moved to the Ingestion Service.
* The Workflow Service does not respect Data Partitions. Users can potentially trigger any Workflow in the system.
* There is no functionality to register a new Workflow.
## Scope
- Add functionality to register new Workflows
- Add support for Data Partitions
- Remove OSDU-specific workflow functionality (`DataType`, `WorkflowType`, `UserType`) from the Workflow Service.
- Allow OSDU clients to trigger registered Workflows directly, without the Ingestion Service.
- Update the API to reflect the [Google REST API Design Guide](https://cloud.google.com/apis/design). Please see the [OpenAPI Spec](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/blob/refactoring_workflow/docs/api/openapi.workflow.yaml) for details. (A hedged usage sketch follows this list.)
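To make the intended client experience concrete, here is a hedged sketch of registering and then directly triggering a workflow against the proposed API. The host, endpoint paths, headers, and JSON fields are illustrative assumptions; the linked OpenAPI spec is the authoritative definition.

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative only: paths and payload fields are assumptions made for this sketch.
public class WorkflowApiExample {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    String base = "https://osdu.example.com/api/workflow/v1";   // hypothetical host
    String token = System.getenv("OSDU_TOKEN");

    // 1. Register a new workflow (new capability proposed by this ADR).
    HttpRequest register = HttpRequest.newBuilder()
        .uri(URI.create(base + "/workflow"))
        .header("Authorization", "Bearer " + token)
        .header("data-partition-id", "opendes")                 // partition support proposed by this ADR
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"workflowName\":\"my_dag\",\"description\":\"example\"}"))
        .build();
    System.out.println(client.send(register, HttpResponse.BodyHandlers.ofString()).body());

    // 2. Trigger a run of the registered workflow directly, without the Ingestion Service.
    HttpRequest trigger = HttpRequest.newBuilder()
        .uri(URI.create(base + "/workflow/my_dag/workflowRun"))
        .header("Authorization", "Bearer " + token)
        .header("data-partition-id", "opendes")
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"executionContext\":{}}"))
        .build();
    System.out.println(client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());
  }
}
```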
## Decision
- Accept API changes as a part of R3
- Accept Workflow > Core changes as a part of R3
- Deprecate the existing Workflow API (startWorkflow, etc.)
## Rationale
- Registration of workflows is required for E2E R3 Ingestion
- The API spec is on the critical path for CSV Ingestion
## Consequences
- Most of the Core logic changes will be implemented by GCP
- Will require support from the CSPs, as the SPI layer will be touched.
## When to revisit
- Post R3
## Technical details:
![R3_Workflow_-_L3__Target](/uploads/75f02f3ec73ee85a95bb668dc7426df2/R3_Workflow_-_L3__Target.png)
![R3_Workflow_-_L4__Target](/uploads/03429b8474b61049b4327ae920969374/R3_Workflow_-_L4__Target.png)
### SPI Layer:
- `IWorkflowEngineService` - **Has default implementation.** Abstraction over the orchestration engine. By default we have an implementation for Airflow.
- `IWorkflowManagerService` - **Has default implementation.** Implements CRUD over the Workflow entity.
- `IWorkflowRunService` - **Has default implementation.** Implements CRUD over the Workflow Run entity.
- `IWorkflowMetadataRepository` - Should be implemented by each CSP. Repository for the Workflow entity.
- `IWorkflowRunRepository` - Should be implemented by each CSP. Repository for the Workflow Run entity. (An illustrative sketch of the repository SPI is included at the end of this issue.)

Milestone: M1 - Release 0.1
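As a purely illustrative sketch of the SPI split, the core service programs against a repository abstraction and each CSP supplies a storage-backed implementation. The interface and method names below are invented for this sketch and will not match the real `IWorkflowMetadataRepository` contract.

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Invented names, for illustration only: the real IWorkflowMetadataRepository has its own contract.
interface WorkflowMetadataRepositorySketch {
  void save(String workflowName, Map<String, Object> metadata);
  Map<String, Object> get(String workflowName);
}

// A CSP implementation would typically be backed by its managed document/NoSQL store
// (Datastore, Cosmos DB, DynamoDB, ...); an in-memory map keeps this sketch self-contained.
class InMemoryWorkflowMetadataRepository implements WorkflowMetadataRepositorySketch {
  private final Map<String, Map<String, Object>> store = new ConcurrentHashMap<>();

  @Override
  public void save(String workflowName, Map<String, Object> metadata) {
    store.put(workflowName, metadata);
  }

  @Override
  public Map<String, Object> get(String workflowName) {
    return store.get(workflowName);
  }
}
```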

**ADR: Workflow Service Environment Standardization**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/74 · Alan Henson · updated 2021-03-23T11:45:21Z

## Context
Providing consistent workflow runtime environments enables DAGs (Directed Acyclic Graphs) to be written once and run across any standardized workflow service environment. There are some differences in the Workflow Service environments built for R3, so we must agree on the versions of the major components of the Workflow Service to achieve standardization.
## Scope
- All Workflow Service implementations should operate with the same `major.minor` version of Airflow.
- All Workflow Service implementations should operate with the same `major.minor` Python version within Airflow.
- All Workflow Service DAG Operators should be authored to run with the same `major.minor` Python version within Airflow.
## Decision
Standardize on the following Workflow Service component versions
| Component | Version |
| --------- | ------- |
| Airflow | 1.10.x |
| Airflow Python Runtime | 3.6.x |
| DAG Operator Python Development Version | 3.6.x |
## Rationale
- Workflows (DAGs) written against the standard will be portable to all standardized Workflow Service runtime environments.
## Consequences
- Workflow Service implementers may have to change Airflow and Python versions and re-test developed workflows (DAGs).

**Workflow API authorization failure should throw 401/403 not 404**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/95 · Matt Wise · updated 2021-07-21T14:40:45Z

When a user is not valid, or a valid user does not have the entitlements group required to call /api/workflow/v1/workflow, they are returned a 404 error. This should follow the entitlements convention and return a 401 or 403, depending on the case. (A hedged sketch of the intended mapping is included at the end of this issue.)
![image](/uploads/d414580f319bcadb0fe7575fb8763ca4/image.png)

Milestone: M7 - Release 0.10
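For illustration, a hedged sketch of the expected convention, reusing the `AppException(status, reason, message)` pattern that already appears in service code elsewhere in this tracker. The check sites and the `AppException` package path are assumptions for this sketch; the real entitlements check lives in shared OSDU libraries.

```
import org.opengroup.osdu.core.common.model.http.AppException; // package path assumed for this sketch

// Illustrative only: shows the intended status-code mapping, not the actual check implementation.
class WorkflowAuthorizationSketch {

  void checkCaller(boolean tokenValid, boolean hasWorkflowEntitlement) {
    if (!tokenValid) {
      // Invalid or missing credentials -> 401 Unauthorized
      throw new AppException(401, "Unauthorized", "The caller could not be authenticated");
    }
    if (!hasWorkflowEntitlement) {
      // Authenticated, but missing the required entitlements group -> 403 Forbidden
      throw new AppException(403, "Forbidden",
          "The user does not have the entitlements group required to call /api/workflow/v1/workflow");
    }
    // ...proceed: 404 should be reserved for a genuinely missing resource, not a missing permission.
  }
}
```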
**Unable to Ingest the Seismic horizon data using the manifest ingestion dag: Osdu_ingest**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/125 · Ananth T · updated 2022-09-29T14:06:02Z

@todaiks @ChrisZhang
GCP & AWS clouds
While working on the Seismic Horizon API, Refresh_Token was working fine. Similarly,
10 (Post) - Ingestion Seismic Horizon record R3 and 10a (Get) - Check the Seismic Horizon Workflow Status were executed normally. However, the 10b (Post) Storage - Retrieve ingested Seismic Horizon Data record API stopped due to a 404 - Record not found error:
{
"code": 404,
"reason": "Record not found",
"message": "The record 'odesprod:work-product-component--SeismicHorizon:Auto_Test_999645280744' was not found"
}
Further, when we checked the Airflow log files,
this "SeismicHorizon:Auto_Test_999645280744" record had been created in the earlier steps, but an error was thrown at 10b - Retrieve ingested Seismic Horizon Data record.
Both cloud platforms, i.e. GCP & AWS, gave errors at this step.
The corresponding GCP code snippets and body responses for the 10, 10a, and 10b APIs have been compiled and the document is enclosed.
Kindly review in detail and get back to us with a possible solution.
[Seismic_Horizon_Issue_GCP_020821.docx](/uploads/25668e785006d46aed881bf3151a5e2d/Seismic_Horizon_Issue_GCP_020821.docx)

**Fixed get workflow status request by workflowName**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/129 · Riabokon Stanislav (EPAM)[GCP] · updated 2021-10-11T09:44:52Z

The GET "/{workflow_name}/workflowRun/{runId}" request used 'dagName' instead of 'workflowName' under the hood during the Airflow call.
When 'dagName' and 'workflowName' differ, this leads to the following error:
```
{
"code": 404,
"reason": "Failed to send request.",
"message": "Unable to send request to Airflow. 404 NOT FOUND_{\"error\":\"Dag id workflow_name not found in DagModel\"}_"
}
```

Milestone: M9 - Release 0.12

**Pass workflow user ID to the Airflow as part of payload**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/157 · Riabokon Stanislav (EPAM)[GCP] · updated 2023-11-08T19:54:46Z

This issue was discovered by the GC Team while the QA Team was testing a platform.
It revolves around triggering workflows and the addition of the User ID into the execution context through the 'x-user-id' header.
Upon further investigation, we came across the MR https://community.opengroup.org/osdu/platform/deployment-and-operations/helm-charts-azure/-/merge_requests/366, which appears to implement this logic with a dependency at the infrastructure level.
However, we have to add some kind of validation or additional logic to use the 'user' header in core logic. This adjustment is essential because we might want to use the service without a service mesh or similar infrastructure. (A hedged sketch of one possible approach follows; the current implementation is shown after it.)
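For illustration, a hedged sketch of the kind of core-side fallback meant here. The class and method names are invented, and the token-derived identity is passed in explicitly because the exact accessor differs per deployment.

```
import org.opengroup.osdu.core.common.model.http.AppException; // package path assumed for this sketch

// Hypothetical sketch: prefer the 'x-user-id' value when the service mesh injects it,
// otherwise fall back to an identity already resolved from the Authorization token,
// so the core service also works without Istio or a similar infrastructure rule.
class UserIdFallbackSketch {

  String resolveUserId(String headerUserId, String tokenPrincipal, String workflowName) {
    if (headerUserId != null && !headerUserId.isBlank()) {
      return headerUserId;      // current behaviour: header injected at the infrastructure level
    }
    if (tokenPrincipal != null && !tokenPrincipal.isBlank()) {
      return tokenPrincipal;    // no mesh: use the authenticated principal instead
    }
    String errorMessage = String.format(
        "Request to trigger workflow with name %s failed because no user identity could be resolved", workflowName);
    throw new AppException(400, "Failed to trigger workflow run", errorMessage);
  }
}
```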
org.opengroup.osdu.workflow.service.WorkflowRunServiceImpl#addUserId
```
private Map<String, Object> addUserId(String workflowName, TriggerWorkflowRequest request) {
final Map<String, Object> executionContext = request.getExecutionContext();
if (executionContext.get(KEY_USER_ID) != null) {
String errorMessage = String.format("Request to trigger workflow with name %s failed because execution context contains reserved key 'userId'", workflowName);
throw new AppException(400, "Failed to trigger workflow run", errorMessage);
}
String userId = dpsHeaders.getUserId();
log.debug("putting user id: " + userId + " in execution context");
executionContext.put(KEY_USER_ID, userId);
return executionContext;
}
```

Milestone: M21 - Release 0.24

**A custom header 'x-user-id' is used in core part**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/158 · Riabokon Stanislav (EPAM)[GCP] · updated 2023-11-08T19:54:10Z

I wanted to bring to your attention an issue that was identified by our GC Team while they were in the process of addressing https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/157.
org.opengroup.osdu.workflow.service.WorkflowRunServiceImpl#addUserId
```
private Map<String, Object> addUserId(String workflowName, TriggerWorkflowRequest request) {
final Map<String, Object> executionContext = request.getExecutionContext();
if (executionContext.get(KEY_USER_ID) != null) {
String errorMessage = String.format("Request to trigger workflow with name %s failed because execution context contains reserved key 'userId'", workflowName);
throw new AppException(400, "Failed to trigger workflow run", errorMessage);
}
String userId = dpsHeaders.getUserId();
log.debug("putting user id: " + userId + " in execution context");
executionContext.put(KEY_USER_ID, userId);
return executionContext;
}
```
The current logic relies on a custom header that is primarily intended for use at an infrastructural level, as outlined in https://community.opengroup.org/osdu/platform/data-flow/ingestion/home/-/issues/52. The GC team approved an ADR with the understanding that this custom header would not be utilized within the core codebase.
However, as indicated in https://community.opengroup.org/osdu/platform/deployment-and-operations/helm-charts-azure/-/merge_requests/366, a header named 'x-user-id' is populated with data from 'x-on-behalf-of' using a specific rule. This mechanism aligns with the requirements of the CSP provider but may not be entirely suitable for the Core Part of the Workflow Service.
```
if (jwt_authn[msft_issuer]["appid"] == serviceAccountClientId and on_behalf_of_header ~= nil and on_behalf_of_header ~= '') then
request_handle:headers():add("x-user-id", request_handle:headers():get("x-on-behalf-of"))
else
request_handle:headers():add("x-user-id", jwt_authn[msft_issuer]["appid"])
end
```
This logic introduces **three key issues**:
- The core part of the Workflow service depends on a custom CSP header to populate the execution context, which may not be in alignment with the intended architecture.
- The Workflow service may not operate correctly without Istio and the accompanying special rule, potentially limiting its usability.
- There is a security concern in that 'x-user-id' is not currently validated on the back-end side, allowing any user to utilize it for their own purposes.
_As for the third problem_, consider the following test case (a hedged validation sketch follows it):
1. A user is authorized within the Workflow Service.
1. This user passes 'x-user-id' with the name of another user, resulting in a workflow being triggered under the identity of a different user.
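A hedged sketch of a back-end check addressing the third issue. The names are illustrative, and the trusted-caller allowance mirrors the on-behalf-of case handled by the Lua rule above.

```
import org.opengroup.osdu.core.common.model.http.AppException; // package path assumed for this sketch

// Illustrative only: reject an 'x-user-id' that does not match the identity proven by the
// Authorization token, unless the caller is a trusted service account that is explicitly
// allowed to act on behalf of other users.
class UserIdHeaderValidationSketch {

  void verifyUserIdHeader(String headerUserId, String tokenPrincipal, boolean callerIsTrustedServiceAccount) {
    if (headerUserId == null || headerUserId.isBlank() || callerIsTrustedServiceAccount) {
      return; // nothing to verify, or delegation is explicitly allowed
    }
    if (!headerUserId.equals(tokenPrincipal)) {
      throw new AppException(403, "Forbidden", "x-user-id does not match the authenticated principal");
    }
  }
}
```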

**ADR: Implement Airflow facade endpoint**
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-workflow/-/issues/159 · Riabokon Stanislav (EPAM)[GCP] · updated 2024-01-08T10:10:33Z

# Context
OSDU Platform uses Apache Airflow for orchestration of various data ingestion and processing jobs.
# Problem statement
Currently the OSDU Airflow component does not support data isolation for multi-tenant deployments. The Airflow administrative UI is available to all users and makes it possible to observe the processing data of all existing tenants, which may cause data leaks and security issues.
# Proposal of the solution
It is proposed to introduce a facade that replaces the Airflow admin UI and collects job execution information (namely the resulting XCom variables) in a tenant-specific way via the Airflow REST API. To do this, we need to add a new endpoint in the Workflow service API, which will collect the details of the DAG run using the existing Airflow REST API v2.
The new API endpoint /v1/workflow/{workflow_name}/workflowRun/{runId}/lastInfo should implement the following business logic (a hedged sketch of the call sequence is included at the end of this issue):
![image-2023-10-18_17-48-20](/uploads/44f53a3de410b8dff0276b127387f29a/image-2023-10-18_17-48-20.png)
- Get the internal workflow entity with getWorkflowRunByName and check whether submittedBy corresponds to the user submitted in the header; otherwise return 401 NOT_AUTHORIZED
- Get the list of all task instances with /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances, where dag_id is workflow_name and dag_run_id is runId
- Select the task instance with the maximal end_date
- With the task_id of the selected task instance, get the list of XCom entry keys with /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries
- Obtain the XCom values by their keys using /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}
- Return the task instance details from step 3, combined with the XCom values map, in a single JSON response.

Milestone: M23 - Release 0.26
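A hedged sketch of the Airflow-side call sequence only. Step 1 (the submittedBy check against the internal workflow entity), JSON parsing, authentication headers, and error handling are elided; the base URL, dag id, run id, and XCom key are placeholders for this sketch.

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative only: shows the sequence of Airflow REST API calls the facade would make.
public class LastInfoFacadeSketch {
  public static void main(String[] args) throws Exception {
    HttpClient airflow = HttpClient.newHttpClient();
    String base = "https://airflow.example.com/api/v1";          // hypothetical Airflow base URL
    String dagId = "my_workflow";                                 // workflow_name from the facade request
    String dagRunId = "run-123";                                  // runId from the facade request

    // Step 2: list all task instances of the DAG run.
    String taskInstances = get(airflow, base + "/dags/" + dagId + "/dagRuns/" + dagRunId + "/taskInstances");
    System.out.println(taskInstances);

    // Step 3: select the task instance with the maximal end_date (JSON parsing elided);
    // assume its task_id turned out to be the placeholder below.
    String taskId = "last_task";

    // Step 4: list the XCom entry keys of that task instance.
    String xcomKeys = get(airflow, base + "/dags/" + dagId + "/dagRuns/" + dagRunId
        + "/taskInstances/" + taskId + "/xcomEntries");
    System.out.println(xcomKeys);

    // Step 5: fetch one XCom value by key (repeated for each key in practice).
    String xcomValue = get(airflow, base + "/dags/" + dagId + "/dagRuns/" + dagRunId
        + "/taskInstances/" + taskId + "/xcomEntries/return_value");
    System.out.println(xcomValue);

    // Step 6: the facade would merge the selected task instance details with the
    // XCom key/value map into the single JSON response returned to the caller.
  }

  private static String get(HttpClient client, String url) throws Exception {
    HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
    return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
  }
}
```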