diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3a02e6b111ec4494e3f8edbde72268864d34df72..cf6b94ecde861b007c0ddffc934b9c7709194cbc 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,7 +22,7 @@ stages: - deploy pylint: - image: johnybear/osdu-airflow:python36-2 + image: johnybear/osdu-airflow:airflow.1.10.14 stage: linters allow_failure: true script: @@ -32,7 +32,7 @@ pylint: - exit ${EXIT_CODE} isort: - image: johnybear/osdu-airflow:python36-2 + image: johnybear/osdu-airflow:airflow.1.10.14 allow_failure: true stage: linters script: @@ -42,7 +42,7 @@ isort: test_dags: stage: test_dags - image: johnybear/osdu-airflow:python36-2 + image: johnybear/osdu-airflow:airflow.1.10.14 script: - chmod +x tests/test_dags.sh - tests/./test_dags.sh || EXIT_CODE=$? @@ -55,7 +55,7 @@ test_dags: unit_tests: stage: unit_tests - image: johnybear/osdu-airflow:python36-2 + image: johnybear/osdu-airflow:airflow.1.10.14 script: - chmod +x tests/unit_tests.sh - tests/./unit_tests.sh || EXIT_CODE=$? diff --git a/src/dags/osdu-ingest-r3.py b/src/dags/osdu-ingest-r3.py index 3d4bd94bf7e7cb00ca8ddb819be3517cffabb227..5f20aec636b5d36f529a659f962ec6896e2f57b2 100644 --- a/src/dags/osdu-ingest-r3.py +++ b/src/dags/osdu-ingest-r3.py @@ -77,7 +77,8 @@ with DAG( task_id="check_payload_type", dag=dag, provide_context=True, - python_callable=is_batch + python_callable=is_batch, + trigger_rule="none_failed_or_skipped" ) update_status_finished_op = UpdateStatusOperator( @@ -89,21 +90,24 @@ with DAG( validate_schema_operator = ValidateManifestSchemaOperator( task_id="validate_manifest_schema_task", provide_context=True, - dag=dag + dag=dag, + trigger_rule="none_failed_or_skipped" ) ensure_integrity_op = EnsureManifestIntegrityOperator( task_id=ENSURE_INTEGRITY_TASK, provide_context=True, previous_task_id=validate_schema_operator.task_id, - dag=dag + dag=dag, + trigger_rule="none_failed_or_skipped" ) process_single_manifest_file = ProcessManifestOperatorR3( task_id=PROCESS_SINGLE_MANIFEST_FILE, provide_context=True, dag=dag, - previous_task_id=ensure_integrity_op.task_id + previous_task_id=ensure_integrity_op.task_id, + trigger_rule="none_failed_or_skipped" ) # Dummy operator as entry point into parallel task of batch upload @@ -118,7 +122,8 @@ with DAG( provide_context=True, dag=dag, previous_task_id=f"provide_manifest_integrity_task_{batch + 1}", - batch_number=batch + 1 + batch_number=batch + 1, + trigger_rule="none_failed_or_skipped" ) >> update_status_finished_op update_status_running_op >> branch_is_batch_op # pylint: disable=pointless-statement