Skip to content
Snippets Groups Projects
Commit 695abc61 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'GONRG-2591_Implement_Airflow_2_compatibility' into 'master'

GONRG-2591: Airflow 2 backward compatibility

See merge request !61
parents 92cfca53 ac010abd
No related branches found
No related tags found
1 merge request!61GONRG-2591: Airflow 2 backward compatibility
Pipeline #51092 passed
......@@ -25,6 +25,7 @@
* * [Validate Manifest Schema Operator](#validate-manifest-schema-operator)
* * [Ensure Manifest Integrity Operator](#ensure-manifest-integrity-operator)
* * [Process Manifest Operator](#process-manifest-operator)
* [Backward compatibility](#backward-compatibility)
* [Licence](#licence)
## Introduction
......@@ -186,6 +187,42 @@ The operator output is a set of ingested records ids (stores in Airflow XCOM).
[os-python-sdk]: ../os-python-sdk
## Backward compatibility
Currently, the Ingestion DAGs work with both Airflow 2.x and Airflow 1.10.10 or later.
To avoid incompatibilities, a few code changes were introduced.
Use `libs.airflow.backward_compatibility.airflow_utils:apply_defaults` instead of `airflow.utils.apply_defaults` in operators.
Example:
```python
from libs.airflow.backward_compatibility.airflow_utils import apply_defaults
...
class SomeOperator(BaseOperator):
@apply_defaults
def __init__(self, *args, **kwargs):
...
```
Also, do not pass `provide_context=True` to tasks directly. Use `libs.airflow.backward_compatibility.default_args:update_default_args` instead.
```python
from libs.airflow.backward_compatibility.default_args import update_default_args
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 0,
"retry_delay": timedelta(seconds=30),
"trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
```
## Licence
Copyright © Google LLC
Copyright © EPAM Systems
......
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module is for Airflow specific code only."""
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" This module is for backward compatibility"""
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Mock airflow.utils"""
import functools
import logging
from typing import Callable

import airflow
def deprecated_function_decorator(func: Callable) -> Callable:
    """
    Stand-in for a decorator that no longer exists in the running Airflow version.

    The returned wrapper forwards every positional and keyword argument to
    ``func`` unchanged and returns its result, so decorating with it has no
    effect on behaviour.

    :param func: the callable being decorated
    :return: a wrapper that calls ``func`` with the same arguments
    """
    # functools.wraps copies __name__, __doc__, __module__ etc. and sets
    # __wrapped__, so introspection (e.g. inspect.signature) still sees the
    # decorated callable's real signature instead of (*args, **kwargs).
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper
# Select the implementation of `apply_defaults` exported by this module.
# The major version is compared as an integer rather than with a string
# prefix test, so that any release newer than 2.x (e.g. 3.x) also takes the
# "decorator removed" branch instead of attempting the Airflow 1 import.
if int(airflow.__version__.split(".", 1)[0]) < 2:
    logging.warning("'apply_defaults' is going to be removed in Airflow 2. Do not use it in the future.")
    from airflow.utils import apply_defaults
else:
    logging.warning("'apply_defaults' is removed in Airflow 2. "
                    "It is used here due to backward compatibility.")
    # No-op replacement: simply forwards the call to the decorated function.
    apply_defaults = deprecated_function_decorator
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import airflow
def update_default_args(default_args: dict) -> dict:
    """
    Update default args of tasks with necessary args depending on Airflow version.

    When the running Airflow major version is lower than 2,
    ``"provide_context": True`` is added to ``default_args`` and a warning is
    logged. For Airflow 2 and later the dictionary is left as it is and an
    informational message is logged.

    The dictionary is modified in place; the same object is returned so the
    call can be used as ``default_args = update_default_args(default_args)``.

    :param default_args: default task arguments of a DAG (updated in place)
    :return: the same ``default_args`` dictionary
    """
    # Compare the major version numerically: a string prefix test such as
    # startswith("2") would wrongly classify 3.x and later as Airflow 1.
    major_version = int(airflow.__version__.split(".", 1)[0])
    if major_version < 2:
        default_args["provide_context"] = True
        logging.warning("'provide_context' argument is going to be removed in Airflow 2.")
    else:
        logging.info(f"Airflow {airflow.__version__} is used. No need to update 'default_args'.")
    return default_args
......@@ -19,6 +19,7 @@ from datetime import timedelta
import airflow
from airflow import DAG
from libs.airflow.backward_compatibility.default_args import update_default_args
from operators.deprecated.update_status import UpdateStatusOperator
from operators.process_manifest_r2 import ProcessManifestOperatorR2
from operators.search_record_id import SearchRecordIdOperator
......@@ -30,6 +31,8 @@ default_args = {
"trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
workflow_name = "Osdu_ingest_r2"
dag = DAG(
workflow_name,
......@@ -52,13 +55,11 @@ update_status_finished_op = UpdateStatusOperator(
process_manifest_op = ProcessManifestOperatorR2(
task_id="proccess_manifest_task",
provide_context=True,
dag=dag
)
search_record_ids_op = SearchRecordIdOperator(
task_id="search_record_ids_task",
provide_context=True,
dag=dag,
retries=1
)
......
......@@ -22,6 +22,7 @@ from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from libs.airflow.backward_compatibility.default_args import update_default_args
from libs.exceptions import NotOSDUSchemaFormatError
from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from operators.process_manifest_r3 import ProcessManifestOperatorR3
......@@ -34,6 +35,7 @@ PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
SINGLE_MANIFEST_FILE_FIRST_OPERATOR = "validate_manifest_schema_task"
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 0,
......@@ -41,6 +43,8 @@ default_args = {
"trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
workflow_name = "Osdu_ingest"
......@@ -70,13 +74,10 @@ with DAG(
) as dag:
update_status_running_op = UpdateStatusOperator(
task_id="update_status_running_task",
dag=dag
)
branch_is_batch_op = BranchPythonOperator(
task_id="check_payload_type",
dag=dag,
provide_context=True,
python_callable=is_batch,
trigger_rule="none_failed_or_skipped"
)
......@@ -89,38 +90,29 @@ with DAG(
validate_schema_operator = ValidateManifestSchemaOperator(
task_id="validate_manifest_schema_task",
provide_context=True,
dag=dag,
trigger_rule="none_failed_or_skipped"
)
ensure_integrity_op = EnsureManifestIntegrityOperator(
task_id=ENSURE_INTEGRITY_TASK,
provide_context=True,
previous_task_id=validate_schema_operator.task_id,
dag=dag,
trigger_rule="none_failed_or_skipped"
)
process_single_manifest_file = ProcessManifestOperatorR3(
task_id=PROCESS_SINGLE_MANIFEST_FILE,
provide_context=True,
dag=dag,
previous_task_id=ensure_integrity_op.task_id,
trigger_rule="none_failed_or_skipped"
)
# Dummy operator as entry point into parallel task of batch upload
batch_upload = DummyOperator(
dag=dag,
task_id=PROCESS_BATCH_MANIFEST_FILE
)
for batch in range(0, BATCH_NUMBER):
batch_upload >> ProcessManifestOperatorR3(
task_id=f"process_manifest_task_{batch + 1}",
provide_context=True,
dag=dag,
previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
batch_number=batch + 1,
trigger_rule="none_failed_or_skipped"
......
......@@ -15,10 +15,9 @@
"""R3 Validate reference Manifest operator."""
import logging
from typing import List, TypeVar
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from libs.airflow.backward_compatibility.airflow_utils import apply_defaults
from libs.context import Context
from libs.validation.validate_referential_integrity import ManifestIntegrity
from libs.validation.validate_file_source import FileSourceValidator
......
......@@ -22,9 +22,9 @@ import logging
from math import ceil
from typing import List, Tuple
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from jsonschema import SchemaError
from libs.airflow.backward_compatibility.airflow_utils import apply_defaults
from libs.constants import SURROGATE_KEYS_PATHS, DATA_TYPES_WITH_SURROGATE_KEYS
from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError, \
......@@ -36,7 +36,6 @@ from libs.process_manifest_r3 import ManifestProcessor
from libs.processors.single_manifest_processor import SingleManifestProcessor
from libs.search_client import SearchClient
from libs.types import ManifestType
from libs.validation.validate_data_integrity import DataIntegrityValidator
from libs.validation.validate_file_source import FileSourceValidator
from libs.validation.validate_referential_integrity import ManifestIntegrity
from libs.validation.validate_schema import SchemaValidator
......
......@@ -21,7 +21,7 @@ import logging
from typing import Tuple
from airflow.models import BaseOperator, Variable
from airflow.utils import apply_defaults
from libs.airflow.backward_compatibility.airflow_utils import apply_defaults
from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from libs.exceptions import PipelineFailedError
......
......@@ -19,8 +19,8 @@ Validate Manifest against R3 schemas operator.
import logging
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from libs.airflow.backward_compatibility.airflow_utils import apply_defaults
from libs.constants import DATA_TYPES_WITH_SURROGATE_KEYS, SURROGATE_KEYS_PATHS
from libs.context import Context
from libs.exceptions import EmptyManifestError, GenericManifestSchemaError
......@@ -47,7 +47,6 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
'core__config__show_skipped_ids', default_var=False
)
def execute(self, context: dict):
"""Execute manifest validation then process it.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment