
Ingestion DAGs

Introduction

This project is a set of Apache Airflow DAG implementations that orchestrate data ingestion within the OSDU platform. The following DAGs are implemented:

  • Osdu_ingest - R3 Manifest Ingestion DAG
  • Osdu_ingest_r2 - R2 Manifest Ingestion DAG (deprecated)

Deployment

GCP Composer

GCP provides Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow.

To deploy the Ingestion DAGs on GCP Cloud Composer, upload the files from the /src folder into the DAGS_FOLDER and PLUGINS_FOLDER of the DAG bucket provided by the Composer environment. More info in the Cloud Composer documentation.

DAGS_FOLDER and PLUGINS_FOLDER are set up by Composer itself.

Due to the DAG implementation details, the [osdu_api] directory must be placed inside the DAGS_FOLDER. In addition, all required variables have to be set in the Airflow metastore via the Variables mechanism. See the list of the required variables below.
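For example, if the gcloud CLI is already configured for the target project, the upload can be scripted roughly as follows (a sketch; the environment name and location are placeholders):

gcloud composer environments storage dags import \
    --environment <COMPOSER_ENV> --location <LOCATION> \
    --source src/dags/
gcloud composer environments storage plugins import \
    --environment <COMPOSER_ENV> --location <LOCATION> \
    --source src/plugins/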

Azure

To deploy the Ingestion DAGs to Airflow, follow the steps below; a scripted sketch using the Azure CLI follows the list.

  • Identify the file share to which DAGs need to be copied. This is the file share where DAGs for airflow reside.
  • Copy contents of /src/dags to airflowdags/dags folder
  • Copy contents of /src/plugins/hooks to airflowdags/plugins/hooks
  • Copy contents of /src/plugins/operators to airflowdags/plugins/operators
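As a sketch, assuming the Azure CLI is logged in and the Airflow DAGs live on a file share named airflowdags in a storage account (both names are placeholders), the copy could look like this:

az storage file upload-batch --account-name <STORAGE_ACCOUNT> \
    --destination airflowdags --destination-path dags \
    --source src/dags
az storage file upload-batch --account-name <STORAGE_ACCOUNT> \
    --destination airflowdags --destination-path plugins/hooks \
    --source src/plugins/hooks
az storage file upload-batch --account-name <STORAGE_ACCOUNT> \
    --destination airflowdags --destination-path plugins/operators \
    --source src/plugins/operators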

Installing Python Dependencies

Environment dependencies can be installed in several ways:

  1. Install packages via pip on the environment where Airflow runs.
  2. Although it is not recommended, it is possible to install Python libraries locally. Put your dependencies into the DAGS_FOLDER/libs directory. Airflow automatically adds DAGS_FOLDER and PLUGINS_FOLDER to the Python path.

Python dependencies can also be specified as extra pip packages in the Airflow deployment here.

The DAGs require the Python SDK (the osdu-api package) to be installed. It can be installed into the environment via pip:

pip install osdu-api --extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple

Also, the DAGs require the osdu-airflow package (osdu-airflow-lib) to be installed, which provides common code (operators, osdu-airflow utilities, etc.):

pip install 'osdu-airflow' --extra-index-url=https://community.opengroup.org/api/v4/projects/668/packages/pypi/simple

Environment Variables & Airflow Variables

Add variables manually in the Airflow UI or through the Airflow Helm charts. See the list of the required variables below.

Instructions for adding variables to the Helm charts can be found here.

More details about Airflow variables can be found here.
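As an alternative sketch, variables can also be set from the Airflow CLI (Airflow 2.x syntax; the value and the JSON file name are placeholders):

airflow variables set core__service__workflow__host "https://<workflow-service-host>"
airflow variables import required_variables.json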

DAG Implementation Details

OSDU DAGs are cloud-platform-agnostic by design. However, cloud platforms impose specific implementation requirements, so the OSDU R2 Prototype provides a dedicated Python SDK to keep the DAGs independent of the cloud platforms. This Python SDK must be installed from the corresponding repository.

Required Variables

Common naming convention

Some variables are defined using Airflow Variables. Each variable should have a prefix that defines where the variable is used:

  • core__ - used in the common part of the DAGs;
  • gcp__, azure__, ibm__, aws__ - used in the cloud-specific modules of the DAGs;
  • sdk__ - passed to the Python SDK.

If a variable defines a URL to an internal service, it should have a suffix that shows the completeness of the URL (an illustration follows this list):

  • __url - the variable contains the full path to the service API endpoint;
  • __host - the variable contains just the service host value; the full path to the service API endpoint is constructed inside the operators.
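For illustration, with a hypothetical host, the two suffixes would carry values like these (the first is a full endpoint, the second a host only):

core__service__storage__url = https://osdu.example.com/api/storage/v2/records
core__service__file__host = https://osdu.example.com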

Internal Services

Variable | Description
core__service__storage__url | Storage Service API endpoint to save records
core__service__search__url | Search Service API endpoint for search queries
core__service__workflow__host | Workflow Service host
core__service__workflow__url | (Deprecated) Workflow Service API endpoint to update status
core__service__file__host | File Service host
core__service__schema__url | Schema Service API endpoint to get a schema by ID

Configuration

Variable | Description
core__config__dataload_config_path | Path to the dataload.ini file. Used in R2 manifest ingestion

Ingestion

Variable | Description
core__ingestion__batch_save_enabled | If set to true, the Manifest's entities are saved to the Storage Service in batches
core__ingestion__batch_save_size | Size of the batch of entities to save to the Storage Service
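For example, batch saving could be enabled from the Airflow CLI like this (the batch size is an arbitrary placeholder):

airflow variables set core__ingestion__batch_save_enabled "true"
airflow variables set core__ingestion__batch_save_size "500"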

Running E2E Tests

tests/set_airflow_env.sh
chmod +x tests/unit_tests.sh && tests/unit_tests.sh

Logging

Logging Configuration

Because Airflow initializes the logging configuration at startup, all logger configuration settings must be kept in the airflow.cfg file within your project or specified as environment variables according to the Airflow specification.

See the Airflow logging properties in the documentation.

Remote Logger

For GCP Composer, the google_cloud_default connection is used by default to store all logs in a GCS bucket. See the Airflow documentation for AWS and Azure.
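As a sketch, remote logging can also be enabled through environment variables (Airflow 2.x option names; the bucket is a placeholder):

export AIRFLOW__LOGGING__REMOTE_LOGGING=True
export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=gs://<logs-bucket>/logs
export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=google_cloud_default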

DAGs Description

Osdu_ingest

R3 manifest processing with integrity validation. The DAG can process either a batch of manifests or a single manifest. The logic that chooses between these two paths is implemented inside the DAG by the check_payload_type task.
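As a quick manual check, the DAG can be triggered from the Airflow CLI; the conf file name below is hypothetical, and its content must follow the run configuration the DAG expects (a single manifest or a batch):

# manifest_run_conf.json holds the run configuration (single manifest or batch of manifests)
airflow dags trigger Osdu_ingest --conf "$(cat manifest_run_conf.json)"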

Osdu_ingest R2

Note: The DAG is deprecated.

Operators Description

Workflow Status Operator

The Workflow Status Operator is an Airflow operator callable from each DAG. Its purpose is to receive the latest status of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow Status Operator to update the workflow status.

This operator doesn't update the status in the database directly; instead, it calls the OSDU Workflow Service's API endpoint. Once the operator sends the status update request, it cedes control back to the DAG.

Validate Manifest Schema Operator

The Validate Manifest Schema Operator is an Airflow operator that validates a manifest against the schema definitions from the Schema Service. The operator's logic is split into two steps:

  1. Common schema validation (without reference checks)
  2. Manifest schema integrity check (all references resolve and validate against their schemas)

Note: All invalid values will be evicted from the manifest and logged.

The operator output is a new manifest containing only valid manifest data (stored in Airflow XCom).

{"manifest": <OUTPUT>}

Ensure Manifest Integrity Operator

Operator to validate references inside the R3 manifest and remove invalid entities. This operator is responsible for parent-child validation. All orphan-like entities are logged and excluded from the validated manifest.

The operator output is a new manifest containing only valid manifest data (stored in Airflow XCom).

{"manifest": <OUTPUT>}

Process Manifest Operator

Operator to process a manifest (a single manifest file or a list of them). The processing includes validation against schemas, storing records, etc.

The operator output is the set of IDs of the ingested records (stored in Airflow XCom).

{"record_ids": [<SET_OF_INGESTED_RECORDS>]}

Backward compatibility

At the moment, the Ingestion DAGs work with both Airflow 2.x and Airflow >=1.10.10, provided the osdu-airflow-lib package is installed.

To avoid incompatibilities, a few code changes must be introduced.

Use osdu_airflow.backward_compatibility.airflow_utils:apply_defaults instead of airflow.utils.apply_defaults in operators.

Example:

from airflow.models import BaseOperator

from osdu_airflow.backward_compatibility.airflow_utils import apply_defaults
...

class SomeOperator(BaseOperator):

    @apply_defaults  # resolves default_args on both Airflow 1.10.x and 2.x
    def __init__(self, *args, **kwargs):
        ...

Also, do not pass provide_context=True to tasks directly. Use osdu_airflow.backward_compatibility.default_args:update_default_args instead.

import airflow.utils.dates
from datetime import timedelta

from osdu_airflow.backward_compatibility.default_args import update_default_args

default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}

# update_default_args adds the version-appropriate context-passing arguments
default_args = update_default_args(default_args)

Licence

Copyright © Google LLC
Copyright © EPAM Systems

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.