diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 159f8f00006c16ba9d703e8dff97f96833b52f9f..a533a36754aaa8b4765f9014e791e104a44d27da 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -71,8 +71,6 @@ osdu-gcp-deploy: - gsutil -m rsync -d -R dags/libs $OSDU_GCP_DEPL_TARGET/dags/libs - gsutil -m rsync -x "dags/providers*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion - gsutil -m rsync -d -R dags/providers $OSDU_GCP_DEPL_TARGET/dags/providers - - gsutil -m rsync -x "dags/subdags*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion - - gsutil -m rsync -d -R dags/subdags $OSDU_GCP_DEPL_TARGET/dags/subdags - gsutil cp dags/.airflowignore $OSDU_GCP_DEPL_TARGET/dags/ - gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET/plugins only: diff --git a/src/dags/.airflowignore b/src/dags/.airflowignore index d00f2ba547c9fdbd2fabd872bcde3165f4254d9e..f227618f1f9141a89dbcabacf18a6dd009506c16 100644 --- a/src/dags/.airflowignore +++ b/src/dags/.airflowignore @@ -1,4 +1,3 @@ configs libs providers -subdags diff --git a/src/dags/libs/processors/process_single_manifest_r3.py b/src/dags/libs/processors/single_manifest_processor.py similarity index 100% rename from src/dags/libs/processors/process_single_manifest_r3.py rename to src/dags/libs/processors/single_manifest_processor.py diff --git a/src/dags/osdu-ingest-r3.py b/src/dags/osdu-ingest-r3.py index 3e4cefce18251727cc815ff5dab4fc87bd22ac8c..32323f869a8902ecc31548f072daefe96bfa17ef 100644 --- a/src/dags/osdu-ingest-r3.py +++ b/src/dags/osdu-ingest-r3.py @@ -20,15 +20,17 @@ from datetime import timedelta import airflow from airflow import DAG from airflow.models import Variable +from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import BranchPythonOperator -from airflow.operators.subdag_operator import SubDagOperator from libs.exceptions import NotOSDUSchemaFormatError +from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator +from operators.process_manifest_r3 import ProcessManifestOperatorR3 from operators.update_status import UpdateStatusOperator -from subdags.process_manifest_subdag import load_process_manifest_subdag BATCH_NUMBER = int(Variable.get("batch_count", "3")) -PROCESS_SINGLE_MANIFEST_FILE_SD = "process_single_manifest_file_subdag" -PROCESS_BATCH_MANIFEST_FILE_SD = "process_batch_manifest_subdag" +PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task" +PROCESS_BATCH_MANIFEST_FILE = "batch_upload" +ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task" default_args = { "start_date": airflow.utils.dates.days_ago(0), @@ -48,9 +50,9 @@ def is_batch(**context): manifest = context["dag_run"].conf["execution_context"].get("manifest") if isinstance(manifest, dict): - subdag = PROCESS_SINGLE_MANIFEST_FILE_SD + subdag = ENSURE_INTEGRITY_TASK elif isinstance(manifest, list): - subdag = PROCESS_BATCH_MANIFEST_FILE_SD + subdag = PROCESS_BATCH_MANIFEST_FILE else: raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. " f"Got {manifest}.") @@ -82,29 +84,34 @@ with DAG( trigger_rule="all_done", ) - process_single_manifest_file_subdag = SubDagOperator( - task_id=PROCESS_SINGLE_MANIFEST_FILE_SD, - subdag=load_process_manifest_subdag( - "Osdu_ingest", - "process_single_manifest_file_subdag", - default_args - ), - default_args=default_args, - dag=dag, + process_single_manifest_file = ProcessManifestOperatorR3( + task_id=PROCESS_SINGLE_MANIFEST_FILE, + provide_context=True, + dag=dag, + previous_task_id=f"provide_manifest_integrity_task" + ) + + ensure_integrity_op = EnsureManifestIntegrityOperator( + task_id=ENSURE_INTEGRITY_TASK, + provide_context=True, + dag=dag ) - process_batch_manifest_subdag = SubDagOperator( - task_id=PROCESS_BATCH_MANIFEST_FILE_SD, - subdag=load_process_manifest_subdag( - "Osdu_ingest", - "process_batch_manifest_subdag", - default_args, - batch_number=BATCH_NUMBER - ), - default_args=default_args, + # Dummy operator as entry point into parallel task of batch upload + batch_upload = DummyOperator( dag=dag, + task_id=PROCESS_BATCH_MANIFEST_FILE ) + for batch in range(0, BATCH_NUMBER): + batch_upload >> ProcessManifestOperatorR3( + task_id=f"process_manifest_task_{batch + 1}", + provide_context=True, + dag=dag, + previous_task_id=f"provide_manifest_integrity_task_{batch + 1}", + batch_number=batch + 1 + ) >> update_status_finished_op + update_status_running_op >> branch_is_batch_op # pylint: disable=pointless-statement -branch_is_batch_op >> process_batch_manifest_subdag >> update_status_finished_op # pylint: disable=pointless-statement -branch_is_batch_op >> process_single_manifest_file_subdag >> update_status_finished_op # pylint: disable=pointless-statement +branch_is_batch_op >> batch_upload # pylint: disable=pointless-statement +branch_is_batch_op >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op # pylint: disable=pointless-statement diff --git a/src/dags/subdags/__init__.py b/src/dags/subdags/__init__.py deleted file mode 100644 index de8f5ce5bf56bdd78824065dbbd52846984960b1..0000000000000000000000000000000000000000 --- a/src/dags/subdags/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2021 Google LLC -# Copyright 2021 EPAM Systems -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/dags/subdags/process_manifest_subdag.py b/src/dags/subdags/process_manifest_subdag.py deleted file mode 100644 index 83a6ac700ecbdf970af58dcce81056cf4fff7b6e..0000000000000000000000000000000000000000 --- a/src/dags/subdags/process_manifest_subdag.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2021 Google LLC -# Copyright 2021 EPAM Systems -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""SubDAG for processing R3 manifests.""" - -from airflow import DAG -from airflow.models import Variable -from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator -from operators.process_manifest_r3 import ProcessManifestOperatorR3 - - -SINGLE_MANIFEST = 1 - - -def load_process_manifest_subdag(parent_dag_name: str, child_dag_name: str, args: dict, - batch_number=SINGLE_MANIFEST) -> DAG: - """ - Load SubDag with steps of checking manifests' integrity and processing manifests. - - :param parent_dag_name: Name of the parent DAG - :param child_dag_name: Name of - :param args: Default args - :return: SubDAG - """ - dag_subdag = DAG( - dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name), - default_args=args, - schedule_interval="@daily", - ) - with dag_subdag: - for batch in range(0, batch_number): - process_manifest_op = ProcessManifestOperatorR3( - task_id=f"process_manifest_task_{batch + 1}", - provide_context=True, - dag=dag_subdag, - previous_task_id=f"provide_manifest_integrity_task_{batch + 1}", - batch_number=batch + 1 - ) - if batch_number == SINGLE_MANIFEST: - ensure_integrity_op = EnsureManifestIntegrityOperator( - task_id=f"provide_manifest_integrity_task_{batch + 1}", - provide_context=True, - dag=dag_subdag - ) - ensure_integrity_op >> process_manifest_op - return dag_subdag diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py index 361e630e1bd29d354071ffb8ebc20706e5cf1890..6ddebab7c6116b53b1a46717b447609a9ce8552b 100644 --- a/src/plugins/operators/process_manifest_r3.py +++ b/src/plugins/operators/process_manifest_r3.py @@ -33,7 +33,7 @@ from libs.handle_file import FileHandler from libs.refresh_token import AirflowTokenRefresher from libs.process_manifest_r3 import ManifestProcessor from libs.validate_schema import SchemaValidator -from libs.processors.process_single_manifest_r3 import SingleManifestProcessor +from libs.processors.single_manifest_processor import SingleManifestProcessor from requests import HTTPError logger = logging.getLogger()