From b97d68d26316238339476cfcef86985ba8f57dbd Mon Sep 17 00:00:00 2001
From: yan <yan_sushchynski@epam.com>
Date: Thu, 11 Feb 2021 15:59:15 +0300
Subject: [PATCH] Fix dag. Remove subdags

---
 .gitlab-ci.yml                                |  2 -
 src/dags/.airflowignore                       |  1 -
 ...est_r3.py => single_manifest_processor.py} |  0
 src/dags/osdu-ingest-r3.py                    | 59 +++++++++++--------
 src/dags/subdags/__init__.py                  | 14 -----
 src/dags/subdags/process_manifest_subdag.py   | 58 ------------------
 src/plugins/operators/process_manifest_r3.py  |  2 +-
 7 files changed, 34 insertions(+), 102 deletions(-)
 rename src/dags/libs/processors/{process_single_manifest_r3.py => single_manifest_processor.py} (100%)
 delete mode 100644 src/dags/subdags/__init__.py
 delete mode 100644 src/dags/subdags/process_manifest_subdag.py

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index cf48560..3a02e6b 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -71,8 +71,6 @@ osdu-gcp-deploy:
     - gsutil -m rsync -d -R dags/libs  $OSDU_GCP_DEPL_TARGET/dags/libs
     - gsutil -m rsync -x "dags/providers*" -d -R dags  $OSDU_GCP_DEPL_TARGET/dags/ingestion
     - gsutil -m rsync -d -R dags/providers  $OSDU_GCP_DEPL_TARGET/dags/providers
-    - gsutil -m rsync -x "dags/subdags*" -d -R dags  $OSDU_GCP_DEPL_TARGET/dags/ingestion
-    - gsutil -m rsync -d -R dags/subdags  $OSDU_GCP_DEPL_TARGET/dags/subdags
     - gsutil cp dags/.airflowignore $OSDU_GCP_DEPL_TARGET/dags/
     - gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET/plugins
   except:
diff --git a/src/dags/.airflowignore b/src/dags/.airflowignore
index d00f2ba..f227618 100644
--- a/src/dags/.airflowignore
+++ b/src/dags/.airflowignore
@@ -1,4 +1,3 @@
 configs
 libs
 providers
-subdags
diff --git a/src/dags/libs/processors/process_single_manifest_r3.py b/src/dags/libs/processors/single_manifest_processor.py
similarity index 100%
rename from src/dags/libs/processors/process_single_manifest_r3.py
rename to src/dags/libs/processors/single_manifest_processor.py
diff --git a/src/dags/osdu-ingest-r3.py b/src/dags/osdu-ingest-r3.py
index 3e4cefc..32323f8 100644
--- a/src/dags/osdu-ingest-r3.py
+++ b/src/dags/osdu-ingest-r3.py
@@ -20,15 +20,17 @@ from datetime import timedelta
 import airflow
 from airflow import DAG
 from airflow.models import Variable
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import BranchPythonOperator
-from airflow.operators.subdag_operator import SubDagOperator
 from libs.exceptions import NotOSDUSchemaFormatError
+from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
+from operators.process_manifest_r3 import ProcessManifestOperatorR3
 from operators.update_status import UpdateStatusOperator
-from subdags.process_manifest_subdag import load_process_manifest_subdag
 
 BATCH_NUMBER = int(Variable.get("batch_count", "3"))
