# Manifest Ingestion DAG issues
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues

## Issue #34: Dataset of type file should be saved using file service API instead of storage service
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/34 · 2021-06-14 · Kishore Battula

When using a manifest with WP, WPC, and Dataset, we can use FileSource instead of preloadedFilePath. Below are the steps I followed to create a manifest using FileSource by uploading the file to the File service:
- Generate a signed URL using the File service
- Upload the contents of the file using the signed URL
- Reference the FileSource received from the signed URL response in the manifest
- Trigger manifest ingestion
- Get the metadata of the file uploaded
- Generate a signed URL to download the file content
I got the URL, but accessing the file fails with a blob-not-found exception. The reason is that the manifest ingestion DAG creates the metadata through the Storage service, which does not copy the file from the staging area to the persistent area.
The manifest ingestion DAG must use the File service to save Datasets of type File so that the file gets copied to the persistent area.
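
A minimal sketch of the intended File-service-first flow using the `requests` library; the base URL, header values, and response field names here are assumptions based on the steps above and should be checked against the File service spec:

```python
import requests

FILE_SERVICE = "https://osdu.example.com/api/file/v2"  # hypothetical base URL
HEADERS = {"Authorization": "Bearer <token>", "data-partition-id": "opendes"}

# 1. Get a signed upload URL (and the FileSource it maps to) from the File service.
location = requests.get(f"{FILE_SERVICE}/files/uploadURL", headers=HEADERS).json()["Location"]

# 2. Upload the file content to the staging area via the signed URL.
with open("well_log.las", "rb") as f:
    requests.put(location["SignedURL"], data=f)

# 3. Reference FileSource in the manifest's dataset entry, so the ingestion
#    workflow can later move the blob from staging to the persistent area.
dataset_entry = {
    "kind": "osdu:wks:dataset--File.Generic:1.0.0",
    "data": {"DatasetProperties": {"FileSourceInfo": {"FileSource": location["FileSource"]}}},
}
```
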
We must also explore whether Manifest Ingestion should facilitate source data movement, or whether uploading data to the OSDU persistent zone is a pre-load activity. Given the robustness of the Dataset service, it is not realistic to expect the Manifest Ingestion workflow to handle all dataset types. There is also the question of whether Manifest Ingestion should handle a subset of dataset types (i.e., files and file collections).

## Issue #38: Refactor reusable Java logic from Ingestion DAGs to common Java ingestion library
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/38 · 2021-06-14 · Alan Henson

Ingestion DAGs have functionality baked into the Python code that should be refactored into an OSDU Python ingestion library.
To deliver on this issue, determine what functionality should be refactored from the Manifest, Energistics, and CSV DAGs, and create the corresponding issues to capture that work.

## Issue #39: Create common ingestion Java library
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/39 · 2021-06-14 · Alan Henson

Create a new common Java library for shared ingestion code. Code that lives here is specific to logic commonly found in ingestion workflows. As with other common libraries, the core code should be cloud-agnostic, with provider implementation areas for cloud-specific code.
Possible examples of items to include:
- Utilities
- Validation logic
- Logging

## Issue #40: Create common ingestion Python library
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/40 · 2021-06-14 · Alan Henson

Create a new common Python library for shared ingestion code. Code that lives here is specific to logic commonly found in ingestion workflows. As with other common libraries, the core code should be cloud-agnostic, with provider implementation areas for cloud-specific code.
Possible examples of items to include:
- Utilities
- Validation logic
- Logging
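
A sketch of the cloud-agnostic core / provider-implementation split that both library issues describe; all names here are illustrative, not an agreed API:

```python
from abc import ABC, abstractmethod
from typing import Dict

class BlobStore(ABC):
    """Core-side abstraction: ingestion logic depends only on this interface."""

    @abstractmethod
    def read(self, uri: str) -> bytes: ...

    @abstractmethod
    def write(self, uri: str, content: bytes) -> None: ...

class InMemoryBlobStore(BlobStore):
    """Stand-in provider; real CSP modules would wrap the GCS/S3/Azure SDKs."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def read(self, uri: str) -> bytes:
        return self._blobs[uri]

    def write(self, uri: str, content: bytes) -> None:
        self._blobs[uri] = content
```
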

## Issue #46: Need common DAG deployment framework
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/46 · 2021-06-14 · Alan Henson

At present, we lack a standard framework for deploying DAGs. There are multiple perspectives to consider:
- The deployment of code, which must also consider dependency conflicts
- The registering of DAGs. Presently, this is facilitated via the latest Workflow APIs (a registration sketch follows this list)
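
For the registration point, a hypothetical sketch of registering a DAG through the Workflow API; the path and payload field names should be verified against the current Workflow service spec:

```python
import requests

resp = requests.post(
    "https://osdu.example.com/api/workflow/v1/workflow",  # hypothetical host/path
    headers={"Authorization": "Bearer <token>", "data-partition-id": "opendes"},
    json={
        "workflowName": "Osdu_ingest",
        "description": "Manifest ingestion DAG",
        "registrationInstructions": {"dagName": "Osdu_ingest"},
    },
)
resp.raise_for_status()  # the workflow is now registered and can be triggered
```
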
Note some approaches in use today:
- CI/CD pipelines used to deploy code for security purposes
- IBM uses a Git Sync process for deploying DAGs
Considerations:
- Standardizing DAG operator deployment with things like the Kubernetes Pod Operator
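
On the last consideration, a sketch of a containerized workstep with the KubernetesPodOperator (Airflow 2.x provider import; the image, namespace, and command are assumptions):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(dag_id="containerized_ingest", start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    # Each workstep runs in its own container image, so deploying a DAG no
    # longer has to resolve Python dependency conflicts on shared workers.
    parse_manifest = KubernetesPodOperator(
        task_id="parse_manifest",
        name="parse-manifest",
        namespace="airflow",
        image="example.org/osdu/manifest-parser:latest",  # hypothetical image
        cmds=["python", "-m", "parser.main"],
        get_logs=True,
    )
```
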

## Issue #1: Missing clear test strategy and test framework on Airflow DAG development
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/1 · 2021-06-14 · Wei Sun

We need clarity on the DAG test framework (unit tests and integration tests) to ensure code quality and prevent broken DAGs.

## Issue #59: Develop log formatter for Airflow logs
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/59 · 2021-04-28 · Kateryna Kurach (EPAM)

We had an OSDU community sync where the problem of log formatting was discussed. It was decided that a repository will be created where developers will provide formatters and handlers for the standard Python `logging` library.
Repository: https://community.opengroup.org/osdu/platform/system/lib/core/python-core-common
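
A minimal sketch of the kind of formatter/handler such a repository might host; the JSON field names are illustrative:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render Airflow task logs as JSON lines for downstream log pipelines."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("airflow.task").addHandler(handler)
```
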

## Issue #53: Errors with running tests for Ingestion DAGs
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/53 · 2021-03-23 · Brady Spiva [AWS]

### Expected behavior:

Unit tests and end-to-end tests should work for core code out of the box.

### Observed behavior:

When attempting to run the unit and end-to-end tests for the Ingestion DAGs core code, the tests in /plugin-unit-tests/ fail due to an issue related to Airflow's "variable" table (see the error text below).
From researching this issue, it appears to be caused by the Airflow DB not being initialized.
```
❯ pytest /Users/spivbrad/Documents/aws-osdu-code/os-ingestion-dags/tests/plugin-unit-tests
===================================================== test session starts ======================================================
platform darwin -- Python 3.8.6, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/spivbrad/Documents/aws-osdu-code/os-ingestion-dags
plugins: anyio-2.2.0, dash-1.19.0, mock-3.5.1
collected 79 items / 1 error / 78 selected
============================================================ ERRORS ============================================================
_____________________________ ERROR collecting tests/plugin-unit-tests/test_process_manifest_r2.py _____________________________
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1705: in _execute_context
self.dialect.do_execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:681: in do_execute
cursor.execute(statement, parameters)
E sqlite3.OperationalError: no such table: variable
The above exception was the direct cause of the following exception:
tests/plugin-unit-tests/test_process_manifest_r2.py:27: in <module>
from operators import process_manifest_r2
src/plugins/operators/process_manifest_r2.py:37: in <module>
config.read(Variable.get("core__config__dataload_config_path"))
../../../.local/lib/python3.8/site-packages/airflow/models/variable.py:123: in get
var_val = Variable.get_variable_from_secrets(key=key)
../../../.local/lib/python3.8/site-packages/airflow/models/variable.py:181: in get_variable_from_secrets
var_val = secrets_backend.get_variable(key=key)
../../../.local/lib/python3.8/site-packages/airflow/utils/session.py:65: in wrapper
return func(*args, session=session, **kwargs)
../../../.local/lib/python3.8/site-packages/airflow/secrets/metastore.py:66: in get_variable
var_value = session.query(Variable).filter(Variable.key == key).first()
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py:2695: in first
return self.limit(1)._iter().first()
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py:2779: in _iter
result = self.session.execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1653: in execute
result = conn._execute_20(statement, params or {}, execution_options)
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1520: in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
../../../.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:313: in _execute_on_connection
return connection._execute_clauseelement(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1389: in _execute_clauseelement
ret = self._execute_context(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1748: in _execute_context
self._handle_dbapi_exception(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1929: in _handle_dbapi_exception
util.raise_(
../../../.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:198: in raise_
raise exception
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1705: in _execute_context
self.dialect.do_execute(
../../../.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:681: in do_execute
cursor.execute(statement, parameters)
E sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: variable
E [SQL: SELECT variable.val AS variable_val, variable.id AS variable_id, variable."key" AS variable_key, variable.is_encrypted AS variable_is_encrypted
E FROM variable
E WHERE variable."key" = ?
E LIMIT ? OFFSET ?]
E [parameters: ('core__config__dataload_config_path', 1, 0)]
E (Background on this error at: http://sqlalche.me/e/14/e3q8)
=================================================== short test summary info ====================================================
ERROR tests/plugin-unit-tests/test_process_manifest_r2.py - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no su...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
======================================================= 1 error in 1.18s =======================================================
```
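
If the cause is an uninitialized metastore, one option is to run `airflow db init` (or `airflow initdb` on 1.10) before the tests; another is to patch the variable lookup so the import in the traceback never touches the DB. A sketch of the latter, reusing the module path from the traceback (the config path value is hypothetical):

```python
from unittest import mock

# Patch Variable.get before importing the module under test, so that the
# import-time config read never queries the (uninitialized) metastore.
with mock.patch("airflow.models.Variable.get") as get_var:
    get_var.return_value = "/tmp/dataload.ini"  # hypothetical config path
    from operators import process_manifest_r2  # noqa: F401
```
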
### A recommended solution:
The [README](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/tree/master/tests) for the tests mentions a Docker image that is used for testing.

1. Could this image be made available for cloud providers to run these tests in a consistent environment?
2. Could we alter the Dockerfile used to run the `unit_tests.sh`, `set_airflow_env.sh`, and `test_dags.sh` scripts?

Is this feasible?

## Issue #2: Please explore Apache Airflow limitations to finalize design for R3
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/2 · 2021-03-02 · Raj Kannan

There have been a few concerns raised about Airflow as a generic DAG/orchestration solution for data flow in OSDU. It would be good to capture these issues here and respond with observations/solutions so the decision can be properly captured.
1. Airflow is cloud-native only in GCP, which can make it cumbersome to host in other CSPs, where managing the infrastructure becomes a platform/operator responsibility, unlike PaaS solutions.
1. With Airflow, it is quite hard to isolate workflows, as they share the same execution environment. As OSDU approaches "OSDU SaaS" and OSDU for smaller operators, where it may be hosted by an SI or CSP, this can make multi-tenant deployments challenging.
1. Airflow DAGs are Python-only, while some parsers and libraries may be Java or C++. As a comparison, something like Argo, which is Kubernetes-based, could allow worksteps in different languages/environments, since each workstep becomes a separate container instance rather than a Python script.
1. Airflow apparently has an execution delay between tasks. It is unclear whether this is a framework limitation or the experience of a specific setup, but it is worth capturing for analysis.
1. Similarly, there are concerns about temporary state/data and intermediary persistence across DAG worksteps. Beyond what can be held in memory, does Airflow provide a persistable temporary cache for such state? (See the XCom sketch after this list.)
1. Is the Airflow DSL cumbersome for ingestion/enrichment workflow providers (ISVs, SIs, operators) to author? Is it a good choice compared to YAML or other alternatives?
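
On the fifth point: Airflow's built-in mechanism for small cross-task state is XCom, which persists values in the metadata database between worksteps. A minimal Airflow 2.x sketch (larger intermediate data still needs external storage):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract(ti):
    # xcom_push persists the value in the Airflow metadata database.
    ti.xcom_push(key="record_count", value=42)

def load(ti):
    count = ti.xcom_pull(task_ids="extract", key="record_count")
    print(f"loaded {count} records")

with DAG(dag_id="xcom_demo", start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    load_task = PythonOperator(task_id="load", python_callable=load)
    extract_task >> load_task
```
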
Once the elaboration work is complete, kindly capture this as a LADR for the Data Flow project. Thanks for the advice on these issues.

Milestone: M1 - Release 0.1

## Issue #10: File structure updates. Airflow pluggable approach
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/10 · 2021-02-09 · Siarhei Khaletski (EPAM)

### Change Type:
- [x] Feature
- [ ] Bugfix
- [ ] Refactoring
### Context and Scope
The existing code base has a few disadvantages:
1. There is no standardized approach for common modules located close to the DAGs.
2. It is not modular, i.e. the code base cannot be split into independent modules.
This ADR proposes a vision for how to make the project more pluggable; it is an attempt to standardize the code base, including vendor modules.
The Decision section covers the following topics:
1. Airflow project structure update
2. How to plug in local Python packages
3. Caveats about Airflow internals
Note that the proposal implies two streams of improvement:
1. Strategic (late R3, post R3)
   - Multiple APIs for deployments
     * Operators (reusable components)
     * DAGs
     * Libs
   Libraries developed by a number of vendors would be hosted on the platform. DAGs would be composed, for instance in a UI, and sent to an API endpoint to be processed.
2. Immediate needs (R3)
   - A single endpoint/approach for module code deployment
     * DAGs
     * Plugins
The second case is described in the proposal below.
### Decision
#### Vendor contributions
The proposed approach will allow the following:
1. Each vendor can keep their code in a separate repository
2. Vendors can contribute to core functionality
3. Vendor ingestion extensions will live in a separate Git repository
Following the steps above, each vendor can develop its own extensions separately and deliver them when needed.
The repositories can take the following representation:
~~~
/IngestionDAGs.git #ingestion core functionality
/Vendor1.git
/Vendor2.git
~~~
Some caveats follow:
1. Extension repositories must follow the proposed code structure (see above)
2. There is a list of supported libraries that should be updated by the operator. Library versions should be documented by CSPs
#### Code structure update
Our proposal is to split the current code base according to the following structure:
~~~
src/
├── dags/
│ ├── commons/
│ └── common_utils.py #for instance common functions to prepare DAG params/constants
│ ├── vendor_1/
│ │ ├── libs/
│ │ └── utils.py # the vendor utilities/functions
│ │ └── dag.py # the vendor DAGs here
│ └── vendor_2/
│ ├── libs/
│ ...
├── plugins/
│ ├── commons/
│ └── common_utils.py #for instance common functions to prepare operators params/constants
│ ├── vendor_1/
│ │ ├── libs/
│ │ └── utils.py # the vendor utilities/functions
│ │ ├── operators/ # the vendor operators here
│ │ ├── hooks/ # the vendor hooks here
│ │ └── ...
│ └── vendor_2/
│ ...
tests/
│ └── module (or vendor)
└── requirements.txt
~~~
Let's look deeper at the structure.
All the code will be split into module or vendor folders. Each folder will contain separate libs and dags folders. The dags folder can hold DAG files as well as sub-folders with DAGs. The libs folder can hold utility modules, etc.
The tests folder will hold unit and integration tests, split by module or vendor.
The plugins folder will be split by module or vendor too. Files in this directory have to follow the Airflow plugins convention. We propose the following approach:
~~~
...
plugins/
└── vendor_1/
├── commons
└── vendor_utils.py
├── operators
└── vendor_operator.py
├── hooks
└── vendor_hook.py
├── macros
├── ...
└── __init__.py
~~~
#### Using the Airflow Plugins Mechanism
Airflow has a built-in plugins system that requires creating AirflowPlugin instances. This, however, overcomplicates the issue and confuses many people. Airflow is even considering deprecating the plugins mechanism for hooks and operators going forward.
**(!) According to the documentation, the plugins mechanism should still be used only for plugins that make changes to the webserver UI.**
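
For reference, a sketch of that one remaining legitimate use: registering a Flask blueprint that extends the webserver UI (the names are illustrative):

```python
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# A blueprint adding extra pages/static assets to the webserver (illustrative).
vnd_blueprint = Blueprint("vnd_ui", __name__, template_folder="templates")

class VndUiPlugin(AirflowPlugin):
    """Webserver UI extension; hooks and operators stay plain Python modules."""
    name = "vnd_ui_plugin"
    flask_blueprints = [vnd_blueprint]
```
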
How it works:
Let’s assume you have an Airflow Home directory with the following structure.
**(!) We will assume that the vendor name is vnd**
~~~
vnd/
├── commons
└── dags
└── vnd_dag.py
plugins/
└── vnd/
├── operators
└── vnd_operator.py
├── hooks
└── vnd_hook.py
├── sensors
└── vnd_sensor.py
└── __init__.py
~~~
The _vnd_dag_ wants to use _vnd_operator_ and _vnd_sensor_. Also, _vnd_operator_ wants to use _vnd_hook_. When Airflow is running, it will add _DAGS_FOLDER_, _PLUGINS_FOLDER_, and _config/_ to _PATH_, so any Python files in those folders should be importable. So from our _vnd_dag.py_ file, we can simply write:
```python
# These resolve because plugins/ is on PATH; module names match the tree above.
from vnd.operators.vnd_operator import VndOperator
from vnd.sensors.vnd_sensor import VndSensor
```
Since the _plugins_ directory at the bucket root was added to _PATH_, the imports above start from the vendor module name.
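
Putting it together, _vnd_dag.py_ might look like the following; `VndOperator` and `VndSensor` are the illustrative classes from the tree above:

```python
from datetime import datetime

from airflow import DAG

# Resolvable because plugins/ is on PATH, as described above.
from vnd.operators.vnd_operator import VndOperator
from vnd.sensors.vnd_sensor import VndSensor

with DAG(dag_id="vnd_ingest", start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    wait_for_input = VndSensor(task_id="wait_for_input")
    process = VndOperator(task_id="process")
    wait_for_input >> process
```
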
**(!) Due to Airflow internals, it is strongly discouraged to put many files into _dags/commons_ or _plugins/commons_. We recommend installing such code as a package via _pip_.**
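
A minimal sketch of packaging the shared code that way, assuming a conventional setuptools layout with the sources under `src/` (the package name is hypothetical):

```python
# setup.py for a shared ingestion-commons package: installed into the Airflow
# image/environment with `pip install .` instead of living in dags/commons
# or plugins/commons.
from setuptools import find_packages, setup

setup(
    name="osdu-ingestion-commons",  # hypothetical package name
    version="0.1.0",
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    install_requires=["requests"],
)
```
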
### Rationale
Some vendors provided their parsers, and it was hard to make them simply plug-and-run. There were many questions about where to put the parsers and how to import and use them in operators.
In the absence of a common approach and documentation, external modules can cause runtime errors.
### Consequences
1. An MR with the updated code base has to be created
2. README.md has to include information about the structure and conventions

## Issue #14: Work Product Component - Load the file plus manifest
https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/14 · 2021-01-25 · Meena Rathinavel

- Check status till "finished" or "failed"
- Return some overall status: "finished" or "failed"

Milestone: M1 - Release 0.1