Commit f2efbffd authored by Siarhei Khaletski (EPAM)

Merge branch 'trusted-r3_documentation_updated' into 'master'

GONRG-2008: Documentation has been updated

See merge request !36
parents 9a251e54 e547ec5e
* * [Logging Configuration](#logging-configuration)
* * [Remote Logger](#remote-logger)
* [DAGs Description](#dags-description)
* * [Osdu_ingest](#osdu_ingest)
* * [Osdu_ingest R2](#osdu_ingest-r2)
* [Operators Description](#operators-description)
* * [Workflow Status Operator](#workflow-status-operator)
* * [Validate Manifest Schema Operator](#validate-manifest-schema-operator)
* * [Ensure Manifest Integrity Operator](#ensure-manifest-integrity-operator)
* * [Process Manifest Operator](#process-manifest-operator)
* [Licence](#licence)
## Introduction

The project is a set of Apache Airflow DAG implementations that orchestrate data ingestion within the OSDU platform.

The following DAGs are implemented:

* Osdu_ingest - R3 Manifest Ingestion DAG
* Osdu_ingest_r2 - R2 Manifest Ingestion DAG (deprecated)

## Deployment
### Logging Configuration

As Airflow initializes the logging configuration at startup, all logger configuration settings must be kept in the `airflow.cfg` file within your project or be specified as environment variables according to the [Airflow specification](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/set-config.html).

See the Airflow logging properties in the [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html).
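As an illustration, Airflow maps environment variables of the form `AIRFLOW__{SECTION}__{KEY}` onto `airflow.cfg` options; the option values below are examples only, not this project's settings:

```python
# Sketch only: environment-variable overrides for airflow.cfg logging options
# (Airflow 1.10.x keeps these under [core]). Values are examples.
import os

os.environ["AIRFLOW__CORE__LOGGING_LEVEL"] = "INFO"    # [core] logging_level
os.environ["AIRFLOW__CORE__REMOTE_LOGGING"] = "False"  # [core] remote_logging

# Overrides must be in place before Airflow reads its configuration:
from airflow.configuration import conf
print(conf.get("core", "logging_level"))  # -> "INFO"
```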
### Remote Logger

For GCP Composer, the `google_cloud_default` connection is used by default to store all logs in a GCS bucket.

See the Airflow [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) for AWS and Azure.
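For instance, remote logging can be switched on with the standard Airflow 1.10.x options; the bucket path below is a placeholder, while `google_cloud_default` matches the Composer default mentioned above:

```python
# Sketch: enabling remote logging to a bucket (Airflow 1.10.x [core] options).
import os

os.environ["AIRFLOW__CORE__REMOTE_LOGGING"] = "True"
os.environ["AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"] = "gs://<your-bucket>/logs"  # placeholder
os.environ["AIRFLOW__CORE__REMOTE_LOG_CONN_ID"] = "google_cloud_default"         # Composer default
```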
## DAGs Description
### Osdu_ingest

R3 manifest processing with integrity validation.

The DAG can process either a batch of manifests or a single manifest. The routing between the two modes is implemented inside the DAG by the `check_payload_type` task, sketched below.
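A minimal sketch of that routing, assuming a `BranchPythonOperator`; the task ids and payload keys are illustrative, not the DAG's actual code:

```python
# Sketch of batch-vs-single routing. Task ids and the payload shape are
# illustrative assumptions; the import path matches Airflow 1.10.x.
from airflow.operators.python_operator import BranchPythonOperator

def check_payload_type(**context):
    conf = context["dag_run"].conf or {}
    manifest = conf.get("execution_context", {}).get("manifest")  # assumed key
    # A list of manifests routes to the batch branch, a single object to the other.
    return "process_batch" if isinstance(manifest, list) else "process_single_manifest"

check_payload_type_task = BranchPythonOperator(
    task_id="check_payload_type",
    python_callable=check_payload_type,
    provide_context=True,  # required on Airflow 1.10.x to receive **context
)
```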
### Osdu_ingest R2
**Note:** The DAG is deprecated.
## Operators Description

### Workflow Status Operator
The Workflow Status Operator is an Airflow operator callable from each DAG. It's designed to receive the current status of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow Status Operator to update the workflow status.

This operator isn't designed to update the status in the database directly; instead, it queries the OSDU Workflow service's API endpoint. Once the operator sends a request to update the status, it cedes control back to the DAG.
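A hedged sketch of such an operator follows; the endpoint URL, payload keys, and class name are placeholders, since this README only states that the operator calls the Workflow service rather than the database:

```python
# Sketch of a status-update operator. Endpoint, payload keys and class name
# are placeholders, not the project's actual implementation.
import requests
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class UpdateStatusOperator(BaseOperator):  # hypothetical name
    @apply_defaults
    def __init__(self, status, *args, **kwargs):
        super(UpdateStatusOperator, self).__init__(*args, **kwargs)
        self.status = status

    def execute(self, context):
        workflow_id = context["dag_run"].conf.get("WorkflowID")  # assumed key
        # Delegate the record update to the Workflow service instead of the database.
        requests.put(
            "https://<workflow-service>/updateWorkflowStatus",  # placeholder URL
            json={"WorkflowID": workflow_id, "Status": self.status},
        )
        # Control returns to the DAG once the request has been sent.
```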
### Validate Manifest Schema Operator

The Validate Manifest Schema Operator is an Airflow operator that validates a manifest against the schema definitions from the Schema Service.

The operator's logic is split into two steps:

1. Common schema validation (without reference checks)
2. Manifest schema integrity validation (all references resolve and validate against their schemas)

**Note:** All invalid values are evicted from the manifest and logged.

The operator's output is a new manifest containing only the valid manifest data (stored in Airflow XCom):
```json
{"manifest": <OUTPUT>}
```
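The two-step validation can be pictured with the `jsonschema` package; this is an illustrative sketch under assumed schema handling, not the operator's actual code:

```python
# Illustrative two-step validation using the jsonschema package (assumed approach).
import jsonschema

def validate_manifest(manifest, schema, resolver=None):
    """Return the manifest if it validates, or None if it must be evicted."""
    try:
        # Step 1 is plain schema validation; step 2 additionally supplies a
        # jsonschema.RefResolver backed by the Schema Service so every $ref resolves.
        jsonschema.validate(instance=manifest, schema=schema, resolver=resolver)
        return manifest
    except jsonschema.ValidationError as error:
        print("Evicting invalid entity: %s" % error.message)  # invalid values are logged
        return None
```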
### Ensure Manifest Integrity Operator

An operator that validates the references inside an R3 manifest and removes invalid entities. This operator is responsible for parent-child validation: all orphaned entities are logged and excluded from the validated manifest.

The operator's output is a new manifest containing only the valid manifest data (stored in Airflow XCom):
```json
{"manifest": <OUTPUT>}
```
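A sketch of the parent-child pruning; the collection and reference field names (`WorkProductComponents`, `Datasets`, `ParentID`) are assumptions for illustration:

```python
# Sketch of parent-child integrity pruning. Collection and field names are assumed.
def ensure_integrity(manifest):
    known_ids = {wpc.get("id") for wpc in manifest.get("WorkProductComponents", [])}
    surviving = []
    for dataset in manifest.get("Datasets", []):
        if dataset.get("ParentID") in known_ids:  # hypothetical reference field
            surviving.append(dataset)
        else:
            # Orphaned entities are logged and excluded from the validated manifest.
            print("Excluding orphaned entity: %s" % dataset.get("id"))
    manifest["Datasets"] = surviving
    return {"manifest": manifest}  # the returned value lands in Airflow XCom
```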
### Process Manifest Operator

An operator that processes manifests (a single manifest file or a list of them). The processing includes validation against schemas, storing records, etc.

The operator's output is the set of ingested record ids (stored in Airflow XCom):
```json
{"record_ids": [<SET_OF_INGESTED_RECORDS>]}
```
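A downstream task can pull that output from XCom; the upstream task id here is a placeholder:

```python
# Sketch: reading the operator's XCom output downstream (task id is a placeholder).
from airflow.operators.python_operator import PythonOperator

def report_ingested(**context):
    output = context["ti"].xcom_pull(task_ids="process_manifest")  # hypothetical id
    record_ids = output.get("record_ids", [])
    print("Ingested %d records" % len(record_ids))

report_task = PythonOperator(
    task_id="report_ingested",
    python_callable=report_ingested,
    provide_context=True,  # Airflow 1.10.x
)
```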
[os-python-sdk]: ../os-python-sdk