-PROCESS_SINGLE_MANIFEST_FILE_SD = "process_single_manifest_file_subdag"
-PROCESS_BATCH_MANIFEST_FILE_SD = "process_batch_manifest_subdag"
+PROCESS_SINGLE_MANIFEST_FILE = "process_single_manifest_file_task"
+PROCESS_BATCH_MANIFEST_FILE = "batch_upload"
+ENSURE_INTEGRITY_TASK = "provide_manifest_integrity_task"
 
 default_args = {
     "start_date": airflow.utils.dates.days_ago(0),
@@ -48,9 +50,9 @@ def is_batch(**context):
     manifest = context["dag_run"].conf["execution_context"].get("manifest")
 
     if isinstance(manifest, dict):
-        subdag = PROCESS_SINGLE_MANIFEST_FILE_SD
+        subdag = ENSURE_INTEGRITY_TASK
     elif isinstance(manifest, list):
-        subdag = PROCESS_BATCH_MANIFEST_FILE_SD
+        subdag = PROCESS_BATCH_MANIFEST_FILE
     else:
         raise NotOSDUSchemaFormatError(f"Manifest must be either 'dict' or 'list'. "
                                        f"Got {manifest}.")
@@ -82,29 +84,34 @@ with DAG(
         trigger_rule="all_done",
     )
 
-    process_single_manifest_file_subdag = SubDagOperator(
-        task_id=PROCESS_SINGLE_MANIFEST_FILE_SD,
-        subdag=load_process_manifest_subdag(
-            "Osdu_ingest",
-            "process_single_manifest_file_subdag",
-            default_args
-        ),
-        default_args=default_args,
-        dag=dag,
+    process_single_manifest_file = ProcessManifestOperatorR3(
+                task_id=PROCESS_SINGLE_MANIFEST_FILE,
+                provide_context=True,
+                dag=dag,
+                previous_task_id=f"provide_manifest_integrity_task"
+            )
+
+    ensure_integrity_op = EnsureManifestIntegrityOperator(
+        task_id=ENSURE_INTEGRITY_TASK,
+        provide_context=True,
+        dag=dag
     )
 
-    process_batch_manifest_subdag = SubDagOperator(
-        task_id=PROCESS_BATCH_MANIFEST_FILE_SD,
-        subdag=load_process_manifest_subdag(
-            "Osdu_ingest",
-            "process_batch_manifest_subdag",
-            default_args,
-            batch_number=BATCH_NUMBER
-        ),
-        default_args=default_args,
+    # Dummy operator as entry point into parallel task of batch upload
+    batch_upload = DummyOperator(
         dag=dag,
+        task_id=PROCESS_BATCH_MANIFEST_FILE
     )
 
+    for batch in range(0, BATCH_NUMBER):
+        batch_upload >> ProcessManifestOperatorR3(
+            task_id=f"process_manifest_task_{batch + 1}",
+            provide_context=True,
+            dag=dag,
+            previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
+            batch_number=batch + 1
+        ) >> update_status_finished_op
+
 update_status_running_op >> branch_is_batch_op  # pylint: disable=pointless-statement
-branch_is_batch_op >> process_batch_manifest_subdag >> update_status_finished_op  # pylint: disable=pointless-statement
-branch_is_batch_op >> process_single_manifest_file_subdag >> update_status_finished_op  # pylint: disable=pointless-statement
+branch_is_batch_op >> batch_upload  # pylint: disable=pointless-statement
+branch_is_batch_op >> ensure_integrity_op >> process_single_manifest_file >> update_status_finished_op  # pylint: disable=pointless-statement
diff --git a/src/dags/subdags/__init__.py b/src/dags/subdags/__init__.py
deleted file mode 100644
index de8f5ce..0000000
--- a/src/dags/subdags/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-#  Copyright 2021 Google LLC
-#  Copyright 2021 EPAM Systems
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
diff --git a/src/dags/subdags/process_manifest_subdag.py b/src/dags/subdags/process_manifest_subdag.py
deleted file mode 100644
index 83a6ac7..0000000
--- a/src/dags/subdags/process_manifest_subdag.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#  Copyright 2021 Google LLC
-#  Copyright 2021 EPAM Systems
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-"""SubDAG for processing R3 manifests."""
-
-from airflow import DAG
-from airflow.models import Variable
-from operators.ensure_manifest_integrity import EnsureManifestIntegrityOperator
-from operators.process_manifest_r3 import ProcessManifestOperatorR3
-
-
-SINGLE_MANIFEST = 1
-
-
-def load_process_manifest_subdag(parent_dag_name: str, child_dag_name: str, args: dict,
-                                 batch_number=SINGLE_MANIFEST) -> DAG:
-    """
-    Load SubDag with steps of checking manifests' integrity and processing manifests.
-
-    :param parent_dag_name: Name of the parent DAG
-    :param child_dag_name: Name of
-    :param args: Default args
-    :return: SubDAG
-    """
-    dag_subdag = DAG(
-        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
-        default_args=args,
-        schedule_interval="@daily",
-    )
-    with dag_subdag:
-        for batch in range(0, batch_number):
-            process_manifest_op = ProcessManifestOperatorR3(
-                task_id=f"process_manifest_task_{batch + 1}",
-                provide_context=True,
-                dag=dag_subdag,
-                previous_task_id=f"provide_manifest_integrity_task_{batch + 1}",
-                batch_number=batch + 1
-            )
-            if batch_number == SINGLE_MANIFEST:
-                ensure_integrity_op = EnsureManifestIntegrityOperator(
-                    task_id=f"provide_manifest_integrity_task_{batch + 1}",
-                    provide_context=True,
-                    dag=dag_subdag
-                )
-                ensure_integrity_op >> process_manifest_op
-    return dag_subdag
diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py
index 361e630..6ddebab 100644
--- a/src/plugins/operators/process_manifest_r3.py
+++ b/src/plugins/operators/process_manifest_r3.py
@@ -33,7 +33,7 @@ from libs.handle_file import FileHandler
 from libs.refresh_token import AirflowTokenRefresher
 from libs.process_manifest_r3 import ManifestProcessor
 from libs.validate_schema import SchemaValidator
-from libs.processors.process_single_manifest_r3 import SingleManifestProcessor
+from libs.processors.single_manifest_processor import SingleManifestProcessor
 from requests import HTTPError
 
 logger = logging.getLogger()
-- 
GitLab