Airflow initializes the logging configuration at startup, so all logger configuration settings must be kept in the `airflow.cfg` file within your project or specified as environment variables according to the [Airflow specification](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/set-config.html).
See the Airflow logging properties in the [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html).
### Remote Logger
For GCP Composer, the `google_cloud_default` connection is used by default to store all logs in a GCS bucket.
See the Airflow [documentation](https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/write-logs.html) for AWS and Azure remote logging.
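For reference, a minimal `airflow.cfg` sketch that enables remote logging to GCS might look like the following; the bucket name is a placeholder, and in Airflow 1.10.x these options live in the `[core]` section:

```ini
[core]
# Ship task logs to a GCS bucket (bucket name is a placeholder)
remote_logging = True
remote_base_log_folder = gs://<your-logs-bucket>/logs
remote_log_conn_id = google_cloud_default

# The same settings expressed as environment variables:
# AIRFLOW__CORE__REMOTE_LOGGING=True
# AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=gs://<your-logs-bucket>/logs
# AIRFLOW__CORE__REMOTE_LOG_CONN_ID=google_cloud_default
```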
## DAGs Description
### Opaque Ingestion DAG
The Opaque Ingestion DAG performs ingestion of the OSDU opaque data type. The DAG workflow consists of the following steps:
1. Invoke the Workflow Status Operator to set the new status for the workflow.
* The Workflow Status Operator queries the Workflow service's **/updateWorkflowStatus** API endpoint with the
**running** status.
2. Obtain the Work Product Components associated with the Work Product.
* For each Work Product Component, find all associated OSDU Files. For each File in the manifest:
* Start the **ingest** workflow. Call the Workflow service's **/startWorkflow** API endpoint with the **ingest**
workflow type.
> The Workflow Finished Sensor operator polls the DAG execution and notifies the DAG to start ingestion of the
> next file.
* Once all Files for the current Work Product Component are ingested, query the Storage service's
**/CreateOrUpdateRecord** API endpoint to create a record for the current Work Product Component.
* Once all Work Product Components and Files are ingested, proceed to the third step.
3. Create a new record for the Work Product.
* Query the Storage service's **/CreateOrUpdateRecord** API endpoint and pass it the Work Product.
4. Search the records by metadata.
* Query the Storage service's **/listRecords** API to obtain the records by metadata.
5. Enrich the records with data from the manifest.
* Query the Storage service's **/UpdateRecord** API endpoint and pass it the metadata from the manifest.
> Only file records are updated.
6. Invoke the Workflow Status Operator with the **finished** job status (see the sketch after this list).
* The Workflow Status Operator queries the Workflow service to set the new workflow status.
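A minimal sketch of how a DAG might wrap its work with status updates; `UpdateStatusOperator`, its import path, and its parameters are illustrative names rather than the exact operator shipped with this repository:

```python
from datetime import datetime

from airflow import DAG

# Hypothetical import path; the repository ships its own status operator.
from operators.update_status_op import UpdateStatusOperator

with DAG(dag_id="Opaque_ingest",
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    set_running = UpdateStatusOperator(
        task_id="update_status_running",
        status="running",  # posted to the /updateWorkflowStatus endpoint
    )
    set_finished = UpdateStatusOperator(
        task_id="update_status_finished",
        status="finished",
    )
    # Ingestion tasks (steps 2-5 above) would sit between these two updates.
    set_running >> set_finished
```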
### Osdu_ingest
Processes R3 manifests and ensures their integrity.
The DAG can process either a batch of manifests or a single manifest. The routing logic is implemented inside the DAG by the `check_payload_type` task, as sketched below.
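A minimal sketch of how such routing could be expressed with a `BranchPythonOperator`; the downstream task ids and the payload layout are assumptions, not the exact code from this repository:

```python
from airflow.operators.python_operator import BranchPythonOperator


def check_payload_type(**context):
    """Route to the batch or single-manifest branch based on the trigger payload."""
    payload = context["dag_run"].conf or {}
    manifest = payload.get("manifest")
    # A list of manifests means batch processing; a dict means a single manifest.
    return "process_batch" if isinstance(manifest, list) else "process_single_manifest"


check_payload_type_task = BranchPythonOperator(
    task_id="check_payload_type",
    python_callable=check_payload_type,
    provide_context=True,  # required in Airflow 1.10.x
)
```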
### Osdu_ingest R2
**Note:** The DAG is deprecated.
## Operators Description
### Workflow Status Operator
The Workflow Status Operator is an Airflow operator callable from each DAG. It's designed to receive the current status
of a workflow job and then update the workflow record in the database. Each DAG in the system has to invoke the Workflow
Status Operator to update the workflow status.
This operator isn't designed to directly update the status in the database; instead, it queries the OSDU Workflow
service's API endpoint. Once the operator sends a request to update status, it cedes control back to the DAG.
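A minimal sketch of what such an operator could look like, assuming a `requests` call to the Workflow service's **/updateWorkflowStatus** endpoint; the URL and payload field names are assumptions:

```python
import requests
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class UpdateStatusOperator(BaseOperator):
    """Report a workflow job status to the Workflow service."""

    @apply_defaults
    def __init__(self, status,
                 workflow_url="http://workflow-service/updateWorkflowStatus",  # assumed endpoint
                 *args, **kwargs):
        super(UpdateStatusOperator, self).__init__(*args, **kwargs)
        self.status = status
        self.workflow_url = workflow_url

    def execute(self, context):
        workflow_id = (context["dag_run"].conf or {}).get("WorkflowID")  # assumed field
        response = requests.post(
            self.workflow_url,
            json={"WorkflowID": workflow_id, "Status": self.status},
        )
        response.raise_for_status()
        # Control returns to the DAG as soon as the request succeeds.
```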
### Stale Jobs Scheduler
The Stale Jobs Scheduler is designed to query Apache Airflow to find stale workflow jobs, that is, jobs that
failed during execution but whose status wasn't updated to **failed** in the database.

This operator queries the Airflow API every N minutes to verify that the workflow jobs that do not have the _finished_
status are still running. If a workflow job has failed in Airflow, the Stale Jobs Scheduler sets this workflow job's
status to **failed** in the database.

The Stale Jobs Scheduler workflow:

1. Query the database to find all workflow records with the _submitted_ or _running_ statuses.
2. Query Airflow to verify the status of the submitted or running workflow jobs.
3. If Airflow returns the failed status for a workflow job, query Firestore to set the workflow status to FAILED.
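A sketch of that polling pass, assuming a Firestore-backed workflow collection and Airflow's experimental REST API for DAG-run state; the base URL, collection name, and field names are assumptions:

```python
import requests
from google.cloud import firestore

AIRFLOW_API = "http://airflow-webserver:8080/api/experimental"  # assumed base URL


def fail_stale_jobs():
    """Mark workflow records as FAILED when their Airflow runs have failed."""
    db = firestore.Client()
    candidates = (
        db.collection("workflow-status")  # assumed collection name
        .where("Status", "in", ["submitted", "running"])
        .stream()
    )
    for doc in candidates:
        record = doc.to_dict()
        # Experimental API: GET /dags/<dag_id>/dag_runs/<execution_date>
        url = "{}/dags/{}/dag_runs/{}".format(
            AIRFLOW_API, record["DAGName"], record["ExecutionDate"]  # assumed fields
        )
        state = requests.get(url).json().get("state")
        if state == "failed":
            doc.reference.update({"Status": "failed"})
```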
### Validate Manifest Schema Operator
The Validate Manifest Schema Operator is an Airflow operator that validates a manifest against the schema definitions from the Schema service.
The operator's logic is split into two steps:
1. Common schema validation (without checking references)
2. Manifest schema integrity validation (all references resolve and validate against their schemas)
**Note:** All invalid values will be evicted from the manifest and logged.
The operator output is a new manifest containing only valid manifest data (stored in Airflow XCom):
```json
{"manifest":<OUTPUT>}
```
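A minimal sketch of the two steps with the `jsonschema` package; the split into `common_schema` and `full_schema`, the resolver wired to the Schema service, and the eviction of invalid values are assumptions:

```python
import logging

import jsonschema

logger = logging.getLogger(__name__)


def validate_manifest(manifest, common_schema, full_schema, resolver):
    """Two-step validation sketch for an R3 manifest."""
    # Step 1: common validation against a schema without external references.
    jsonschema.validate(instance=manifest, schema=common_schema)

    # Step 2: integrity validation; the resolver fetches every referenced
    # schema (e.g. from the Schema service) so each $ref is verified.
    validator = jsonschema.Draft7Validator(full_schema, resolver=resolver)
    valid = True
    for error in validator.iter_errors(manifest):
        # Invalid values are logged; the operator evicts them from the manifest.
        logger.warning("Invalid value at %s: %s",
                       list(error.absolute_path), error.message)
        valid = False
    return valid
```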
### Ensure Manifest Integrity Operator
An operator to validate the references inside an R3 manifest and remove invalid entities.
This operator is responsible for parent-child validation.
All orphaned entities are logged and excluded from the validated manifest.
The operator output is a new manifest containing only valid manifest data (stored in Airflow XCom):
```json
{"manifest":<OUTPUT>}
```
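A sketch of the parent-child check, assuming each entity carries an `id` and lists its parents under a `parents` key; both field names are illustrative:

```python
import logging

logger = logging.getLogger(__name__)


def remove_orphans(entities):
    """Drop entities whose parent references cannot be resolved."""
    known_ids = {entity["id"] for entity in entities}
    valid = []
    for entity in entities:
        missing = [p for p in entity.get("parents", []) if p not in known_ids]
        if missing:
            # Orphaned entities are logged and excluded from the output manifest.
            logger.warning("Excluding %s: unresolved parents %s",
                           entity["id"], missing)
        else:
            valid.append(entity)
    return valid
```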
### Process Manifest Operator
Operator to process a manifest (either a single manifest file or a list of them). The processing includes validation against schemas, storing records, etc.
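A sketch of how the steps could chain inside the operator; the helper functions correspond to the sketches above and are assumptions:

```python
def process_manifests(payload, validate, ensure_integrity, store_records):
    """Validate each manifest, enforce integrity, then persist the records."""
    manifests = payload if isinstance(payload, list) else [payload]
    record_ids = []
    for manifest in manifests:
        if validate(manifest):
            clean = ensure_integrity(manifest)
            # Records go to the Storage service; ids are collected so the
            # operator can push them to XCom.
            record_ids.extend(store_records(clean))
    return record_ids
```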
### Workflow Finished Sensor Operator
The Workflow Finished Sensor operator is a special type of operator that monitors ingestion of a file during the "osdu"
ingestion workflow. Once a file is ingested, this operator notifies the DAG, which then starts ingestion of the next
file in the manifest.
The operator output is the set of ingested record ids (stored in Airflow XCom).
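A minimal sketch of such a sensor, assuming the Workflow service exposes job status via a **/getStatus** endpoint; the URL and field names are assumptions:

```python
import requests
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class WorkflowFinishedSensor(BaseSensorOperator):
    """Wait until a triggered ingestion workflow reaches a terminal status."""

    @apply_defaults
    def __init__(self, workflow_id,
                 status_url="http://workflow-service/getStatus",  # assumed endpoint
                 *args, **kwargs):
        super(WorkflowFinishedSensor, self).__init__(*args, **kwargs)
        self.workflow_id = workflow_id
        self.status_url = status_url

    def poke(self, context):
        response = requests.get(self.status_url,
                                params={"WorkflowID": self.workflow_id})
        response.raise_for_status()
        status = response.json().get("Status")
        # Returning True lets the DAG move on to the next file in the manifest.
        return status in ("finished", "failed")
```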