From e547ec5e0ffa0c0b163ca4298690deb3bb113023 Mon Sep 17 00:00:00 2001 From: Siarhei Khaletski <siarhei_khaletski1@epam.com> Date: Tue, 6 Apr 2021 19:32:42 +0300 Subject: [PATCH] GONRG-2008: Documentation has been updated --- README.md | 111 ++++++++++++++++++++---------------------------------- 1 file changed, 40 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 2b0b2b4..26d508c 100644 --- a/README.md +++ b/README.md @@ -18,22 +18,22 @@ * * [Logging Configuration](#logging-configuration) * * [Remote Logger](#remote-logger) * [DAGs Description](#dags-description) -* * [Opaque Ingestion DAG](#opaque-ingestion-dag) -* * [Manifest Ingestion DAG](#manifest-ingestion-dag) +* * [Osdu_ingest](#osdu_ingest) +* * [Osdu_ingest R2](#osdu_ingest-r2) * [Operators Description](#operators-description) * * [Workflow Status Operator](#workflow-status-operator) -* * [Stale Jobs Scheduler](#stale-jobs-scheduler) -* * [Workflow Finished Sensor operator](#workflow-finished-sensor-operator) +* * [Validate Manifest Schema Operator](#validate-manifest-schema-operator) +* * [Ensure Manifest Integrity Operator](#ensure-manifest-integrity-operator) +* * [Process Manifest Operator](#process-manifest-operator) * [Licence](#licence) - ## Introduction The project is a set of Apache Airflow DAGs implementations to orchestrate data ingestion within OSDU platform. 
The following DAGs are implemented: * Osdu_ingest - R3 Manifest Ingestion DAG -* Osdu_ingest_r2 - R2 Manifest Ingestion DAG +* Osdu_ingest_r2 - R2 Manifest Ingestion DAG (deprecated) ## Deployment @@ -129,62 +129,19 @@ chmod +x tests/unit_tests.sh && tests/./unit_tests.sh ### Logging Configuration As Airflow initializes the logging configuration at startup, it implies that all logger configuration settings will be kept in `airflow.cfg` file within your project or will be specified as environment variables according to the [Airflow specification](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/set-config.html). -Look Airflow logging properties in the [documentaton](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) +See the Airflow logging properties in the [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) ### Remote Logger For GCP Composer `google_cloud_default` connection is used by default to store all logs into GCS bucket. -Look Airflow [documentaton](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) documentation for AWS, Azure. +See the Airflow [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) for AWS and Azure. ## DAGs Description -### Opaque Ingestion DAG - -The Opaque Ingestion DAG performs ingestion of OSDU opaque data type. The following diagram shows the workflow of the -Opaque Ingestion DAG. - - - -The Opaque Ingestion DAG flow: - -1. Call the Workflow Status Operator with the **running** status. - * Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** API endpoint with the - **running** status, and then returns the control back to the Opaque Ingestion DAG. -2. Query the Storage service's **/createOrUpdateRecord** API endpoint to create a record for the file. - * The ODES Storage service makes a call to ODES Indexer and returns to the DAG. -3. 
Call the Workflow Status Operator with the **finished** status. - * The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** endpoint to set the workflow - status to **finished** in the database. - -### Manifest Ingestion DAG -The Manifest Ingestion DAG ingests multiple files with their metadata provided in an OSDU manifest. The following -diagram demonstrates the workflow of the Manifest -Ingestion DAG. - - - -Upon an execution request: - -1. Invoke the Workflow Status Operator to set the new status for the workflow. - * The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** API endpoint with the - **running** status. -2. Obtain the Work Product Components associated with the Work Product. - * For each Work Product Component, find all associated OSDU Files. For each File in the manifest: - * Start the **ingest** workflow. Call the Workflow service's **/startWorkflow** API endpoint the **ingest** - workflow type. - > The Workflow Finished Sensor operator polls the DAG execution and notifies the DAG to start ingestion of the - > next file. - * Once all Files for the current Work Product Component are ingested, query the Storage service's - **/CreateOrUpdatedRecord** API endpoint to create a record for the current Work Product Component. - * Once all Work Product Components and Files are ingested, switch to the third step. -3. Create a new record for the Work Product. - * Query the Storage service's **/CreateOrUpdateRecord** API endpoint and pass it the Work Product. -4. Search the records by metadata. - * Query the Storage service's **/listRecords** API to obtain the records by metadata. -5. Enrich the records with data from the manifest. - * Query the Storage service's **/UpdateRecord** API endpoint and pass it the metadata from the manifest. - > Only file records are updated. -6. Invoke the Workflow Status Operator with the **finished** job status. 
- * The Workflow Status Operator queries the Workflow service to set the new workflow status. +### Osdu_ingest +R3 manifest processing with integrity validation. +The DAG can process either a batch of manifests or a single manifest. The logic that selects the processing path is implemented inside the DAG by the `check_payload_type` task. +### Osdu_ingest R2 +**Note:** The DAG is deprecated. ## Operators Description ### Workflow Status Operator @@ -192,27 +149,39 @@ The Workflow Status Operator is an Airflow operator callable from each DAG. It's of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow Status Operator to update the workflow status. -This operator isn't designed to directly update the status in the database, and it queries the OSDU R2 Workflow +This operator doesn't update the status in the database directly; instead, it queries the OSDU Workflow service's API endpoint. Once the operator sends a request to update status, it cedes control back to the DAG. -### Stale Jobs Scheduler -The Stale Jobs Scheduler is designed to query Apache Airflow to find out any stale workflow jobs, that is, the jobs that -failed during execution but which status wasn't updated to **failed** in the database. +### Validate Manifest Schema Operator +The Validate Manifest Schema Operator is an Airflow operator that validates a manifest against the schema definitions from the Schema Service. +The operator's logic is split into two steps: +1. Common schema validation (without checking references) +2. Manifest schema integrity validation (all references resolve and validate against their schemas) + +**Note:** All invalid values will be removed from the manifest and logged. + +The operator output is a new manifest containing only valid data (stored in Airflow XCom). +```json +{"manifest": <OUTPUT>} +``` -This operator queries the Airflow API every N minutes to verify that the workflow jobs that do not have the _finished_ -status are still running. 
If a workflow job has failed in Airflow, the Stale Jobs Scheduler will set this workflow job -status to **failed** in the database. +### Ensure Manifest Integrity Operator +Operator to validate references inside the R3 manifest and remove invalid entities. +This operator is responsible for parent-child validation. +All orphaned entities will be logged and excluded from the validated manifest. -The Stale Jobs Scheduler workflow: +The operator output is a new manifest containing only valid data (stored in Airflow XCom). +```json +{"manifest": <OUTPUT>} +``` -1. Query the database to find all workflow records with the _submitted_ or _running_ statuses. -2. Query Airflow to verify the status of the submitted or running workflow jobs. -3. If Airflow returns the failed status for a workflow job, query Firestore to set the workflow status to FAILED. +### Process Manifest Operator +Operator to process a manifest (a single manifest file or a list of them). The processing includes validation against schemas, storing records, etc. -### Workflow Finished Sensor Operator -The Workflow Finished Sensor operator is a special type of operator that monitors ingestion of a file during the "osdu" -ingestion workflow. Once a file is ingested, this operator notifies the DAG, which then starts ingestion of the next -file in the manifest. +The operator output is a set of ingested record ids (stored in Airflow XCom). +```json
+{"record_ids": [<SET_OF_INGESTED_RECORDS>]} +``` [os-python-sdk]: ../os-python-sdk -- GitLab
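The batch-vs-single branching that the updated README attributes to the `check_payload_type` task can be sketched as follows. This is a minimal sketch only: the `"manifest"` payload key and the returned branch names are assumptions for illustration, not the DAG's actual implementation.

```python
# Hypothetical sketch of the batch-vs-single decision behind the DAG's
# check_payload_type task. The "manifest" key and the returned branch
# names are assumptions; the real DAG may use different identifiers.

def check_payload_type(payload: dict) -> str:
    """Return the id of the processing branch to follow.

    A list under "manifest" is treated as a batch of manifests;
    anything else is treated as a single manifest.
    """
    manifest = payload.get("manifest")
    return "process_batch" if isinstance(manifest, list) else "process_single"


if __name__ == "__main__":
    single = {"manifest": {"kind": "osdu:wks:Manifest:1.0.0"}}
    batch = {"manifest": [{"kind": "osdu:wks:Manifest:1.0.0"}] * 3}
    print(check_payload_type(single))  # process_single
    print(check_payload_type(batch))   # process_batch
```

In an Airflow DAG this kind of function would typically back a branching task so that only the matching downstream path runs.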