Skip to content

Manifest by reference, first code version

Valentin Gauthier requested to merge manifest-by-reference into master

Manifest by reference

This code refers to issue 67.

Goal

This issue's goal is to avoid storing large manifest in the airflow's XCOM context. Because XCOM has a 'tiny' limited memory, and it can make the ingestion process fail.

How

The manifest is not anymore stored in the XCOM context and all operators now take the manifest from the DatasetService. To access to the manifest, only the recordId of the manifest is now stored in the XCOM context.

This implies that instead of sending the manifest to trigger the ingestion dag, the user should now first upload his manifest to the DatasetService. And then he should trigger the dag by giving the recordId instead of the complete manifest :

{
    "executionContext": {
        "Payload": {
            "AppKey": "{{YOUR_APP}}",
            "data-partition-id": "{{data_partition_id}}"
        },
        "manifest": "{{MANIFEST_RECORD_ID}}"
    }
}

How to test

If you want to test the ManifestByReference code :

  • Update your code to the last version of the 'master' branch of the osdu-airflow-lib project.
  • Have a dag that uses the new "ByReference" operators.

To ease your tests, you can find here a postman collection.

[Click to expand] Example of a dag that uses the new "ByRefernce" operators
#  Copyright 2020 Google LLC
#  Copyright 2020 EPAM Systems
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""DAG for R3 ingestion."""

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from osdu_airflow.backward_compatibility.default_args import update_default_args
from osdu_airflow.operators.ensure_manifest_integrity_by_reference import EnsureManifestIntegrityOperatorByReference
from osdu_airflow.operators.process_manifest_r3_by_reference import ProcessManifestOperatorR3ByReference
from osdu_airflow.operators.update_status import UpdateStatusOperator
from osdu_airflow.operators.validate_manifest_schema_by_reference import ValidateManifestSchemaOperatorByReference
from osdu_ingestion.libs.exceptions import NotOSDUSchemaFormatError

BATCH_NUMBER = int(Variable.get("core__ingestion__batch_count", "3"))
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"


default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}

default_args = update_default_args(default_args)

workflow_name = "Osdu_ingest_by_reference"


def is_batch(**context):
    """
    :param context: Dag context
    :return: SubDag to be executed next depending on Manifest type
    """
    manifest = context["dag_run"].conf["execution_context"].get("manifest")

    if isinstance(manifest, dict): 
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    elif isinstance(manifest, str): # str for manifest rec id
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
        context["ti"].xcom_push(key="manifest_ref_ids", value=[manifest])
    elif isinstance(manifest, list):
        subdag = PROCESS_BATCH_MANIFEST_FILE
    else:
        raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. "
                                       f"Got {manifest}.")
    return subdag


with DAG(
    workflow_name,
    default_args=default_args,
    description="R3 manifest processing with providing integrity",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60)
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )

    branch_is_batch_op = BranchPythonOperator(
        task_id="check_payload_type",
        python_callable=is_batch,
        trigger_rule="none_failed_or_skipped"
    )

    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

    validate_schema_operator = ValidateManifestSchemaOperatorByReference(
        task_id="validate_manifest_schema_task",
        trigger_rule="none_failed_or_skipped"
    )

    ensure_integrity_op = EnsureManifestIntegrityOperatorByReference(
        task_id=ENSURE_INTEGRITY_TASK,
        previous_task_id=validate_schema_operator.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    process_single_manifest_file = ProcessManifestOperatorR3ByReference(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        previous_task_id=ensure_integrity_op.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    # Dummy operator as entry point into parallel task of batch upload
    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_MANIFEST_FILE
    )

    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessManifestOperatorR3ByReference(
            task_id=f"process_manifest_task_{batch + 1}",
            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped"
        ) >> update_status_finished_op

update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
branch_is_batch_op >> validate_schema_operator >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement

Note: If you also copy the osdu-ingest-r3 dag, do not forget to change the DAG name (e.g. workflow_name = "Osdu_ingest_by_reference")

As you can see in the dag, the changes are the use of the new operators : ValidateManifestSchemaOperatorByReference, EnsureManifestIntegrityOperatorByReference and ProcessManifestOperatorR3ByReference. Also, to be able to take the record_id instead of the full manifest from the dag-triggering request, we modified the is_batch function :

...
if isinstance(manifest, dict): 
    subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
elif isinstance(manifest, str): # str for manifest rec id
    subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    context["ti"].xcom_push(key="manifest_ref_ids", value=[manifest])
...

Here we push the record_id in the XCOM context list manifest_ref_ids. This will allows the first operator that needs the manifest (ValidateManifestSchemaOperatorByReference) to take it by using the history parameter (see. the get manifest part).

Run the code on your local machine

To run the code locally, you can check the project individual-airflow.

Locally, the workflow service may not run and block the following operators. To skip its tasks an just test the manifest_by_reference, you can comment it in the Osdu_ingest_by_reference DAG:

# update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement

You can trigger your DAG (from the scheduler container) with a very large file using a python script like this:

from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='Osdu_ingest_by_reference', conf=open('PATH_TO_MY_FILE_ON_SCHEDULER_CONTAINER').read())

For more simplicity, please find a file that takes a manifest, upload it on the dataset-service, and launch the dags ("by_reference" version and the normal version). Please read the script and change variables at the top of the file. You should also change the "dag_id" parameter when function "trigger_dag" is called.

Note : this file must be launch inside the "airflow scheduler" container, with a manifest file that is also inside this container.

Click to expand and see the script
from airflow.api.client.local_client import Client
import json
import sys
import os
from airflow.models import Variable
import requests
from pprint import pprint

DATASET_URL = Variable.get("core__service__dataset__host") + "/api/dataset/v1" # getting variable from airflow
data_partition_id = Variable.get("data_partition_id") # getting variable from airflow

ACL = {
    "viewers": [
        "data.default.viewers@"+data_partition_id+".[MY_ACL_SERVICE_URL]"
    ],
    "owners": [
        "data.default.owners@"+data_partition_id+".[MY_ACL_SERVICE_URL]"
    ]
}
legaltags = [data_partition_id+"-demo-legaltag"]

LEGAL = {
    "legaltags": legaltags,
    "otherRelevantDataCountries": ["US"],
    "status": "compliant"   
}

prefix = ('{\
  "execution_context": {\
    "acl": ' + json.dumps(ACL) + ','
    '"legal": ' + json.dumps(LEGAL) + ',' +
    '"Payload": {\
      "AppKey": "test-app",\
      "data-partition-id": "' + data_partition_id + '"'
    '},"manifest":')

postfix = "}}"


class MyClient:
    def register_dataset(self, dataset_registries, bearer_token=None):
        return requests.put('{}{}'.format(DATASET_URL, '/registerDataset'), 
                                json=dataset_registries, 
                                headers={'Authorization': 'Bearer ' + bearer_token,
                                        'data-partition-id': data_partition_id,
                                        'Content-Type': "application/json"})
        
    def get_storage_instructions(self, resource_type_id: str, bearer_token=None):
        params = {'kindSubType' : resource_type_id}
        print("Get req on ", '{}{}'.format(DATASET_URL, '/getStorageInstructions'))
        return requests.get('{}{}'.format(DATASET_URL, '/getStorageInstructions'), 
                                params=params, 
                                headers={'data-partition-id' : data_partition_id, 'Authorization': 'Bearer ' + bearer_token})

    def get_retrieval_instructions(self, record_id: str, bearer_token=None):
        params = {'datasetRegistryIds': [record_id]} 
        print("Post req on ", '{}{}'.format(DATASET_URL, '/getRetrievalInstructions'))
        return requests.post('{}{}'.format(DATASET_URL, '/getRetrievalInstructions'), 
                                json=params, 
                                headers={'data-partition-id' : data_partition_id, 'Authorization': 'Bearer ' + bearer_token})

    def make_request(self, method, url, bearer_token, data=None):
        if (method == "HttpMethod.PUT"):
            return requests.put(url, 
                                data=data, 
                                headers={'Content-Type' : 'application/json'})
        elif (method == "HttpMethod.GET"):
            return requests.get(url, headers={'Content-Type' : 'application/json'})


def upload_manifest_on_dataset_service(token, manifest:str):
    client = MyClient()
    storage_instruction = client.get_storage_instructions(resource_type_id="dataset--File.Generic", bearer_token=token)

    print("# Storage instructions : ")
    pprint(storage_instruction.json())

    if(storage_instruction.status_code == 200):
        signedUrl = "" 
        try:
            signedUrl = storage_instruction.json()['storageLocation']['signedUrl']
        except Exception as e:
            print("No signed url found for storage location")

        unsignedUrl = "" 
        try:
            unsignedUrl = storage_instruction.json()['storageLocation']['unsignedUrl']
        except Exception as e:
            print("No unsigned url found for storage location")

        fileSource = "" 
        try:
            fileSource = storage_instruction.json()['storageLocation']['fileSource']
        except Exception as e:
            print("No filesource found for storage location")

        signedUploadFileName = ""
        try:
            signedUploadFileName = storage_instruction.json()['storageLocation']['signedUploadFileName']
        except Exception as e:
            print("No signedUploadFileName found for storage location")

        print("Uploading data on dataset service ...")
        if isinstance(manifest, dict):
            manifest = json.dumps(manifest)
        put_result = client.make_request(method="HttpMethod.PUT", url=signedUrl, 
                data=json.dumps(manifest), bearer_token=token)

        print("# Put Result : ")
        pprint(put_result)

        dataset_register_data = {
            "datasetRegistries": [
                {
                    # "id": data_partition_id + ":dataset--File.Generic:" + fileSource[1:],
                    "version": 1614105463059152,
                    "kind": data_partition_id + ":wks:dataset--File.Generic:1.0.0",
                    "acl": ACL,
                    "legal": LEGAL,
                    "data": {
                        "DatasetProperties": {
                            "FileSourceInfo": {
                                "FileSource": fileSource,
                                "PreLoadFilePath": unsignedUrl+signedUploadFileName
                            }
                        },
                        "ResourceSecurityClassification": data_partition_id + ":reference-data--ResourceSecurityClassification:RESTRICTED:",
                        "SchemaFormatTypeID": data_partition_id + ":reference-data--SchemaFormatType:TabSeparatedColumnarText:"
                    },
                    "meta": [],
                    "tags": {}
                }
            ]
        }

        print("# Data to put on dataset service")
        print(json.dumps(dataset_register_data, indent = 4))
        registered_dataset = client.register_dataset(dataset_registries=dataset_register_data, bearer_token=token)
        print("# Registered Dataset : ")
        pprint(registered_dataset.json())
        print("# register_dataset id :", registered_dataset.json()['datasetRegistries'][0]['id'])

        
        print("\n===># Register_dataset id :", registered_dataset.json()['datasetRegistries'][0]['id'])
        return registered_dataset.json()['datasetRegistries'][0]['id']

    else:
        print("Error while requesting storage instruction ", storage_instruction)
        print("storage_instruction:", storage_instruction)
        print(storage_instruction.__dict__)
    return None

def get_dataset(record_id: str, bearer_token: str):
    client = MyClient()
    retrieval = client.get_retrieval_instructions(record_id=record_id, bearer_token=bearer_token)
    print("#0 ", retrieval.json()["delivery"][0]["retrievalProperties"]["signedUrl"])

    retrievalContentURL = retrieval.json()["delivery"][0]["retrievalProperties"]["signedUrl"]
    recovered_data = client.make_request(method="HttpMethod.GET", url=retrievalContentURL, bearer_token=bearer_token)
    try:
        print("#1 ", str(recovered_data.json())[:100])
    except Exception as e:
        print(e)
    return recovered_data.json()


def main(ref_record_id: str, file_content: str, run_id: str):
    c = Client(None, None)

    print("Launching record : ", run_id)

    conf_no_ref = file_content
    if "execution_context" not in conf_no_ref:
        conf_no_ref = prefix + file_content + postfix

    c.trigger_dag(dag_id='Osdu_ingest_by_reference',
                  run_id= run_id + "_by_ref",
                  conf=prefix + '"' + ref_record_id + '"' + postfix)

    c.trigger_dag(dag_id='Osdu_ingest',
                  run_id=run_id,
                  conf=conf_no_ref)


if __name__ == "__main__":
    print("DATASET url :", DATASET_URL)
    print("Usage : dag_triggerer_ref.py [FILE_PATH] [RUN_ID] [TOKEN] [[RECORD_ID]]")
    print("# if you allready uploaded your file and have a record id, set it on the 4th argument, else the file will be automatically uploaded")
    token = sys.argv[3]
    # print("TOKEN ", token)

    f = open(sys.argv[1])
    file_content = f.read()
    # file_content = file_content.replace("NOT_MY_DATA_PARTITION_ID", str(data_partition_id))
    # file_content = file_content.replace("NOT_MY_DOMAIN", data_partition_id+".MY_DOMAIN")
    # file_content = file_content.replace("NOT_MY_LEGAL_TAG", data_partition_id+"-MY_LEGAL_TAG")

    record_id = None
    if len(sys.argv) > 4:
        record_id = sys.argv[4]
    else:
        try:
            record_id = upload_manifest_on_dataset_service(token=token, manifest=file_content)
        except Exception as e:
            print(e)
            raise e
    # exit(0)
    if record_id is not None:
        main(ref_record_id=record_id, file_content=file_content, run_id=sys.argv[2])
    else:
        print("#ERR) Saving data in dataset service failed")

Details

Most of the modifications have been done in the ReceivingContextMixin.py file. This file contains now the function that allows to take the manifest from the DatasetService.

There are now new operators that are copies of the "normal" ones, to use the manifest by reference functionality. The only modifications in these operators are for getting the manifest and putting the modified manifest from/to the DatasetService.

Example to get the manifest to the DatasetService :

manifest_data = self._get_manifest_data_by_reference(context=context,
                                                     execution_context=execution_context,
                                                     use_history=False,
                                                     config_manager=None,
                                                     data_partition_id=None,
                                                     token_refresher=token_refresher,
                                                     logger=logger)

Descriptions of important parameters :

  • use_history : if set to True, the record_id will be taken from a record_id list in the XCOM context. Else, the manifest will be taken from the result returned by the previous operator.
  • config_manager : used to create an instance of DatasetDmsClient. If set to None, the value will be set to an instance of DefaultConfigManager.
  • data_partition_id : If set to None, the value will be taken from the environment variable data_partition_id.

Example to put the manifest on the DatasetService (and get the recordId) :

record_id = self._put_manifest_data_by_reference(context=context,
                                                 execution_context=execution_context,
                                                 manifest=manifest,
                                                 use_history=False,
                                                 config_manager=None,
                                                 data_partition_id=None,
                                                 token_refresher=token_refresher,
                                                 logger=logger)

Descriptions of important parameters :

  • use_history : if set to True, the record_id will be also stored in a list, in the XCOM context. If all operators enable this parameter, it allows at any time of the ingestion process to access to all temporary manifests pushed in the DatasetService during the dag execution.
  • manifest : the manifest you want to store.
  • config_manager : used to create an instance of DatasetDmsClient and a DatasetRegistryClient. If set to None, the value will be set to an instance of DefaultConfigManager.
  • data_partition_id : If set to None, the value will be taken from the environment variable data_partition_id.

New operators

For the tests I created copy of operators "process_manifest_r3", "ensure_manifest_integrity" and "validate_manifest_schema". This allows to keep the version that does not use the manifest_by_reference code, to do some efficency comparison.

Edited by Valentin Gauthier

Merge request reports