Manifest Ingestion DAG issues
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues

Issue #58: ADR: General purpose batch write DAG operator (Alan Henson, 2023-07-05)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/58
- [X] Draft
- [ ] Proposed
- [ ] Trialing
- [ ] Under Review
- [ ] Approved
- [ ] Retired
## Context
There is a wide variety of volume-based use cases that drive how ingestion into the OSDU(TM) data platform will occur. The use cases span from a single record to millions of records, across multiple sources of data in multiple formats. Additionally, the Storage Service `createOrUpdate` API endpoint by default accepts at most 500 records per call. As such, any ingestion workflow must determine how many records it needs to save and, if that number exceeds 500, batch its writes accordingly.
However, the lowest common denominator is a record that will be stored in OSDU via the Storage API. We therefore have the opportunity to design and build a DAG operator that receives a list of records, batches them according to the Storage Service's `createOrUpdate` configuration, performs the writes, captures the results, and makes them available via logging. This approach spares other ingestion workflows from implementing custom batching, which reduces code duplication and enables a move toward standardization.
## Scope
- A single DAG Operator that has an expected set of inputs, outputs, and errors
- The DAG Operator will have the ability to receive a list of records, which it will batch and send to the Storage Service's `createOrUpdate` API endpoint
- The DAG Operator will write the records in the order provided by the list (starting with position 0 - assuming a zero-based list)
- The DAG Operator will log the ID of each record and its outcome (success, error) using the XCom logging style used by Manifest Ingestion
- The DAG Operator will not handle Surrogate Keys (or should it?)
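The scoped behavior can be sketched in a few lines. This is a minimal illustration, not the proposed implementation: the helper names are hypothetical, and `put_records` stands in for whatever Storage Service client call (e.g. a PUT to `createOrUpdate`) the operator would ultimately make.

```python
from typing import Callable, Dict, Iterator, List

# Default createOrUpdate limit noted in this ADR's Context section.
STORAGE_BATCH_LIMIT = 500

def chunk(records: List[dict], size: int = STORAGE_BATCH_LIMIT) -> Iterator[List[dict]]:
    """Yield batches in list order, starting at position 0 (zero-based)."""
    for start in range(0, len(records), size):
        yield records[start:start + size]

def batch_write(records: List[dict],
                put_records: Callable[[List[dict]], None],
                size: int = STORAGE_BATCH_LIMIT) -> Dict[str, str]:
    """Write each batch via `put_records` and record a per-record outcome
    (success/error) suitable for XCom-style logging."""
    outcomes: Dict[str, str] = {}
    for batch in chunk(records, size):
        try:
            put_records(batch)
            outcomes.update({r["id"]: "success" for r in batch})
        except Exception as exc:  # capture the error per record, keep going
            outcomes.update({r["id"]: f"error: {exc}" for r in batch})
    return outcomes
```

Note the per-batch error handling: a failed batch is logged record-by-record rather than aborting the whole write, which matches the scope item about logging each record's outcome.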
## Decision
- Create a common DAG Operator that can batch and write records to the Storage Service's `createOrUpdate` API endpoint.
## Rationale
- This approach will standardize the writing step of ingestion, provide batching for the Storage Service's limit on `createOrUpdate`, and reduce code duplication by creating a reusable DAG Operator.
## Consequences
- No consequences as the DAG Operator is optional. This ADR does not suggest making the use of the generic batch operator a requirement for DAG implementations.
## When to revisit
- N/A
## Tradeoff Analysis - Input to decision
- No tradeoffs as leveraging the DAG Operator is optional. Other ingestion workflows may opt to exclude it from their DAG.
## Decision timeline
Decision ready to be made.

Issue #56: Audit trail shows generic service account and not the actual Data Loader's name (Kateryna Kurach (EPAM), 2023-08-21)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/56
Original issue identified by the Pre-shipping team:
https://gitlab.opengroup.org/osdu/subcommittees/ea/projects/pre-shipping/home/-/issues/110
Please see a new record created by "Ingestion workflow" in the AWS Pre-ship (R3M4) environment. The issue is that the real username is not captured; instead, we see a service account name: **[serviceprincipal@testing.com](mailto:serviceprincipal@testing.com)**
GET {{osduonaws_base_url}}/api/storage/v2/records/osdu:master-data--Well:katetest2:mar24
```json
{"data":{"ResourceSecurityClassification":
"osdu:reference-data--ResourceSecurityClassification:Public:",
"Source":"NL_TNO",
"SpatialLocation":{"Wgs84Coordinates":{"type":"FeatureCollection",
"features":[{"type":"Feature","geometry":{"type":"Point","coordinates":[3.51906683,55.68101428]},
"properties":{}}]}},
"FacilityID":"10110909",
"FacilityTypeID":"osdu:reference-data--FacilityType:WELL:",
"FacilityOperator":[{"FacilityOperatorID":"410464","FacilityOperatorOrganisationID":"osdu:master-data--Organisation:HESS:"}],
"FacilityName":"DC-A05-01",
"FacilityNameAlias":[{"AliasName":"DC-A05-01","AliasNameTypeID":"osdu:reference-data--AliasNameType:WELL_NAME:"}],
"FacilityEvent":[
{"FacilityEventTypeID":"osdu:reference-data--FacilityEventType:SPUD_DATE:",
"EffectiveDateTime":"1999-06-03T00:00:00"}],
"VerticalMeasurements":[{"VerticalMeasurementID":"Kelly Bushing","VerticalMeasurement":36.6,
"VerticalMeasurementPathID":"osdu:reference-data--VerticalMeasurementPath:DEPTH_DATUM_ELEV:"}],
"NameAliases":[],"GeoContexts":[]},"meta":[],
"id":"osdu:master-data--Well:katetest2:mar24",
"version":1616620705209879,
"kind":"osdu:wks:master-data--Well:1.0.0",
"acl":{"viewers":["data.default.viewers@osdu.testing.com"],"owners":["data.default.owners@osdu.testing.com"]},
"legal":{"legaltags":["osdu-public-usa-dataset-1"],"otherRelevantDataCountries":["US"],"status":"compliant"},
"createUser":"serviceprincipal@testing.com",
"createTime":"2021-03-24T21:18:24.936Z",
"modifyUser":"serviceprincipal@testing.com",
"modifyTime":"2021-03-24T21:18:25.223Z"}
```
Issue #53: Errors with running tests for Ingestion DAGs (Brady Spiva [AWS], 2021-03-23)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/53
## Expected behavior:
Unit tests and end-to-end tests should work for core code out of the box.
## Observed behavior:
When attempting to run unit and end-to-end tests for Ingestion DAGs core code, the tests in /plugin-unit-tests/ fail due to an issue with Airflow's "variable" table (see the error text below).
From researching this issue, it appears this may be caused by the Airflow metadata database not being initialized.
```
❯ pytest /Users/spivbrad/Documents/aws-osdu-code/os-ingestion-dags/tests/plugin-unit-tests
===================================================== test session starts ======================================================
platform darwin -- Python 3.8.6, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/spivbrad/Documents/aws-osdu-code/os-ingestion-dags
plugins: anyio-2.2.0, dash-1.19.0, mock-3.5.1
collected 79 items / 1 error / 78 selected
============================================================ ERRORS ============================================================
_____________________________ ERROR collecting tests/plugin-unit-tests/test_process_manifest_r2.py _____________________________
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1705: in _execute_context
self.dialect.do_execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:681: in do_execute
cursor.execute(statement, parameters)
E sqlite3.OperationalError: no such table: variable
The above exception was the direct cause of the following exception:
tests/plugin-unit-tests/test_process_manifest_r2.py:27: in <module>
from operators import process_manifest_r2
src/plugins/operators/process_manifest_r2.py:37: in <module>
config.read(Variable.get("core__config__dataload_config_path"))
../../../.local/lib/python3.8/site-packages/airflow/models/variable.py:123: in get
var_val = Variable.get_variable_from_secrets(key=key)
../../../.local/lib/python3.8/site-packages/airflow/models/variable.py:181: in get_variable_from_secrets
var_val = secrets_backend.get_variable(key=key)
../../../.local/lib/python3.8/site-packages/airflow/utils/session.py:65: in wrapper
return func(*args, session=session, **kwargs)
../../../.local/lib/python3.8/site-packages/airflow/secrets/metastore.py:66: in get_variable
var_value = session.query(Variable).filter(Variable.key == key).first()
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py:2695: in first
return self.limit(1)._iter().first()
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py:2779: in _iter
result = self.session.execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1653: in execute
result = conn._execute_20(statement, params or {}, execution_options)
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1520: in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
../../../.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:313: in _execute_on_connection
return connection._execute_clauseelement(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1389: in _execute_clauseelement
ret = self._execute_context(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1748: in _execute_context
self._handle_dbapi_exception(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1929: in _handle_dbapi_exception
util.raise_(
../../../.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:198: in raise_
raise exception
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1705: in _execute_context
self.dialect.do_execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:681: in do_execute
cursor.execute(statement, parameters)
E sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: variable
E [SQL: SELECT variable.val AS variable_val, variable.id AS variable_id, variable."key" AS variable_key, variable.is_encrypted AS variable_is_encrypted
E FROM variable
E WHERE variable."key" = ?
E LIMIT ? OFFSET ?]
E [parameters: ('core__config__dataload_config_path', 1, 0)]
E (Background on this error at: http://sqlalche.me/e/14/e3q8)
=================================================== short test summary info ====================================================
ERROR tests/plugin-unit-tests/test_process_manifest_r2.py - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no su...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
======================================================= 1 error in 1.18s =======================================================
```
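One way to sidestep the metastore lookup in unit tests, assuming Airflow's documented environment-variable resolution for Variables (an `AIRFLOW_VAR_<KEY>` environment variable is checked before the metadata DB), would be to export the variable before the test process imports the module. The config path below is hypothetical:

```python
import os

# Airflow resolves Variable.get(key) from an environment variable named
# "AIRFLOW_VAR_" + key.upper() before querying the `variable` table, so
# exporting the variable avoids the "no such table: variable" error.
def airflow_var_name(key: str) -> str:
    return "AIRFLOW_VAR_" + key.upper()

# Hypothetical path; set this before `process_manifest_r2` is imported.
os.environ[airflow_var_name("core__config__dataload_config_path")] = "/tmp/dataload.ini"
```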
## A recommended solution:
The [README](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/tree/master/tests) for the tests mentions a Docker image that is used for testing. 1) Could this image be made available so cloud providers can run these tests in a consistent environment? 2) Could we alter the Dockerfile used to run the `unit_tests.sh`, `set_airflow_env.sh`, and `test_dags.sh` scripts?
Is this feasible?

Issue #51: Ingestion workflow - provide option to enable, disable validation checks (referenced information) (Debasis Chatterjee, 2022-12-09)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/51
Starting with a recent version of "Ingestion Workflow", we see that integrity checks are enabled. This is very useful.
However, please provide an option for the end user (Data Loader) to set "integrity checks" to true or false, as they may not have the skills required to tweak Python code inside the DAG. The default can be left as "True".
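A minimal sketch of such a toggle, assuming a hypothetical Airflow Variable name (read here through Airflow's environment-variable convention purely for illustration; a real DAG would call `Variable.get`):

```python
import os

# Hypothetical variable name. With no value set, the default stays True,
# as requested, so existing deployments keep their current behavior.
def integrity_checks_enabled(default: bool = True) -> bool:
    raw = os.environ.get("AIRFLOW_VAR_CORE__INGESTION__VALIDATE_INTEGRITY")
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes")
```

The DAG would then branch on this flag (e.g. skip the `provide_manifest_integrity_task`) instead of requiring the Data Loader to edit Python code.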
An excerpt from the Airflow log (GCP environment) is shown below for quick reference:
```
[2021-03-18 17:45:08,504] {base_task_runner.py:113} INFO - Job 27967: Subtask provide_manifest_integrity_task [2021-03-18 17:45:08,503]
{validate_referential_integrity.py:156} DEBUG - Extracted reference ids:
['osdu:reference-data--AliasNameType:WELL_NAME',
'osdu:reference-data--VerticalMeasurementPath:DEPTH_DATUM_ELEV',
'osdu:reference-data--ResourceSecurityClassification:Public',
'osdu:reference-data--FacilityEventType:SPUD_DATE',
'osdu:reference-data--FacilityType:WELLBLABLA',
'osdu:master-data--Organisation:HESS']
```
In this example, all checks failed because the environment lacked the standard Reference values at the time of this run. Otherwise, I would expect only one reference check to fail (FacilityType = "WELLBLABLA" instead of "WELL").
```
[2021-03-18 17:45:44,405] {base_task_runner.py:113} INFO - Job 27967: Subtask provide_manifest_integrity_task [2021-03-18 17:45:44,405]
{validate_referential_integrity.py:177} WARNING - The next ids are absent in the system:
['osdu:reference-data--FacilityType:WELLBLABLA',
'osdu:reference-data--FacilityEventType:SPUD_DATE',
'osdu:reference-data--ResourceSecurityClassification:Public',
'osdu:reference-data--VerticalMeasurementPath:DEPTH_DATUM_ELEV',
'osdu:master-data--Organisation:HESS',
'osdu:reference-data--AliasNameType:WELL_NAME']
[2021-03-18 17:45:44,413] {base_task_runner.py:113} INFO - Job 27967: Subtask provide_manifest_integrity_task [2021-03-18 17:45:44,411]
{validate_referential_integrity.py:231} WARNING - Resource with kind odesprod:wks:master-data--Well:1.0.0 was rejected
```

Issue #46: Need common DAG deployment framework (Alan Henson, 2021-06-14)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/46
At present, we lack a standard framework for deploying DAGs. There are multiple perspectives to consider:
- The deployment of code, which must also consider dependency conflicts
- The registering of DAGs. Presently, this is facilitated via the latest Workflow APIs
Note some approaches in use today:
- CI/CD pipelines used to deploy code for security purposes
- IBM using Git Sync process for deploying DAGs
Considerations:
- Standardizing DAG Operator deployment with things like the Kubernetes Pod Operator

Issue #40: Create common ingestion Python library (Alan Henson, 2021-06-14)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/40
Create a new common Python library for common ingestion code. Code that lives in here is specific to logic commonly found in ingestion workflows. Similar to other common libraries, the core code should be cloud-agnostic with provider implementation areas where cloud-specific code should go.
Possible examples of items to include:
- Utilities
- Validation logic
- Logging

Issue #39: Create common ingestion Java library (Alan Henson, 2021-06-14)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/39
Create a new common Java library for common ingestion code. Code that lives in here is specific to logic commonly found in ingestion workflows. Similar to other common libraries, the core code should be cloud-agnostic with provider implementation areas where cloud-specific code should go.
Possible examples of items to include:
- Utilities
- Validation logic
- Logging

Issue #38: Refactor reusable Java logic from Ingestion DAGs to common Java ingestion library (Alan Henson, 2021-06-14)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/38
Ingestion DAGs have functionality baked into the Python code that should be refactored into an OSDU Python ingestion library.
To deliver on this issue, determine what functionality should be refactored from the Manifest, Energistics, and CSV DAGs and create the corresponding issues to capture that work.

Issue #14: Work Product Component - Load the file plus manifest (Meena Rathinavel, 2021-01-25, milestone M1 - Release 0.1)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/14
- Check status till "finished" or "failed"
- Return some overall status "finished" or "failed"

Issue #2: Please explore Apache Airflow limitations to finalize design for R3 (Raj Kannan, 2021-03-02, milestone M1 - Release 0.1)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/2
There have been a few concerns raised about Airflow as a generic DAG/orchestration solution for data flow in OSDU. It would be good to capture these issues here and to respond with observations/solutions so the decision can be properly captured.
1. Airflow is cloud-native only in GCP, which can make it cumbersome to host in other CSPs where the management of the infrastructure becomes a platform/operator responsibility unlike PaaS solutions.
1. With Airflow, it will be quite hard to isolate workflows as the workflows are within the same execution environment. As OSDU approaches "OSDU SaaS" and OSDU for smaller operators where it may be hosted by a SI or CSP, this can make it challenging for multi-tenant deployments.
1. Airflow DAGs are Python only, while some parsers and libraries may be Java or C++. As a comparison, something like Argo, which is Kubernetes-based, could support worksteps in different languages/environments, since each workstep becomes a separate container instance rather than a Python script.
1. Airflow apparently has an execution delay between tasks - it is unclear whether this is a framework limitation or the experience of one specific setup, but it is worth capturing and analyzing.
1. Similarly, there are concerns about temporary state/data and an intermediary persistence to hold it across DAG worksteps. Beyond what can be held in memory, does Airflow provide a persistable temporary cache for such state?
1. Is the Airflow DSL cumbersome to author for ingestion/enrichment workflow providers (ISVs, SIs, operators)? Is it a good choice in comparison to YAML or other alternatives?
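On point 5: Airflow's XCom mechanism persists small values in the metadata database so that downstream tasks can pull them; it is intended for handles and ids rather than bulk intermediate data. The stand-in below only illustrates the push/pull pattern (it is not the real Airflow API, and the task/key names are made up):

```python
# Illustrative stand-in for Airflow's XCom: one task pushes a small value,
# a downstream task pulls it by task id and key. In real Airflow the values
# live in the metadata DB, so this suits ids/handles, not bulk payloads.
class FakeXCom:
    """Dict-backed stand-in for the XCom table, keyed by (task_id, key)."""
    def __init__(self):
        self._store = {}

    def push(self, task_id: str, key: str, value):
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str):
        return self._store.get((task_id, key))

xcom = FakeXCom()
xcom.push("process_manifest_task", "record_ids", ["osdu:master-data--Well:1"])
ids = xcom.pull("process_manifest_task", "record_ids")
```

Anything larger than this (files, intermediate datasets) is typically staged in external storage, with only a reference passed between worksteps.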
Once the elaboration work is complete, kindly capture this as a LADR for the Data Flow project. Thanks for the advice on these issues.

Issue #1: Missing clear test strategy and test framework on Airflow DAG development (Wei Sun, 2021-06-14)
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/1
We need clarity on the DAG test framework (unit and integration tests) to ensure code quality and that no DAGs break.