Newer
Older
## Contents
* [Introduction](#introduction)
* [Deployment](#deployment)
* * [GCP Composer](#gcp-composer)
* * [Installing Python Dependencies](#installing-python-dependencies)
* [DAG Implementation Details](#dag-implementation-details)
* [Required Variables](#required-variables)
* * [Internal Services](#internal-services)
* * [Configuration](#configuration)
* * [OSDU Python SDK](#osdu-python-sdk)
* [Testing](#testing)
* * [Running Unit Tests](#running-unit-tests)
* * [Running E2E Tests](#running-e2e-tests)
* [Logging](#logging)
* * [Logging Configuration](#logging-configuration)
* * [Remote Logger](#remote-logger)
* [DAGs Description](#dags-description)
* * [Opaque Ingestion DAG](#opaque-ingestion-dag)
* * [Manifest Ingestion DAG](#manifest-ingestion-dag)
* [Operators Description](#operators-description)
* * [Workflow Status Operator](#workflow-status-operator)
* * [Stale Jobs Scheduler](#stale-jobs-scheduler)
* * [Workflow Finished Sensor operator](#workflow-finished-sensor-operator)
The project is a set of Apache Airflow DAGs implementations to orchestrate data ingestion within OSDU platform.
The following DAGs are implemented:
* Osdu_ingest - R3 Manifest Ingestion DAG
* Osdu_ingest_r2 - R2 Manifest Ingestion DAG
## Deployment
### GCP Composer
GCP provides Cloud Composer a fully managed workflow orchestration service built on Apache Airflow.
To deploy the Ingestion DAGs on GCP Cloud Composer just upload files from */src* folder into *DAGS_FOLDER* and *PLUGINS_FOLDER* accordingly into the DAG bucket that provided by Composer environment. [More info in documentation.](https://cloud.google.com/composer/docs/quickstart#uploading_the_dag_to)
*DAGS_FOLDER* and *PLUGINS_FOLDER* are setting up by Composer itself.
According to the [DAG implementation details](#dag-implementation-details) need to put [osdu_api] directory into the *DAGS_FOLDER*. Moreover, all required variables have to be set in Airflow meta store by Variables mechanism. [List of the required variables](#required-variables).
#### Installing Python Dependencies
Environment dependencies might be installed by several ways:
1. Setting up an environment into the Cloud Composer Console.
2. Installing local Python library. Put your dependencies into *DAG_FOLDER/libs* directory. Airflow automatically adds *DAG_FOLDER* and *PLUGINS_FOLDER* to the *PATH*.
### Azure
To deploy the Ingestion DAGs to airflow, follow below steps.
- Identify the file share to which DAGs need to be copied. [This](https://community.opengroup.org/osdu/platform/deployment-and-operations/infra-azure-provisioning/-/blob/master/infra/templates/osdu-r3-mvp/service_resources/airflow.tf#L71) is the file share where DAGs for airflow reside.
- Copy contents of */src/dags* to *airflowdags/dags* folder
- Copy contents of */src/plugins/hooks* to *airflowdags/plugins/hooks*
- Copy contents of */src/plugins/operators* to *airflowdags/plugins/operators*
#### Installing Python Dependencies
Python dependencies can be specified as extra pip packages in airflow deployment [here](https://community.opengroup.org/osdu/platform/deployment-and-operations/infra-azure-provisioning/-/blob/master/charts/airflow/helm-config.yaml#L211)
#### Environment Variables & Airflow Variables
Add variables manually in the Airflow UI or through airflow helm charts. [List of the required variables](#required-variables).
Adding variables to helm charts can be found [here](https://community.opengroup.org/osdu/platform/deployment-and-operations/infra-azure-provisioning/-/blob/master/charts/airflow/helm-config.yaml#L157)
More details about airflow variables can be found [here](https://airflow.apache.org/docs/apache-airflow/1.10.12/concepts.html?highlight=airflow_var#variables)
## DAG Implementation Details
OSDU DAGs are cloud platform-agnostic by design. However, there are specific implementation requirements by cloud
platforms, and the OSDU R2 Prototype provides a dedicated Python SDK to make sure that DAGs are independent from the
cloud platforms. This Python SDK is located in a separate [os-python-sdk] folder.
### Common naming convention
Some variables are defined using Airflow Variables.
Variable should has prefix which define where variable are used:
- **core__** - use in common part of DAGs;
- **gcp__**, **azure__**, **ibm__**, **aws__** - use in cloud-specific modules of DAGs;
- **sdk__** - pass to Python SDK.
If variable defines URL to internal services it should have suffix which show the completeness of the URL:
- **__url** - the variable should contain full path to service API endpoint;
- **__host** - the variable should contain just service host value. The full path to service API endpoint constructed inside operators.
### Internal Services
|Variable |Value Description|
|---|---|
| core__service__storage__url | Storage Service API endpoint to save records |
| core__service__search__url | Search Service API endpoint to search queries |
| core__service__workflow__host | Workflow Service host |
| core__service__workflow__url | (Deprecated) Workflow Service API endpoint to update status |
| core__service__file__host | File Service host |
| core__service__schema__url | Schema Service API endpoint to get schema by Id |
### Configuration
|Variable |Value Description|
|---|---|
| core__config__dataload_config_path| Path to dataload.ini file. Used in R2 manifest ingestion|
## Testing
### Running Unit Tests
~~~
tests/./set_airflow_env.sh
~~~
~~~
chmod +x tests/test_dags.sh && tests/./test_dags.sh
~~~
### Running E2E Tests
~~~
tests/./set_airflow_env.sh
~~~
~~~
chmod +x tests/unit_tests.sh && tests/./unit_tests.sh
~~~
## Logging
### Logging Configuration
As Airflow initializes the logging configuration at startup, it implies that all logger configuration settings will be kept in `airflow.cfg` file within your project or will be specified as environment variables according to the [Airflow specification](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/set-config.html).
Look Airflow logging properties in the [documentaton](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html)
### Remote Logger
For GCP Composer `google_cloud_default` connection is used by default to store all logs into GCS bucket.
Look Airflow [documentaton](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) documentation for AWS, Azure.
## DAGs Description
### Opaque Ingestion DAG
The Opaque Ingestion DAG performs ingestion of OSDU opaque data type. The following diagram shows the workflow of the
Opaque Ingestion DAG.

The Opaque Ingestion DAG flow:
1. Call the Workflow Status Operator with the **running** status.
* Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** API endpoint with the
**running** status, and then returns the control back to the Opaque Ingestion DAG.
2. Query the Storage service's **/createOrUpdateRecord** API endpoint to create a record for the file.
* The ODES Storage service makes a call to ODES Indexer and returns to the DAG.
3. Call the Workflow Status Operator with the **finished** status.
* The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** endpoint to set the workflow
status to **finished** in the database.
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
The Manifest Ingestion DAG ingests multiple files with their metadata provided in an OSDU manifest. The following
diagram demonstrates the workflow of the Manifest
Ingestion DAG.

Upon an execution request:
1. Invoke the Workflow Status Operator to set the new status for the workflow.
* The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** API endpoint with the
**running** status.
2. Obtain the Work Product Components associated with the Work Product.
* For each Work Product Component, find all associated OSDU Files. For each File in the manifest:
* Start the **ingest** workflow. Call the Workflow service's **/startWorkflow** API endpoint the **ingest**
workflow type.
> The Workflow Finished Sensor operator polls the DAG execution and notifies the DAG to start ingestion of the
> next file.
* Once all Files for the current Work Product Component are ingested, query the Storage service's
**/CreateOrUpdatedRecord** API endpoint to create a record for the current Work Product Component.
* Once all Work Product Components and Files are ingested, switch to the third step.
3. Create a new record for the Work Product.
* Query the Storage service's **/CreateOrUpdateRecord** API endpoint and pass it the Work Product.
4. Search the records by metadata.
* Query the Storage service's **/listRecords** API to obtain the records by metadata.
5. Enrich the records with data from the manifest.
* Query the Storage service's **/UpdateRecord** API endpoint and pass it the metadata from the manifest.
> Only file records are updated.
6. Invoke the Workflow Status Operator with the **finished** job status.
* The Workflow Status Operator queries the Workflow service to set the new workflow status.
## Operators Description
### Workflow Status Operator
The Workflow Status Operator is an Airflow operator callable from each DAG. It's purpose is to receive the latest status
of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow
Status Operator to update the workflow status.
This operator isn't designed to directly update the status in the database, and it queries the OSDU R2 Workflow
service's API endpoint. Once the operator sends a request to update status, it cedes control back to the DAG.
The Stale Jobs Scheduler is designed to query Apache Airflow to find out any stale workflow jobs, that is, the jobs that
failed during execution but which status wasn't updated to **failed** in the database.
This operator queries the Airflow API every N minutes to verify that the workflow jobs that do not have the _finished_
status are still running. If a workflow job has failed in Airflow, the Stale Jobs Scheduler will set this workflow job
status to **failed** in the database.
The Stale Jobs Scheduler workflow:
1. Query the database to find all workflow records with the _submitted_ or _running_ statuses.
2. Query Airflow to verify the status of the submitted or running workflow jobs.
3. If Airflow returns the failed status for a workflow job, query Firestore to set the workflow status to FAILED.
### Workflow Finished Sensor Operator
The Workflow Finished Sensor operator is a special type of operator that monitors ingestion of a file during the "osdu"
ingestion workflow. Once a file is ingested, this operator notifies the DAG, which then starts ingestion of the next
file in the manifest.
[os-python-sdk]: ../os-python-sdk
## Licence
Copyright © Google LLC
Copyright © EPAM Systems
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.