
Ingestion DAGs

Contents

  • Introduction
  • Deployment
  • DAG Implementation Details
  • Required Variables
  • Testing
  • Logging
  • DAGs Description
  • Operators Description
  • Licence

Introduction

The OSDU R2 Prototype includes a Workflow Engine, built on Apache Airflow, to orchestrate business processes. In particular, the Workflow Engine handles ingestion of opaque and well log (.las) files in OSDU R2.

The Workflow Engine encompasses the following components:

  • Opaque Ingestion DAG
  • OSDU Ingestion DAG
  • Workflow Status Operator
  • Stale Jobs Scheduler
  • Workflow Finished Sensor Operator

Deployment

GCP Composer

GCP provides Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow.

To deploy the Ingestion DAGs on GCP Cloud Composer, upload the files from the /src folder into the DAGS_FOLDER and PLUGINS_FOLDER, respectively, of the DAG bucket provided by the Composer environment. More info in the documentation.
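As an alternative to gsutil or the Console upload, the copy step can be scripted. The sketch below assumes the repository keeps DAG modules under src/dags and plugin modules under src/plugins; the bucket name is a placeholder for your environment's bucket:

    # Sketch: copying DAG and plugin modules from /src into the Composer bucket.
    # The bucket name and the src/dags, src/plugins layout are assumptions.
    from pathlib import Path

    from google.cloud import storage

    bucket = storage.Client().bucket("your-composer-environment-bucket")  # placeholder

    for src_dir, dst_prefix in (("src/dags", "dags"), ("src/plugins", "plugins")):
        for path in Path(src_dir).rglob("*.py"):
            # Preserve the relative layout inside the dags/ and plugins/ bucket folders.
            blob = bucket.blob(f"{dst_prefix}/{path.relative_to(src_dir)}")
            blob.upload_from_filename(str(path))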

DAGS_FOLDER and PLUGINS_FOLDER are configured in the airflow.cfg file.

According to the DAG implementation details, the [osdu_api] directory needs to be put into the DAGS_FOLDER. Moreover, all required variables have to be set in the Airflow metadata store via the Variables mechanism. The required variables are listed in the Required Variables section below.
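The variables can be registered through the Airflow UI, the airflow variables CLI command, or programmatically via the standard Variable API. A minimal sketch of the programmatic option follows; all values are placeholders for your deployment:

    # Sketch: registering the required Airflow Variables; all values are placeholders.
    from airflow.models import Variable

    Variable.set("storage_url", "https://example.com/api/storage/v2/records")
    Variable.set("search_url", "https://example.com/api/search/v2/query")
    Variable.set("update_status_ep", "https://example.com/workflow/updateWorkflowStatus")
    Variable.set("dataload_config_path", "/home/airflow/gcs/dags/configs/dataload.ini")
    Variable.set("provider", "gcp")
    Variable.set("entitlements_module_name", "entitlements_client")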

Installing Python Dependencies

Environment dependencies can be installed in several ways:

  1. Installing a Python dependency from PyPI. Cloud Composer picks up the requirements.txt file from the DAGs bucket.
  2. Setting up the environment in the Cloud Composer Console.
  3. Installing a local Python library. Put your dependencies into the DAGS_FOLDER/libs directory. Airflow automatically adds DAGS_FOLDER and PLUGINS_FOLDER to the Python path (see the sketch after this list).
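To illustrate the third option, assume a hypothetical helper module is saved as DAGS_FOLDER/libs/size_utils.py. Because the DAGs folder is on the Python path, a DAG file in the same bucket can import it without any installation step:

    # Hypothetical local dependency saved as <DAGS_FOLDER>/libs/size_utils.py:
    #
    #     def to_megabytes(size_in_bytes):
    #         return size_in_bytes / (1024 * 1024)
    #
    # A DAG module can then import it directly, since DAGS_FOLDER is on sys.path:
    from libs import size_utils

    print(size_utils.to_megabytes(10 * 1024 * 1024))  # prints 10.0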

DAG Implementation Details

OSDU DAGs are cloud platform-agnostic by design. However, each cloud platform imposes specific implementation requirements, and the OSDU R2 Prototype provides a dedicated Python SDK to make sure that the DAGs stay independent of the cloud platforms. This Python SDK is located in a separate os-python-sdk folder.

Required Variables

Internal Services

Some of the operators send requests to internal services. Hosts and endpoints are specified in Airflow Variables.

Variable            Description
storage_url         Storage Service URL
search_url          Search Service URL
update_status_ep    Endpoint of the Workflow service status update hook call

Configuration

Variable               Description
dataload_config_path   Path to the dataload.ini file

OSDU Python SDK

Variable                   Description
provider                   Required to properly initialize the OSDU Python SDK
entitlements_module_name   Required to properly initialize the OSDU Python SDK
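Inside DAG or operator code, these values are read back with the standard Variable API. A short sketch using the variable names from the tables above:

    # Sketch: reading the required configuration inside a DAG or operator.
    from airflow.models import Variable

    storage_url = Variable.get("storage_url")
    search_url = Variable.get("search_url")
    update_status_ep = Variable.get("update_status_ep")
    # A missing variable raises KeyError, so a fallback can be supplied explicitly:
    dataload_config_path = Variable.get("dataload_config_path", default_var="/tmp/dataload.ini")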

Testing

Running Unit Tests

tests/./set_airflow_env.sh
chmod +x tests/test_dags.sh && tests/./test_dags.sh

Running E2E Tests

tests/./set_airflow_env.sh
chmod +x tests/unit_tests.sh && tests/./unit_tests.sh

Logging

Logging Configuration

Because Airflow initializes the logging configuration at startup, all logger configuration settings must be kept in the airflow.cfg file within your project or specified as environment variables according to the Airflow specification.

See the Airflow logging properties in the documentation.

Remote Logger

For GCP Composer, the google_cloud_default connection is used by default to store all logs in a GCS bucket. See the Airflow documentation for AWS and Azure.

DAGs Description

Opaque Ingestion DAG

The Opaque Ingestion DAG performs ingestion of OSDU opaque data type. The following diagram shows the workflow of the Opaque Ingestion DAG.

OSDU R2 Opaque Ingestion DAG

The Opaque Ingestion DAG flow:

  1. Call the Workflow Status Operator with the running status.
    • Workflow Status Operator queries the Workflow service's /updateWorkflowStatus API endpoint with the running status, and then returns the control back to the Opaque Ingestion DAG.
  2. Query the Storage service's /createOrUpdateRecord API endpoint to create a record for the file.
    • The ODES Storage service makes a call to ODES Indexer and returns to the DAG.
  3. Call the Workflow Status Operator with the finished status.
    • The Workflow Status Operator queries the Workflow service's /updateWorkflowStatus endpoint to set the workflow status to finished in the database.
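A heavily simplified sketch of how this three-step flow could be wired in Airflow is shown below. The task names, request payloads, and dag_run.conf keys are illustrative assumptions, not the actual code shipped in /src:

    # Illustrative sketch only; payload fields and conf keys are assumptions,
    # and update_status_ep is treated here as a full URL.
    from datetime import datetime

    import requests
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator


    def update_status(status, **context):
        # Role of the Workflow Status Operator: report the job status to the Workflow service.
        requests.post(Variable.get("update_status_ep"),
                      json={"WorkflowID": context["dag_run"].conf["WorkflowID"],
                            "Status": status})


    def create_record(**context):
        # Step 2: create a record for the file via the Storage service.
        requests.put(Variable.get("storage_url"), json=context["dag_run"].conf["Record"])


    with DAG("opaque_ingestion_sketch", start_date=datetime(2020, 1, 1),
             schedule_interval=None) as dag:
        set_running = PythonOperator(task_id="workflow_status_running",
                                     python_callable=update_status,
                                     op_args=["running"], provide_context=True)
        store_record = PythonOperator(task_id="create_record",
                                      python_callable=create_record,
                                      provide_context=True)
        set_finished = PythonOperator(task_id="workflow_status_finished",
                                      python_callable=update_status,
                                      op_args=["finished"], provide_context=True)

        set_running >> store_record >> set_finished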

Manifest Ingestion DAG

The Manifest Ingestion DAG (listed above as the OSDU Ingestion DAG) ingests multiple files with their metadata provided in an OSDU manifest. The following diagram demonstrates the workflow of the Manifest Ingestion DAG.

OSDU R2 Manifest Ingestion DAG

Upon an execution request:

  1. Invoke the Workflow Status Operator to set the new status for the workflow.
    • The Workflow Status Operator queries the Workflow service's /updateWorkflowStatus API endpoint with the running status.
  2. Obtain the Work Product Components associated with the Work Product.
    • For each Work Product Component, find all associated OSDU Files. For each File in the manifest:
      • Start the ingest workflow. Call the Workflow service's /startWorkflow API endpoint with the ingest workflow type.

      The Workflow Finished Sensor operator polls the DAG execution and notifies the DAG to start ingestion of the next file.

    • Once all Files for the current Work Product Component are ingested, query the Storage service's /createOrUpdateRecord API endpoint to create a record for the current Work Product Component.
    • Once all Work Product Components and Files are ingested, switch to the third step.
  3. Create a new record for the Work Product.
    • Query the Storage service's /createOrUpdateRecord API endpoint and pass it the Work Product.
  4. Search the records by metadata.
    • Query the Storage service's /listRecords API to obtain the records by metadata.
  5. Enrich the records with data from the manifest.
    • Query the Storage service's /UpdateRecord API endpoint and pass it the metadata from the manifest.

    Only file records are updated.

  6. Invoke the Workflow Status Operator with the finished job status.
    • The Workflow Status Operator queries the Workflow service to set the new workflow status.
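The traversal in steps 2 and 3 can be summarized with the following simplified sketch. The manifest keys and the nesting of Files under each Work Product Component are simplifications, and the two callables are hypothetical:

    # Simplified sketch of steps 2-3; start_file_workflow and create_record are hypothetical,
    # and the manifest nesting is simplified for illustration.
    def ingest_manifest(manifest, start_file_workflow, create_record):
        for wpc in manifest.get("WorkProductComponents", []):
            # Step 2: trigger the "ingest" workflow for every File of the component;
            # the Workflow Finished Sensor waits for each run before the next one starts.
            for osdu_file in wpc.get("Files", []):
                start_file_workflow(osdu_file)
            # Once all Files of the component are ingested, store the component record.
            create_record(wpc)
        # Step 3: finally create a record for the Work Product itself.
        create_record(manifest["WorkProduct"])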

Operators Description

Workflow Status Operator

The Workflow Status Operator is an Airflow operator callable from each DAG. Its purpose is to receive the latest status of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow Status Operator to update the workflow status.

This operator isn't designed to update the status in the database directly; instead, it queries the OSDU R2 Workflow service's API endpoint. Once the operator sends a request to update the status, it cedes control back to the DAG.
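A minimal sketch of such an operator is shown below; the endpoint variable, payload fields, and conf keys are placeholders rather than the real service contract:

    # Minimal sketch of a status-reporting operator; payload fields are assumptions.
    import requests
    from airflow.models import BaseOperator, Variable
    from airflow.utils.decorators import apply_defaults


    class UpdateStatusOperatorSketch(BaseOperator):
        """Reports a workflow job status to the Workflow service instead of touching the DB."""

        @apply_defaults
        def __init__(self, status, *args, **kwargs):
            super(UpdateStatusOperatorSketch, self).__init__(*args, **kwargs)
            self.status = status

        def execute(self, context):
            workflow_id = context["dag_run"].conf["WorkflowID"]
            # Delegate the database update to the Workflow service endpoint.
            response = requests.post(Variable.get("update_status_ep"),
                                     json={"WorkflowID": workflow_id, "Status": self.status})
            response.raise_for_status()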

Stale Jobs Scheduler

The Stale Jobs Scheduler is designed to query Apache Airflow to find stale workflow jobs, that is, jobs that failed during execution but whose status wasn't updated to failed in the database.

This operator queries the Airflow API every N minutes to verify that the workflow jobs that do not have the finished status are still running. If a workflow job has failed in Airflow, the Stale Jobs Scheduler will set this workflow job status to failed in the database.

The Stale Jobs Scheduler workflow:

  1. Query the database to find all workflow records with the submitted or running statuses.
  2. Query Airflow to verify the status of the submitted or running workflow jobs.
  3. If Airflow returns the failed status for a workflow job, query Firestore to set the workflow status to FAILED.
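In Python terms, the reconciliation boils down to something like the sketch below; the three helper callables stand in for the database and Airflow queries and are purely hypothetical:

    # Sketch of the reconciliation loop; find_jobs, get_airflow_state and mark_failed
    # are hypothetical stand-ins for the Firestore and Airflow API calls.
    def reap_stale_jobs(find_jobs, get_airflow_state, mark_failed):
        # 1. Workflow records still marked "submitted" or "running" in the database.
        for job in find_jobs(statuses=("submitted", "running")):
            # 2. Ask Airflow what actually happened to the corresponding DAG run.
            if get_airflow_state(job["run_id"]) == "failed":
                # 3. Airflow reports a failure: reconcile the record in the database.
                mark_failed(job["workflow_id"])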

Workflow Finished Sensor Operator

The Workflow Finished Sensor operator is a special type of operator that monitors ingestion of a file during the "osdu" ingestion workflow. Once a file is ingested, this operator notifies the DAG, which then starts ingestion of the next file in the manifest.
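Conceptually it behaves like a standard Airflow sensor whose poke method only returns True once the triggered run reports the finished status; the lookup helper in the sketch below is hypothetical:

    # Conceptual sketch; get_workflow_status is a hypothetical status lookup helper.
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    class WorkflowFinishedSensorSketch(BaseSensorOperator):
        """Blocks the manifest DAG until the per-file ingest workflow reports 'finished'."""

        @apply_defaults
        def __init__(self, get_workflow_status, workflow_id, *args, **kwargs):
            super(WorkflowFinishedSensorSketch, self).__init__(*args, **kwargs)
            self.get_workflow_status = get_workflow_status
            self.workflow_id = workflow_id

        def poke(self, context):
            # Airflow re-invokes poke() until it returns True; the DAG then moves on
            # to the next file in the manifest.
            return self.get_workflow_status(self.workflow_id) == "finished"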

Licence

Copyright © Google LLC
Copyright © EPAM Systems

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.