Skip to content
Snippets Groups Projects
Commit b97d68d2 authored by Yan Sushchynski (EPAM)'s avatar Yan Sushchynski (EPAM) Committed by Siarhei Khaletski (EPAM)
Browse files

Fix dag. Remove subdags

parent 5a58168e
No related branches found
No related tags found
1 merge request!24Ingestion updates
......@@ -71,8 +71,6 @@ osdu-gcp-deploy:
- gsutil -m rsync -d -R dags/libs $OSDU_GCP_DEPL_TARGET/dags/libs
- gsutil -m rsync -x "dags/providers*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/providers $OSDU_GCP_DEPL_TARGET/dags/providers
- gsutil -m rsync -x "dags/subdags*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/subdags $OSDU_GCP_DEPL_TARGET/dags/subdags
- gsutil cp dags/.airflowignore $OSDU_GCP_DEPL_TARGET/dags/
- gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET/plugins
except:
......
configs
libs
providers
subdags
......@@ -20,15 +20,17 @@ from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from libs.exceptions import NotOSDUSchemaFormatError
from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from operators.process_manifest_r3 import ProcessManifestOperatorR3
from operators.update_status import UpdateStatusOperator
from subdags.process_manifest_subdag import load_process_manifest_subdag
BATCH_NUMBER = int(Variable.get("batch_count", "3"))
PROCESS_SINGLE_MANIFEST_FILE_SD = "process_single_manifest_file_subdag"
PROCESS_BATCH_MANIFEST_FILE_SD = "process_batch_manifest_subdag"
PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
......@@ -48,9 +50,9 @@ def is_batch(**context):
manifest = context["dag_run"].conf["execution_context"].get("manifest")
if isinstance(manifest, dict):
subdag = PROCESS_SINGLE_MANIFEST_FILE_SD
subdag = ENSURE_INTEGRITY_TASK
elif isinstance(manifest, list):
subdag = PROCESS_BATCH_MANIFEST_FILE_SD
subdag = PROCESS_BATCH_MANIFEST_FILE
else:
raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. "
f"Got {manifest}.")
......@@ -82,29 +84,34 @@ with DAG(
trigger_rule="all_done",
)
process_single_manifest_file_subdag = SubDagOperator(
task_id=PROCESS_SINGLE_MANIFEST_FILE_SD,
subdag=load_process_manifest_subdag(
"Osdu_ingest",
"process_single_manifest_file_subdag",
default_args
),
default_args=default_args,
dag=dag,
process_single_manifest_file = ProcessManifestOperatorR3(
task_id=PROCESS_SINGLE_MANIFEST_FILE,
provide_context=True,
dag=dag,
previous_task_id=f"provide_manifest_integrity_task"
)
ensure_integrity_op = EnsureManifestIntegrityOperator(
task_id=ENSURE_INTEGRITY_TASK,
provide_context=True,
dag=dag
)
process_batch_manifest_subdag = SubDagOperator(
task_id=PROCESS_BATCH_MANIFEST_FILE_SD,
subdag=load_process_manifest_subdag(
"Osdu_ingest",
"process_batch_manifest_subdag",
default_args,
batch_number=BATCH_NUMBER
),
default_args=default_args,
# Dummy operator used as the entry point into the parallel batch-upload tasks
batch_upload = DummyOperator(
dag=dag,
task_id=PROCESS_BATCH_MANIFEST_FILE
)
for batch in range(0, BATCH_NUMBER):
batch_upload >> ProcessManifestOperatorR3(
task_id=f"process_manifest_task_{batch + 1}",
provide_context=True,
dag=dag,
previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
batch_number=batch + 1
) >> update_status_finished_op
update_status_running_op >> branch_is_batch_op # pylint: disable=pointless-statement
branch_is_batch_op >> process_batch_manifest_subdag >> update_status_finished_op # pylint: disable=pointless-statement
branch_is_batch_op >> process_single_manifest_file_subdag >> update_status_finished_op # pylint: disable=pointless-statement
branch_is_batch_op >> batch_upload # pylint: disable=pointless-statement
branch_is_batch_op >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op # pylint: disable=pointless-statement
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SubDAG for processing R3 manifests."""
from airflow import DAG
from airflow.models import Variable
from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
from operators.process_manifest_r3 import ProcessManifestOperatorR3
SINGLE_MANIFEST = 1
def load_process_manifest_subdag(parent_dag_name: str, child_dag_name: str, args: dict,
                                 batch_number=SINGLE_MANIFEST) -> DAG:
    """
    Build the SubDAG that checks manifests' integrity and processes manifests.

    One ProcessManifestOperatorR3 task is created per batch, numbered from 1.
    Only when `batch_number` equals SINGLE_MANIFEST is an
    EnsureManifestIntegrityOperator task also created and placed upstream of
    the processing task.

    :param parent_dag_name: Name of the parent DAG
    :param child_dag_name: Name of the child DAG; joined to the parent name
        with a dot to form the SubDAG id
    :param args: Default args handed to the SubDAG
    :param batch_number: Number of manifest-processing tasks to create
    :return: SubDAG
    """
    subdag_id = f"{parent_dag_name}.{child_dag_name}"
    dag_subdag = DAG(dag_id=subdag_id, default_args=args, schedule_interval="@daily")

    # The argument never changes inside the loop, so evaluate the mode once.
    single_manifest = batch_number == SINGLE_MANIFEST

    with dag_subdag:
        # Task ids are 1-based, so iterate directly over the task numbers.
        for task_number in range(1, batch_number + 1):
            integrity_task_id = f"provide_manifest_integrity_task_{task_number}"

            process_task = ProcessManifestOperatorR3(
                task_id=f"process_manifest_task_{task_number}",
                provide_context=True,
                dag=dag_subdag,
                previous_task_id=integrity_task_id,
                batch_number=task_number
            )

            if not single_manifest:
                # Batch mode: no integrity task is created in this SubDAG.
                continue

            integrity_task = EnsureManifestIntegrityOperator(
                task_id=integrity_task_id,
                provide_context=True,
                dag=dag_subdag
            )
            integrity_task >> process_task  # pylint: disable=pointless-statement

    return dag_subdag
......@@ -33,7 +33,7 @@ from libs.handle_file import FileHandler
from libs.refresh_token import AirflowTokenRefresher
from libs.process_manifest_r3 import ManifestProcessor
from libs.validate_schema import SchemaValidator
from libs.processors.process_single_manifest_r3 import SingleManifestProcessor
from libs.processors.single_manifest_processor import SingleManifestProcessor
from requests import HTTPError
logger = logging.getLogger()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment