# Ingestion DAGs

## Contents

* [Introduction](#introduction)
* [Deployment](#deployment)
* * [GCP Composer](#gcp-composer)
* * [Installing Python Dependencies](#installing-python-dependencies)
* [DAG Implementation Details](#dag-implementation-details)
* [Required Variables](#required-variables)
* * [Internal Services](#internal-services)
* * [Configuration](#configuration)
* * [OSDU Python SDK](#osdu-python-sdk)
* [Testing](#testing)
* * [Running Unit Tests](#running-unit-tests)
* * [Running E2E Tests](#running-e2e-tests)
* [DAGs Description](#dags-description)
* * [Opaque Ingestion DAG](#opaque-ingestion-dag)
* * [Manifest Ingestion DAG](#manifest-ingestion-dag)
* [Operators Description](#operators-description)
* * [Workflow Status Operator](#workflow-status-operator)
* * [Stale Jobs Scheduler](#stale-jobs-scheduler)
* * [Workflow Finished Sensor Operator](#workflow-finished-sensor-operator)
* [Licence](#licence)
## Introduction
The Workflow Engine encompasses the following components:

* Opaque Ingestion DAG
* Manifest Ingestion DAG
* Workflow Status Operator
* Stale Jobs Scheduler
* Workflow Finished Sensor Operator
## Deployment
### GCP Composer
GCP provides Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow.

To deploy the Ingestion DAGs on GCP Cloud Composer, upload the files from the */src* folder into the *DAGS_FOLDER* and *PLUGINS_FOLDER* of the DAG bucket provided by the Composer environment. [More info in the documentation.](https://cloud.google.com/composer/docs/quickstart#uploading_the_dag_to)

*DAGS_FOLDER* and *PLUGINS_FOLDER* are configured in the *airflow.cfg* file.

According to the [DAG implementation details](#dag-implementation-details), the [osdu_api] directory must be put into the *DAGS_FOLDER*. Moreover, all required variables have to be set in the Airflow metastore via the Variables mechanism. [List of the required variables](#required-variables).
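For illustration only, the upload can also be scripted with the `google-cloud-storage` client instead of using the console or `gsutil`; the bucket name and the local */src* layout below are assumptions, not taken from this repository:

~~~python
from pathlib import Path

from google.cloud import storage  # pip install google-cloud-storage

# Assumption: the name of the bucket created by your Composer environment.
COMPOSER_BUCKET = "your-composer-bucket"

client = storage.Client()
bucket = client.bucket(COMPOSER_BUCKET)

# Copy DAG files into DAGS_FOLDER (dags/) and plugin files into PLUGINS_FOLDER (plugins/).
for src_dir, dst_prefix in (("src/dags", "dags"), ("src/plugins", "plugins")):
    for path in Path(src_dir).rglob("*.py"):
        blob = bucket.blob(f"{dst_prefix}/{path.relative_to(src_dir)}")
        blob.upload_from_filename(str(path))
~~~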
### Installing Python Dependencies
Environment dependencies can be installed in several ways:

1. Installing a Python dependency from PyPI. Cloud Composer picks up the *requirements.txt* file from the DAGs bucket.
2. Setting up the environment in the Cloud Composer Console.
3. Installing a local Python library. Put your dependencies into the *DAGS_FOLDER/libs* directory. Airflow automatically adds *DAGS_FOLDER* and *PLUGINS_FOLDER* to *PYTHONPATH*.
## DAG Implementation Details
OSDU DAGs are cloud platform-agnostic by design. However, cloud platforms impose specific implementation requirements, and the OSDU R2 Prototype provides a dedicated Python SDK to make sure that DAGs stay independent of the cloud platforms. This Python SDK is located in a separate [os-python-sdk] folder.
## Required Variables
### Internal Services
Some of the operators send requests to internal services. Hosts and endpoints are specified in Airflow Variables.
|Variable |Value Description|
|---|---|
| storage_url | Storage Service URL |
| search_url | Search Service URL |
| update_status_ep | Endpoint to Workflow Service hook call|
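As a minimal sketch, an operator can read these values from the Airflow metastore with the standard `Variable` API; the variable names are exactly those listed above:

~~~python
from airflow.models import Variable

# Values come from Admin -> Variables in the Airflow UI (or the Airflow CLI).
storage_url = Variable.get("storage_url")
search_url = Variable.get("search_url")
update_status_ep = Variable.get("update_status_ep")
~~~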
### Configuration
|Variable |Value Description|
|---|---|
| dataload_config_path| Path to dataload.ini file|
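A short sketch of how a task might locate and open the file; the concrete sections and options inside *dataload.ini* are not shown here because they depend on the file itself:

~~~python
import configparser

from airflow.models import Variable

# The location of dataload.ini is stored as an Airflow Variable.
config = configparser.ConfigParser()
config.read(Variable.get("dataload_config_path"))

# Inspect the available sections; option names depend on dataload.ini.
print(config.sections())
~~~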
### OSDU Python SDK
|Variable |Value Description|
|---|---|
| provider | Needed to properly initialize the OSDU SDK |
| entitlements_module_name | Needed to properly initialize the OSDU SDK |
## Testing
### Running Unit Tests
Set up the local Airflow environment, then run the unit tests:

~~~
tests/set_airflow_env.sh
~~~
~~~
chmod +x tests/test_dags.sh && tests/test_dags.sh
~~~

### Running E2E Tests

Set up the local Airflow environment, then run the end-to-end tests:

~~~
tests/set_airflow_env.sh
~~~
~~~
chmod +x tests/unit_tests.sh && tests/unit_tests.sh
~~~
## DAGs Description
### Opaque Ingestion DAG
The Opaque Ingestion DAG performs ingestion of the OSDU opaque data type. The following diagram shows the workflow of the Opaque Ingestion DAG.
The Opaque Ingestion DAG flow:
* The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** endpoint to set the workflow
status to **finished** in the database.
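Purely as an illustration of this flow (assuming Airflow 1.10.x), a skeleton DAG might look like the sketch below; the DAG id, task ids, and the use of `PythonOperator` as a stand-in for the Workflow Status Operator are placeholders, not the actual implementation in */src*:

~~~python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def update_status(status, **context):
    # Placeholder: the real DAG calls the Workflow service's
    # /updateWorkflowStatus endpoint via the Workflow Status Operator.
    print(f"Workflow status -> {status}")


with DAG(
    dag_id="opaque_ingestion_sketch",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
) as dag:
    set_running = PythonOperator(
        task_id="set_status_running",
        python_callable=update_status,
        op_kwargs={"status": "running"},
        provide_context=True,
    )
    set_finished = PythonOperator(
        task_id="set_status_finished",
        python_callable=update_status,
        op_kwargs={"status": "finished"},
        provide_context=True,
    )

    # The actual ingestion tasks (storing the record via the Storage service)
    # would sit between these two status updates.
    set_running >> set_finished
~~~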
### Manifest Ingestion DAG
The Manifest Ingestion DAG ingests multiple files with their metadata provided in an OSDU manifest. The following
diagram demonstrates the workflow of the Manifest
Ingestion DAG.
Upon an execution request:
6. Invoke the Workflow Status Operator with the **finished** job status.
* The Workflow Status Operator queries the Workflow service to set the new workflow status.
## Operators Description
### Workflow Status Operator
The Workflow Status Operator is an Airflow operator callable from each DAG. Its purpose is to receive the latest status of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow Status Operator to update the workflow status.
This operator isn't designed to update the status in the database directly; instead, it queries the OSDU R2 Workflow service's API endpoint. Once the operator sends a request to update the status, it cedes control back to the DAG.
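A rough sketch of what such an operator can look like (Airflow 1.10.x style); the request payload shape and the assumption that `update_status_ep` is a full URL are illustrative guesses, not the actual operator from this repository:

~~~python
import requests
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults


class UpdateStatusOperatorSketch(BaseOperator):
    """Illustrative only: asks the Workflow service to update a workflow job status."""

    @apply_defaults
    def __init__(self, status, *args, **kwargs):
        super(UpdateStatusOperatorSketch, self).__init__(*args, **kwargs)
        self.status = status

    def execute(self, context):
        # update_status_ep is the Workflow service hook endpoint (see Required Variables).
        payload = {
            "WorkflowID": context["dag_run"].conf.get("WorkflowID"),  # assumed field name
            "Status": self.status,
        }
        response = requests.post(Variable.get("update_status_ep"), json=payload)
        response.raise_for_status()
~~~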
### Stale Jobs Scheduler
The Stale Jobs Scheduler is designed to query Apache Airflow to find stale workflow jobs, that is, jobs that failed during execution but whose status wasn't updated to **failed** in the database.
The Stale Jobs Scheduler workflow:

1. Query the database to get the workflow jobs with the **submitted** or **running** status.
2. Query Airflow to verify the status of the submitted or running workflow jobs.
3. If Airflow returns the failed status for a workflow job, query Firestore to set the workflow status to FAILED.
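In essence, the scheduler combines an Airflow query with a Firestore update; the sketch below assumes a hypothetical collection name and that a workflow record can be found by the DAG run id, neither of which is taken from the actual implementation:

~~~python
from airflow.models import DagRun
from airflow.utils.state import State
from google.cloud import firestore  # pip install google-cloud-firestore

# Assumption: collection name and run_id-to-record mapping are illustrative only.
WORKFLOW_COLLECTION = "workflow-status"


def mark_stale_jobs_failed(dag_id):
    db = firestore.Client()
    for run in DagRun.find(dag_id=dag_id, state=State.FAILED):
        db.collection(WORKFLOW_COLLECTION).document(run.run_id).update({"Status": "FAILED"})
~~~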
### Workflow Finished Sensor Operator
The Workflow Finished Sensor operator is a special type of operator that monitors ingestion of a file during the "osdu"
ingestion workflow. Once a file is ingested, this operator notifies the DAG, which then starts ingestion of the next
file in the manifest.
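Conceptually, this is an Airflow sensor whose `poke` method checks whether the file's record has become available; in the sketch below (Airflow 1.10.x style) the use of the Search service and the query/response shape are assumptions, not the real completion criterion:

~~~python
import requests
from airflow.models import Variable
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class FileIngestedSensorSketch(BaseSensorOperator):
    """Illustrative only: waits until a record for the ingested file can be found."""

    def __init__(self, record_id, *args, **kwargs):
        super(FileIngestedSensorSketch, self).__init__(*args, **kwargs)
        self.record_id = record_id

    def poke(self, context):
        # Assumed query and response shape; the real sensor may use a different check.
        response = requests.post(
            Variable.get("search_url"),
            json={"query": 'id:"{}"'.format(self.record_id), "limit": 1},
        )
        response.raise_for_status()
        return response.json().get("totalCount", 0) > 0
~~~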
[os-python-sdk]: ../os-python-sdk
## Licence
Copyright © Google LLC
Copyright © EPAM Systems
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.