osdu-airflow-lib merge requests
https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests
2021-09-10T15:14:20Z

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/2
GONRG-3109: move common logic from ingestion
2021-09-10T15:14:20Z
Aleksandr Spivakov (EPAM)
M9 - Release 0.12
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/6
GONRG-3452: Move Ingestion logic from Python SDK
2021-10-12T15:11:11Z
Yan Sushchynski (EPAM)
Move the Manifest Based Ingestion code from the Python SDK to a separate repo.
M9 - Release 0.12
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/9
GONRG-3605: Update deps version to 0.12.0
2021-10-27T12:37:38Z
Yan Sushchynski (EPAM)
Update dependencies' versions to 0.12.0
M9 - Release 0.12
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/8
GONRG-3299: Unit tests for AF1 and AF2
2021-11-03T09:01:53Z
Yan Sushchynski (EPAM)
Add tests for `Operators` in AF1 and AF2 images.
M10 - Release 0.13
Siarhei Khaletski (EPAM)
Aleksandr Spivakov (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/10
GONRG-3618: Local build setup
2021-11-03T09:08:13Z
Yan Sushchynski (EPAM)
Resolves issue https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/issues/1, where extra environment variables had to be specified when the package was built locally.
M10 - Release 0.13
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/12
GONRG-3618: Local build setup
2021-11-03T11:00:40Z
Yan Sushchynski (EPAM)
Resolves issue https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/issues/1, where extra environment variables had to be specified when the package was built locally.
M10 - Release 0.13
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/7
GONRG-3119: Python SDK Usage
2021-11-12T11:21:59Z
Yan Sushchynski (EPAM)
Use Python SDK `clients` instead of `requests`.
M10 - Release 0.13
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/14
GONRG-3856 update airflow variable
2021-11-30T17:13:41Z
Aleksandr Spivakov (EPAM)
Force the variable `core__config__show_skipped_ids` to be valid JSON to remove ambiguities.
M10 - Release 0.13
Siarhei Khaletski (EPAM)
Yan Sushchynski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/16
GONRG-3789: Open VDS Metadata
2021-12-14T14:15:01Z
Yan Sushchynski (EPAM)
Add an operator to populate converted OpenVDS metadata.
M10 - Release 0.13
Siarhei Khaletski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/15
GONRG-3783: Common pipeline for osdu-*
2021-12-22T18:51:45Z
Yan Sushchynski (EPAM)
Common pipelines
M10 - Release 0.13
Vladislav Shishko (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/26
Update version to 0.16
2022-07-18T13:36:27Z
Yan Sushchynski (EPAM)
M13 - Release 0.16
Yan Sushchynski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/24
Gonrg 5217 ovds headers override
2022-12-13T00:03:52Z
Yan Sushchynski (EPAM)
Ticket:
https://community.opengroup.org/osdu/platform/data-flow/ingestion/segy-to-vds-conversion/-/issues/12
In this MR we implement passing extra information about SEG-Y header byte locations to OpenVDS Converter.
The information about byte locations is taken from the "data.VectorHeaderMapping" field of FileCollection.Segy.
The following documentation describes OpenVDS "header" arguments:
https://community.opengroup.org/osdu/platform/domain-data-mgmt-services/seismic/open-vds/-/blob/master/tools/SEGYImport/README.md
M13 - Release 0.16
Yan Sushchynski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/17
Manifest by reference, first code version
2022-12-12T14:57:38Z
Valentin Gauthier
# Manifest by reference
This code refers to [issue 67](https://community.opengroup.org/osdu/platform/data-flow/ingestion/ingestion-dags/-/issues/67).
## Goal
The goal of this issue is to avoid storing large manifests in Airflow's XCOM context: XCOM has very limited storage, and a large manifest can make the ingestion process fail.
## How
The manifest is no longer stored in the XCOM context; all operators now fetch the manifest from the DatasetService. Only the recordId of the manifest is stored in the XCOM context, and it is used to access the manifest.
This implies that, instead of sending the manifest to trigger the ingestion DAG, the user must first upload the manifest to the DatasetService and then trigger the DAG by *giving the recordId instead of the complete manifest*:
```json
{
    "executionContext": {
        "Payload": {
            "AppKey": "{{YOUR_APP}}",
            "data-partition-id": "{{data_partition_id}}"
        },
        "manifest": "{{MANIFEST_RECORD_ID}}"
    }
}
```
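As a rough illustration, the trigger body above could be assembled in Python as follows. The helper name `build_trigger_payload`, its arguments, and the sample values are hypothetical and not part of osdu-airflow-lib; only the JSON shape is taken from the example above.

```python
import json


def build_trigger_payload(manifest_record_id: str, data_partition_id: str, app_key: str) -> dict:
    """Build a DAG trigger body that carries a record id instead of a full manifest.

    Hypothetical helper: only the resulting JSON shape comes from the MR description.
    """
    if not isinstance(manifest_record_id, str) or not manifest_record_id:
        raise ValueError("manifest_record_id must be a non-empty string")
    return {
        "executionContext": {
            "Payload": {
                "AppKey": app_key,
                "data-partition-id": data_partition_id,
            },
            # The record id returned by the DatasetService replaces the manifest body.
            "manifest": manifest_record_id,
        }
    }


# Sample values below are made up for illustration.
payload = build_trigger_payload("opendes:dataset--File.Generic:1234", "opendes", "test-app")
print(json.dumps(payload, indent=2))
```

Whatever the size of the manifest, the body sent to trigger the DAG stays a few hundred bytes, which is the point of passing the reference.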
## How to test
If you want to test the ManifestByReference code:
- Update your code to the latest version of the 'master' branch of the [osdu-airflow-lib](https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib) project.
- Have a DAG that uses the new "ByReference" operators.
To ease your tests, you can find a **postman collection** [here](https://community.opengroup.org/osdu/platform/testing/-/tree/master/Dev/Manifest_By_Reference).
<details>
<summary>[Click to expand] Example of a DAG that uses the new "ByReference" operators</summary>
```python
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG for R3 ingestion."""
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from osdu_airflow.backward_compatibility.default_args import update_default_args
from osdu_airflow.operators.ensure_manifest_integrity_by_reference import EnsureManifestIntegrityOperatorByReference
from osdu_airflow.operators.process_manifest_r3_by_reference import ProcessManifestOperatorR3ByReference
from osdu_airflow.operators.update_status import UpdateStatusOperator
from osdu_airflow.operators.validate_manifest_schema_by_reference import ValidateManifestSchemaOperatorByReference
from osdu_ingestion.libs.exceptions import NotOSDUSchemaFormatError
BATCH_NUMBER = int(Variable.get("core__ingestion__batch_count", "3"))
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"
default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
workflow_name = "Osdu_ingest_by_reference"


def is_batch(**context):
    """
    :param context: Dag context
    :return: SubDag to be executed next depending on Manifest type
    """
    manifest = context["dag_run"].conf["execution_context"].get("manifest")

    if isinstance(manifest, dict):
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    elif isinstance(manifest, str):  # str for manifest rec id
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
        # Make the record id available to the first "ByReference" operator.
        context["ti"].xcom_push(key="manifest_ref_ids", value=[manifest])
    elif isinstance(manifest, list):
        subdag = PROCESS_BATCH_MANIFEST_FILE
    else:
        raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict', 'str' (record id) or 'list'. "
                                       f"Got {manifest}.")
    return subdag


with DAG(
    workflow_name,
    default_args=default_args,
    description="R3 manifest processing with providing integrity",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60)
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )

    branch_is_batch_op = BranchPythonOperator(
        task_id="check_payload_type",
        python_callable=is_batch,
        trigger_rule="none_failed_or_skipped"
    )

    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

    validate_schema_operator = ValidateManifestSchemaOperatorByReference(
        task_id="validate_manifest_schema_task",
        trigger_rule="none_failed_or_skipped"
    )

    ensure_integrity_op = EnsureManifestIntegrityOperatorByReference(
        task_id=ENSURE_INTEGRITY_TASK,
        previous_task_id=validate_schema_operator.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    process_single_manifest_file = ProcessManifestOperatorR3ByReference(
        task_id=PROCESS_SINGLE_MANIFEST_FILE,
        previous_task_id=ensure_integrity_op.task_id,
        trigger_rule="none_failed_or_skipped"
    )

    # Dummy operator as entry point into parallel task of batch upload
    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_MANIFEST_FILE
    )

    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessManifestOperatorR3ByReference(
            task_id=f"process_manifest_task_{batch + 1}",
            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped"
        ) >> update_status_finished_op

update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
branch_is_batch_op >> validate_schema_operator >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement
```
</details>
**Note:** If you also copy the *osdu-ingest-r3* dag, do not forget to change the DAG name (e.g. `workflow_name = "Osdu_ingest_by_reference"`)
As you can see in the DAG, the change is the use of the new operators: *ValidateManifestSchemaOperatorByReference*, *EnsureManifestIntegrityOperatorByReference* and *ProcessManifestOperatorR3ByReference*.
Also, to be able to take the record_id instead of the full manifest from the DAG-triggering request, we modified the *is_batch* function:
```python
    ...
    if isinstance(manifest, dict):
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
    elif isinstance(manifest, str):  # str for manifest rec id
        subdag = SINGLE_MANIFEST_FILE_FIRST_OPERATOR
        context["ti"].xcom_push(key="manifest_ref_ids", value=[manifest])
    ...
```
Here we push the record_id to the XCOM context list **manifest_ref_ids**. This allows the first operator that needs the manifest (*ValidateManifestSchemaOperatorByReference*) to fetch it by using the *history* parameter (see the [get manifest part](#example-to-get-the-manifest-to-the-datasetservice-)).
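The branching rule can also be written as a pure function, which makes it easy to unit-test without an Airflow context. This is only a sketch: `choose_branch` is a hypothetical name, and it raises the built-in `TypeError` instead of `NotOSDUSchemaFormatError` so that the snippet has no external dependency.

```python
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"


def choose_branch(manifest):
    """Return (next_task_id, record_ids_to_push) for a given "manifest" value.

    Hypothetical helper mirroring the branching of is_batch without touching Airflow.
    """
    if isinstance(manifest, dict):
        # Full manifest passed inline: nothing to push to XCOM.
        return SINGLE_MANIFEST_FILE_FIRST_OPERATOR, []
    if isinstance(manifest, str):
        # Record id: same branch, but the id goes into "manifest_ref_ids".
        return SINGLE_MANIFEST_FILE_FIRST_OPERATOR, [manifest]
    if isinstance(manifest, list):
        return PROCESS_BATCH_MANIFEST_FILE, []
    raise TypeError(
        f"Manifest must be a dict, a str (record id) or a list. Got {type(manifest).__name__}."
    )


print(choose_branch("some-record-id"))
print(choose_branch([{"kind": "a"}, {"kind": "b"}]))
```

Inside the DAG, `is_batch` would then only need to call such a function and push the returned ids with `xcom_push`.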
### Run the code on your local machine
To run the code locally, you can check the project [individual-airflow](https://community.opengroup.org/osdu/platform/deployment-and-operations/individual-airflow.git).
Locally, the _workflow_ service may not be running, which blocks the operators that follow. To skip its tasks and test only the _manifest_by_reference_ code, you can comment out this line in the _Osdu_ingest_by_reference_ DAG:
```python
# update_status_running_op >> branch_is_batch_op # pylint: disable=pointless-statement
```
You can trigger your DAG (from the scheduler container) with a very large file using a python script like this:
```python
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='Osdu_ingest_by_reference', conf=open('PATH_TO_MY_FILE_ON_SCHEDULER_CONTAINER').read())
```
For convenience, the script below takes a manifest, uploads it to the dataset service, and launches the DAGs (the "by_reference" version and the normal version).
Please read the script and change the variables at the top of the file. You should also change the "dag_id" parameter where the function "trigger_dag" is called.
*Note:* this script must be launched inside the "airflow scheduler" container, with a manifest file that is also inside this container.
<details>
<summary>Click to expand and see the script</summary>
```python
from airflow.api.client.local_client import Client
import json
import sys
import os
from airflow.models import Variable
import requests
from pprint import pprint
DATASET_URL = Variable.get("core__service__dataset__host") + "/api/dataset/v1" # getting variable from airflow
data_partition_id = Variable.get("data_partition_id") # getting variable from airflow
ACL = {
    "viewers": [
        "data.default.viewers@"+data_partition_id+".[MY_ACL_SERVICE_URL]"
    ],
    "owners": [
        "data.default.owners@"+data_partition_id+".[MY_ACL_SERVICE_URL]"
    ]
}
legaltags = [data_partition_id+"-demo-legaltag"]
LEGAL = {
    "legaltags": legaltags,
    "otherRelevantDataCountries": ["US"],
    "status": "compliant"
}
prefix = ('{\
"execution_context": {\
"acl": ' + json.dumps(ACL) + ','
'"legal": ' + json.dumps(LEGAL) + ',' +
'"Payload": {\
"AppKey": "test-app",\
"data-partition-id": "' + data_partition_id + '"'
'},"manifest":')
postfix = "}}"


class MyClient:
    def register_dataset(self, dataset_registries, bearer_token=None):
        return requests.put('{}{}'.format(DATASET_URL, '/registerDataset'),
                            json=dataset_registries,
                            headers={'Authorization': 'Bearer ' + bearer_token,
                                     'data-partition-id': data_partition_id,
                                     'Content-Type': "application/json"})

    def get_storage_instructions(self, resource_type_id: str, bearer_token=None):
        params = {'kindSubType': resource_type_id}
        print("Get req on ", '{}{}'.format(DATASET_URL, '/getStorageInstructions'))
        return requests.get('{}{}'.format(DATASET_URL, '/getStorageInstructions'),
                            params=params,
                            headers={'data-partition-id': data_partition_id, 'Authorization': 'Bearer ' + bearer_token})

    def get_retrieval_instructions(self, record_id: str, bearer_token=None):
        params = {'datasetRegistryIds': [record_id]}
        print("Post req on ", '{}{}'.format(DATASET_URL, '/getRetrievalInstructions'))
        return requests.post('{}{}'.format(DATASET_URL, '/getRetrievalInstructions'),
                             json=params,
                             headers={'data-partition-id': data_partition_id, 'Authorization': 'Bearer ' + bearer_token})

    def make_request(self, method, url, bearer_token, data=None):
        # Signed URLs are called without the Authorization header.
        if method == "HttpMethod.PUT":
            return requests.put(url,
                                data=data,
                                headers={'Content-Type': 'application/json'})
        elif method == "HttpMethod.GET":
            return requests.get(url, headers={'Content-Type': 'application/json'})


def upload_manifest_on_dataset_service(token, manifest: str):
    client = MyClient()
    storage_instruction = client.get_storage_instructions(resource_type_id="dataset--File.Generic", bearer_token=token)
    print("# Storage instructions : ")
    pprint(storage_instruction.json())
    if storage_instruction.status_code == 200:
        signedUrl = ""
        try:
            signedUrl = storage_instruction.json()['storageLocation']['signedUrl']
        except Exception as e:
            print("No signed url found for storage location")
        unsignedUrl = ""
        try:
            unsignedUrl = storage_instruction.json()['storageLocation']['unsignedUrl']
        except Exception as e:
            print("No unsigned url found for storage location")
        fileSource = ""
        try:
            fileSource = storage_instruction.json()['storageLocation']['fileSource']
        except Exception as e:
            print("No filesource found for storage location")
        signedUploadFileName = ""
        try:
            signedUploadFileName = storage_instruction.json()['storageLocation']['signedUploadFileName']
        except Exception as e:
            print("No signedUploadFileName found for storage location")
        print("Uploading data on dataset service ...")
        if isinstance(manifest, dict):
            manifest = json.dumps(manifest)
        put_result = client.make_request(method="HttpMethod.PUT", url=signedUrl,
                                         data=json.dumps(manifest), bearer_token=token)
        print("# Put Result : ")
        pprint(put_result)
        dataset_register_data = {
            "datasetRegistries": [
                {
                    # "id": data_partition_id + ":dataset--File.Generic:" + fileSource[1:],
                    "version": 1614105463059152,
                    "kind": data_partition_id + ":wks:dataset--File.Generic:1.0.0",
                    "acl": ACL,
                    "legal": LEGAL,
                    "data": {
                        "DatasetProperties": {
                            "FileSourceInfo": {
                                "FileSource": fileSource,
                                "PreLoadFilePath": unsignedUrl+signedUploadFileName
                            }
                        },
                        "ResourceSecurityClassification": data_partition_id + ":reference-data--ResourceSecurityClassification:RESTRICTED:",
                        "SchemaFormatTypeID": data_partition_id + ":reference-data--SchemaFormatType:TabSeparatedColumnarText:"
                    },
                    "meta": [],
                    "tags": {}
                }
            ]
        }
        print("# Data to put on dataset service")
        print(json.dumps(dataset_register_data, indent=4))
        registered_dataset = client.register_dataset(dataset_registries=dataset_register_data, bearer_token=token)
        print("# Registered Dataset : ")
        pprint(registered_dataset.json())
        print("# register_dataset id :", registered_dataset.json()['datasetRegistries'][0]['id'])
        print("\n===># Register_dataset id :", registered_dataset.json()['datasetRegistries'][0]['id'])
        return registered_dataset.json()['datasetRegistries'][0]['id']
    else:
        print("Error while requesting storage instruction ", storage_instruction)
        print("storage_instruction:", storage_instruction)
        print(storage_instruction.__dict__)
    return None


def get_dataset(record_id: str, bearer_token: str):
    client = MyClient()
    retrieval = client.get_retrieval_instructions(record_id=record_id, bearer_token=bearer_token)
    print("#0 ", retrieval.json()["delivery"][0]["retrievalProperties"]["signedUrl"])
    retrievalContentURL = retrieval.json()["delivery"][0]["retrievalProperties"]["signedUrl"]
    recovered_data = client.make_request(method="HttpMethod.GET", url=retrievalContentURL, bearer_token=bearer_token)
    try:
        print("#1 ", str(recovered_data.json())[:100])
    except Exception as e:
        print(e)
    return recovered_data.json()


def main(ref_record_id: str, file_content: str, run_id: str):
    c = Client(None, None)
    print("Launching record : ", run_id)
    conf_no_ref = file_content
    if "execution_context" not in conf_no_ref:
        conf_no_ref = prefix + file_content + postfix
    # "By reference" run: the conf carries only the record id.
    c.trigger_dag(dag_id='Osdu_ingest_by_reference',
                  run_id=run_id + "_by_ref",
                  conf=prefix + '"' + ref_record_id + '"' + postfix)
    # Normal run: the conf carries the full manifest.
    c.trigger_dag(dag_id='Osdu_ingest',
                  run_id=run_id,
                  conf=conf_no_ref)


if __name__ == "__main__":
    print("DATASET url :", DATASET_URL)
    print("Usage : dag_triggerer_ref.py [FILE_PATH] [RUN_ID] [TOKEN] [[RECORD_ID]]")
    print("# if you already uploaded your file and have a record id, set it as the 4th argument, else the file will be automatically uploaded")
    token = sys.argv[3]
    # print("TOKEN ", token)
    with open(sys.argv[1]) as f:
        file_content = f.read()
    # file_content = file_content.replace("NOT_MY_DATA_PARTITION_ID", str(data_partition_id))
    # file_content = file_content.replace("NOT_MY_DOMAIN", data_partition_id+".MY_DOMAIN")
    # file_content = file_content.replace("NOT_MY_LEGAL_TAG", data_partition_id+"-MY_LEGAL_TAG")
    record_id = None
    if len(sys.argv) > 4:
        record_id = sys.argv[4]
    else:
        try:
            record_id = upload_manifest_on_dataset_service(token=token, manifest=file_content)
        except Exception as e:
            print(e)
            raise e
            # exit(0)
    if record_id is not None:
        main(ref_record_id=record_id, file_content=file_content, run_id=sys.argv[2])
    else:
        print("#ERR) Saving data in dataset service failed")
```
</details>
## Details
Most of the modifications have been made in the [ReceivingContextMixin.py](https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/blob/master/osdu_airflow/operators/mixins/ReceivingContextMixin.py) file. This file now contains the function that fetches the manifest from the DatasetService.
There are now new operators that are copies of the "normal" ones, adapted to use the manifest-by-reference functionality. The only modifications in these operators concern getting the manifest from, and putting the modified manifest to, the DatasetService.
### Example to get the manifest to the DatasetService :
```python
manifest_data = self._get_manifest_data_by_reference(context=context,
                                                     execution_context=execution_context,
                                                     use_history=False,
                                                     config_manager=None,
                                                     data_partition_id=None,
                                                     token_refresher=token_refresher,
                                                     logger=logger)
```
Descriptions of important parameters :
- *use_history* : if set to **True**, the record_id will be taken from a record_id list in the XCOM context. Otherwise, the manifest will be taken from the result returned by the previous operator.
- *config_manager* : used to create an instance of [DatasetDmsClient](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk/-/blob/master/osdu_api/clients/dataset/dataset_dms_client.py). If set to **None**, the value will be set to an instance of [DefaultConfigManager](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk/-/blob/master/osdu_api/configuration/config_manager.py).
- *data_partition_id* : If set to **None**, the value will be taken from the environment variable **data_partition_id**.
### Example to put the manifest on the DatasetService (and get the recordId) :
```python
record_id = self._put_manifest_data_by_reference(context=context,
                                                 execution_context=execution_context,
                                                 manifest=manifest,
                                                 use_history=False,
                                                 config_manager=None,
                                                 data_partition_id=None,
                                                 token_refresher=token_refresher,
                                                 logger=logger)
```
Descriptions of important parameters :
- *use_history* : if set to **True**, the record_id will also be stored in a list in the XCOM context. If **all** operators enable this parameter, all temporary manifests pushed to the DatasetService during the DAG execution can be accessed at any point of the ingestion process.
- *manifest* : the manifest you want to store.
- *config_manager* : used to create an instance of [DatasetDmsClient](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk/-/blob/master/osdu_api/clients/dataset/dataset_dms_client.py) and a [DatasetRegistryClient](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk/-/blob/master/osdu_api/clients/dataset/dataset_registry_client.py). If set to **None**, the value will be set to an instance of [DefaultConfigManager](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk/-/blob/master/osdu_api/configuration/config_manager.py).
- *data_partition_id* : If set to **None**, the value will be taken from the environment variable **data_partition_id**.
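To make the role of *use_history* more concrete, here is a minimal in-memory sketch of the get/put pattern. It is not the implementation in ReceivingContextMixin.py: `InMemoryDatasetStore`, `put_by_reference` and `get_by_reference` are hypothetical names, a plain dict stands in for the XCOM context, and reading the most recent entry of the history list is an assumption, since the description does not say which entry is used.

```python
import json
import uuid

HISTORY_KEY = "manifest_ref_ids"  # same XCOM key as in the DAG example


class InMemoryDatasetStore:
    """Hypothetical stand-in for the DatasetService: maps record ids to stored manifests."""

    def __init__(self):
        self._records = {}

    def put(self, manifest: dict) -> str:
        record_id = f"demo:dataset--File.Generic:{uuid.uuid4().hex}"
        self._records[record_id] = json.dumps(manifest)
        return record_id

    def get(self, record_id: str) -> dict:
        return json.loads(self._records[record_id])


def put_by_reference(xcom: dict, store: InMemoryDatasetStore, manifest: dict, use_history: bool) -> str:
    """Store the manifest and return its record id; optionally append the id to the history list."""
    record_id = store.put(manifest)
    if use_history:
        xcom.setdefault(HISTORY_KEY, []).append(record_id)
    return record_id


def get_by_reference(xcom: dict, store: InMemoryDatasetStore, previous_result, use_history: bool) -> dict:
    """Fetch the manifest via the history list (assumed: latest entry) or the previous task's result."""
    record_id = xcom[HISTORY_KEY][-1] if use_history else previous_result
    return store.get(record_id)


xcom, store = {}, InMemoryDatasetStore()
first_id = put_by_reference(xcom, store, {"kind": "demo", "step": 1}, use_history=True)
second_id = put_by_reference(xcom, store, {"kind": "demo", "step": 2}, use_history=True)
print(get_by_reference(xcom, store, previous_result=None, use_history=True))
print(get_by_reference(xcom, store, previous_result=first_id, use_history=False))
```

In both modes only short record ids travel between tasks; the manifests themselves stay in the store.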
### New operators
For the tests I created copies of the operators "process_manifest_r3", "ensure_manifest_integrity" and "validate_manifest_schema". This keeps the versions that do not use the _manifest_by_reference_ code available for efficiency comparisons.
M13 - Release 0.16

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/29
Create separate update_status operator
2022-08-03T16:08:05Z
Yan Sushchynski (EPAM)
M13 - Release 0.16
Valentin Gauthier

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/30
Add rc version
2022-08-12T15:12:38Z
Yan Sushchynski (EPAM)
M14 - Release 0.17

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/27
GONRG-3815: Make osdu_api.ini optional
2022-10-09T15:57:28Z
Yan Sushchynski (EPAM)
The goal of this MR is to make the `osdu_api.ini` file optional for `osdu_api`.
Now, we are able to use `Airflow variables` for initializing different `osdu_api` clients.
Also, this MR depends on https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-ingestion-lib/-/merge_requests/25
M14 - Release 0.17
Yan Sushchynski (EPAM)

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/32
FIX: PreloadFilePath spelling in Manifest-by-reference
2022-08-31T12:05:15Z
Yan Sushchynski (EPAM)
Fix `PreloadFilePath` field name in Manifest-by-reference to make osdu-airflow compatible with this MR: https://community.opengroup.org/osdu/platform/system/dataset/-/merge_requests/234
M14 - Release 0.17

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/33
cherry-pick-04d842cd-fix-spelling-PreloadfilePath
2022-09-01T15:57:24Z
Yan Sushchynski (EPAM)
FIX: PreloadFilePath spelling in Manifest-by-reference
See merge request osdu/platform/data-flow/ingestion/osdu-airflow-lib!32
(cherry picked from commit 04d842cdbbdf039ba8c6e1490566405cf52800fa)
ee25bc84 fix: spelling of PreloadFilePath
M13 - Release 0.16
David Diederich (d.diederich@opengroup.org)
Chad Leong

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/34
Refactor Manifest-by-reference operators to be compatible with 0.17 PythonSDK and osdu-ingestion-lib
2022-09-22T12:00:14Z
Yan Sushchynski (EPAM)
Refactor Manifest-by-reference operators to be compatible with PythonSDK and osdu-ingestion-lib in 0.17
M14 - Release 0.17
Valentin Gauthier

https://community.opengroup.org/osdu/platform/data-flow/ingestion/osdu-airflow-lib/-/merge_requests/35
Set no_auth parameter
2022-10-25T13:12:53Z
Morris Estepa
Set the no_auth parameter when uploading files to signed URLs
Closes #2
M14 - Release 0.17
Morris Estepa