ADR: Manifest by reference
Status
- Proposed
- Trialing
- Under review
- Approved
- Retired
Context
In early April 2021, pre-shipping team A reported poor performance in the first implementation of Ingestion_Dags_R3, and tests carried out by EPAM confirmed it. Together they identified two important issues with manifest-based ingestion on Airflow 1.10.15. First, there is a limit on the maximum size of a manifest (up to 4 Mb) [1-3], which limits the utility of the application. Second, questions have been raised about the likely scalability of the ingestion given the architectural design of the Airflow system [?]. Two other issues were also identified, the minimum time required to ingest a single record and the limit of one DAG submission per second, but they are not addressed by this ADR (they will be alleviated by a move to Airflow 2.0).
[1] https://gitlab.opengroup.org/osdu/subcommittees/ea/projects/pre-shipping/home/-/issues/64
[2] https://gitlab.opengroup.org/osdu/subcommittees/ea/projects/pre-shipping/home/-/issues/98
[3] https://gitlab.opengroup.org/osdu/subcommittees/ea/projects/pre-shipping/home/-/issues/99
[4] Feedback from our weekly PMC meetings.
Apache Airflow and XCOM
As shown in Fig. 1, the manifest is initially posted to the ingestion service (DAG) and then flows through the operators within the DAG. At each node in the graph (an operator), the manifest must be loaded from and saved to XCOM (see the appendix for definitions). In a manifest-based ingestion workflow, the operators apply transformations to the manifest itself, such as validation (removing invalid records) or replacing surrogate keys. In the general case, a manifest that is initially large can remain large throughout the workflow. This creates two problems. First, there is an overhead for serializing and deserializing the manifest at each operator. Second (see the appendix), the size of the objects that can be stored in XCOM depends on the database used in the backend, which results in the issues reported in [1-3]. The “max manifest size” of the application also becomes strongly platform dependent if we rely on XCOM to store manifests. Moreover, Airflow is a workflow orchestration tool, not a processing engine like Apache Spark; it is not optimized to manage big data, so from an architectural standpoint we need to change how we use XCOM.
The envisaged solution is to pass the Ingestion Workflow a pointer to the manifest to be processed (a dataset id, or some other construct) rather than the full manifest payload.
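To make the difference concrete, the following minimal sketch compares the serialized size of a full manifest with the size of a pointer to it. The manifest structure, the record id and the helper name `xcom_payload_size` are hypothetical and chosen only for illustration; JSON stands in for whatever serialization the XCOM backend actually uses.

```python
import json


def xcom_payload_size(value) -> int:
    """Return the size in bytes of a value once serialized as JSON."""
    return len(json.dumps(value).encode("utf-8"))


# Hypothetical manifest with many records (illustrative structure only).
manifest = {
    "kind": "example:wks:Manifest:1.0.0",
    "Data": [
        {"id": f"surrogate-key:{i}", "data": {"Name": f"record-{i}"}}
        for i in range(50_000)
    ],
}

# Hypothetical pointer to the same manifest, stored outside Airflow.
manifest_ref = {"manifest_record_id": "example:dataset--File.Generic:example-manifest"}

full_size = xcom_payload_size(manifest)
ref_size = xcom_payload_size(manifest_ref)

print(f"full manifest: {full_size} bytes")
print(f"reference:     {ref_size} bytes")
```

The size of the reference stays constant however large the manifest grows, so the amount of data written to XCOM at each operator no longer depends on the manifest.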
Proposal
The purpose of this project is to improve the performance of manifest-based ingestion by reducing the overhead of passing (potentially large) manifests through an HTTPS interface when such an operation is not needed. The approach is also intended to handle larger manifests better. This project MUST be conducted after the necessary evolution of the overall development environment, which has to take into account the move to Airflow 2.0. The project may also change the current ingestion workflow process by adding a preparation phase before reusing the operators developed so far. On a first pass (in March), the following behavior was proposed:
- A user or process creates one or more manifests (this could be done using tooling).
- The user or process then uploads the manifests to OSDU using the Dataset service (e.g., get storage instructions, upload file, store metadata record, get the record id).
- The user or process invokes the Workflow service, passing in one or more record ids that point to the uploaded manifest(s).
- A DAG operator, to be created, fetches the manifest(s) from storage using the Dataset service (get retrieval instructions).
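The four steps above can be sketched as follows. This is an illustration under stated assumptions, not the real OSDU client: `InMemoryDatasetService`, `trigger_workflow`, `fetch_manifests_operator` and the `manifest_record_ids` key are hypothetical names, and an in-memory dictionary replaces the actual storage and retrieval instructions.

```python
import json
import uuid


class InMemoryDatasetService:
    """Hypothetical stand-in for the Dataset service (not the real OSDU client)."""

    def __init__(self):
        self._files = {}

    def upload_manifest(self, manifest: dict) -> str:
        """Store the manifest and return the record id of its metadata record."""
        record_id = f"example:dataset--File.Generic:{uuid.uuid4().hex}"
        self._files[record_id] = json.dumps(manifest)
        return record_id

    def fetch_manifest(self, record_id: str) -> dict:
        """Retrieve a previously uploaded manifest by record id."""
        return json.loads(self._files[record_id])


def trigger_workflow(record_ids):
    """Build the (small) workflow trigger payload: record ids only."""
    # "manifest_record_ids" is an assumed key name, not an existing contract.
    return {"executionContext": {"manifest_record_ids": list(record_ids)}}


def fetch_manifests_operator(dataset_service, run_conf):
    """First operator: resolve each record id into the full manifest."""
    ids = run_conf["executionContext"]["manifest_record_ids"]
    return [dataset_service.fetch_manifest(record_id) for record_id in ids]


dataset_service = InMemoryDatasetService()

# Steps 1 and 2: create the manifests and upload them, keeping the record ids.
manifests = [
    {"kind": "example:wks:Manifest:1.0.0", "Data": [{"id": f"r{i}"}]}
    for i in range(2)
]
record_ids = [dataset_service.upload_manifest(m) for m in manifests]

# Step 3: invoke the workflow with record ids instead of manifest payloads.
run_conf = trigger_workflow(record_ids)

# Step 4: the operator fetches the manifests from storage.
fetched = fetch_manifests_operator(dataset_service, run_conf)
print(len(json.dumps(run_conf)), "bytes sent to the workflow service")
```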
Then, in late May, we (GEOSIRIS) presented the following diagram:
In this proposal, the Airflow context still exists but contains only a reference to a storage area holding the full manifest. Before starting the DAG, we could set up (or reuse) a method that uses a POST request from Postman to load the manifest into this storage area (from the Landing Zone, with the Dataset Service).
We then obtain a manifest ID, which can be sent from Postman to the first operator. The XCOM interaction with the Airflow context remains but carries only the manifest ID. The operator uses this ID to POST and GET the full manifest to and from the storage area through the Dataset Service and, at the end, the manifest can be deleted from the storage area if necessary. In this situation the manifest is not only read from but also written to the storage area through the Dataset Service. Currently we are not persisting manifests [in the platform], and the consensus is that, by default, we should not store the manifests after the records are ingested. Whether to record the updated manifest versions produced by the ingestion workflow service can be discussed, but this may be out of scope.
If it is decided that storing the manifests is in scope, we should keep at least two versions of each manifest in the dataset area: the original manifest and the post-processing manifest.
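A minimal sketch of this behavior is given below, assuming that every write returns a new manifest ID and that only IDs travel through XCOM. `ManifestStore`, `run_operator`, the two transformations and the `history` list are hypothetical names; a plain dictionary stands in for XCOM and an in-memory store for the storage area reached through the Dataset Service. The cleanup step shows one possible policy (keep the original and the final manifest), not a decision.

```python
import copy


class ManifestStore:
    """Hypothetical stand-in for the storage area behind the Dataset Service."""

    def __init__(self):
        self._manifests = {}
        self._counter = 0

    def put(self, manifest: dict) -> str:
        """Save a manifest version and return a new manifest ID."""
        self._counter += 1
        manifest_id = f"manifest-{self._counter}"
        self._manifests[manifest_id] = copy.deepcopy(manifest)
        return manifest_id

    def get(self, manifest_id: str) -> dict:
        """Return a copy of the stored manifest."""
        return copy.deepcopy(self._manifests[manifest_id])

    def delete(self, manifest_id: str) -> None:
        del self._manifests[manifest_id]

    def ids(self):
        return sorted(self._manifests)


def run_operator(store, xcom, transform):
    """Read the ID from XCOM, load, transform, save, and push the new ID back."""
    manifest = store.get(xcom["manifest_id"])
    new_id = store.put(transform(manifest))
    xcom["history"].append(new_id)
    xcom["manifest_id"] = new_id


def drop_invalid(manifest):
    """Example transformation: keep only records that carry an 'id'."""
    manifest["Data"] = [r for r in manifest["Data"] if "id" in r]
    return manifest


def tag_validated(manifest):
    """Example transformation: mark the manifest as validated."""
    manifest["validated"] = True
    return manifest


def cleanup(store, xcom):
    """Delete intermediate versions, keeping the original and the final manifest."""
    for manifest_id in xcom["history"][1:-1]:
        store.delete(manifest_id)


store = ManifestStore()
original_id = store.put({"Data": [{"id": "a"}, {"name": "no id"}, {"id": "b"}]})
xcom = {"manifest_id": original_id, "history": [original_id]}  # only IDs in XCOM

for transform in (drop_invalid, tag_validated):
    run_operator(store, xcom, transform)
cleanup(store, xcom)

final_manifest = store.get(xcom["manifest_id"])
```

If the default of not persisting manifests is retained, the cleanup step would instead delete every ID in `history`.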
Scope
In scope:
- Develop an efficient method to transfer the manifest between the landing zone and the operators (using the Dataset Service).
- Demonstrate how existing operators should use this feature.
- (Optionally) store the final manifest in the persistent zone.
Out of scope:
- Re-implementing existing operators to use this feature.
- Storing the status of ingested records for logging with the AdminUI.
Milestones
- Build a complete local development environment based on the existing one, after the EPAM team has migrated from Airflow 1.10.15 to Airflow 2.0.1. This will make it possible to implement and test a cloud-agnostic solution that can be delivered to the EPAM team and others.
- Enhance the Preparation and Loading services so that an original manifest can be recorded through the Dataset Service as a file with an attached URL. This URL will be communicated to the Workflow service (by a POST request).
  Because we have to provide this capability, we could require that, during the preparation and loading step, the other files stored by the Dataset Service (for example LAS, LIS, and WITSML) are stored before the original manifest, so that the URLs referencing these files are correctly associated with the manifest before the ingestion workflow starts. When the preparation and loading service completes, each manifest will be associated with a URL.
  A further enhancement of the Preparation and Loading step could be envisaged (presented at the end).
- Instead of pushing and pulling a full manifest with XCOM inside Airflow’s running context, the new process communicates the record id of the manifest to the ingestion workflow. The first operator uses the record id it received to get the full manifest from the Dataset Service. The operator may modify the manifest; it then stores the new version through the Dataset Service and obtains a new record id for the updated manifest. This id is passed to the next operator by pushing it into the workflow context with XCOM. In the same way, each operator that updates the manifest requests a new record id to store the new version and sends that id to the next operator.
  If an operator needs access to all manifests created during the workflow, operators could save in the Airflow context, with XCOM, the list of all record ids received plus the record id of their own updated manifest, instead of only the last record id. This corresponds to the figure “New approach” above, and could be useful for implementing, for example, an operator that verifies the results of all other operators.
  Throughout the ingestion workflow, all updated manifests are kept alive by the Dataset Service in a data lake. Depending on the final decision, at the end of the process (after all operators have completed successfully), a “cleaning” operator could be added to remove all manifests created by this ingestion workflow. Alternatively, the last manifest, whose RecordID is sent to the Storage service, is kept together with the original one.
- Optional enhancement: a more substantial development of the Preparation and Loading step. To generalize the approach so that a single RecordID delivered to an ingestion workflow can cover many manifests (in batch), we could define, before launching the workflow service, a standard way to create a “manifest file package” file containing references to the manifest file names and their locations in a data lake:
This Manifest_File_Package will be communicated to the ingestion workflow. Example of a Manifest_File_Package:

    {
      "kind": "osdu:wks:Manifest_File_Package:1.0.0",
      "Datasets": [
        {
          "data": {
            "DatasetProperties": {
              "FileSourceInfo": {
                "FileSource": "",
                "Name": "load_log_1051_ana01_1962_comp.json",
                "PreloadFilePath": "s3://osdu-manifest-provided/load_log_1051_ana01_1962_comp.json"
              }
            }
          }
        },
        {
          "data": {
            "DatasetProperties": {
              "FileSourceInfo": {
                "FileSource": "",
                "Name": "load_1067_aps01_1971_comp.json",
                "PreloadFilePath": "s3://osdu-manifest-provided/1067_aps01_1971_comp.json"
              }
            }
          }
        }
      ]
    }

The ingestion workflow will need access to the URLs listed in the package, for example s3://osdu-manifest-provided/load_log_1051_ana01_1962_comp.json.
In this case, every manifest is accessible in the data lake independently and is communicated one by one to the ingestion workflow from the RecordID delivered in the dataset properties of the Manifest_File_Package (only one POST request is needed to process many manifests).
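As an illustration of how a workflow might consume such a package, the sketch below parses a Manifest_File_Package and lists the manifests it references. It assumes that "Datasets" is a JSON array of entries shaped like the example; the function name `manifest_locations` is hypothetical, and fetching the files from the data lake is deliberately left out.

```python
import json

# Package content following the example structure (assumed: "Datasets" is a list).
PACKAGE_JSON = """
{
  "kind": "osdu:wks:Manifest_File_Package:1.0.0",
  "Datasets": [
    {"data": {"DatasetProperties": {"FileSourceInfo": {
      "FileSource": "",
      "Name": "load_log_1051_ana01_1962_comp.json",
      "PreloadFilePath": "s3://osdu-manifest-provided/load_log_1051_ana01_1962_comp.json"
    }}}},
    {"data": {"DatasetProperties": {"FileSourceInfo": {
      "FileSource": "",
      "Name": "load_1067_aps01_1971_comp.json",
      "PreloadFilePath": "s3://osdu-manifest-provided/1067_aps01_1971_comp.json"
    }}}}
  ]
}
"""


def manifest_locations(package: dict):
    """Return (name, path) pairs for every manifest referenced by the package."""
    locations = []
    for dataset in package["Datasets"]:
        info = dataset["data"]["DatasetProperties"]["FileSourceInfo"]
        locations.append((info["Name"], info["PreloadFilePath"]))
    return locations


package = json.loads(PACKAGE_JSON)
locations = manifest_locations(package)

# Each manifest can then be processed independently, one by one.
for name, path in locations:
    print(f"{name} -> {path}")
```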
Appendix
Apache Airflow docs:
“An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators. ...”
“... This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom that is described elsewhere in this document.”
“XCOMs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. ... Any object that can be pickled can be used as an XCOM value, so users should make sure to use objects of appropriate size.”