Skip to content
Snippets Groups Projects

Cherrypick/m14 to m16

Merged Mahsa Hanifi requested to merge cherrypick/m14-to-m16 into azure/m16-master
11 files
+ 152
73
Compare changes
  • Side-by-side
  • Inline
Files
11
@@ -15,8 +15,8 @@ default_args = {
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'retries': 0,
'retry_delay': timedelta(seconds=30),
}
default_args = update_default_args(default_args)
@@ -41,7 +41,8 @@ params = {
"authorization": authorization,
"dataPartitionId": dataPartitionId,
"steps": steps,
"dataServiceName": "{{ dag_run.conf['execution_context'].get('data_service_to_use', 'file') }}"
"dataServiceName": "{{ dag_run.conf['execution_context'].get('data_service_to_use', 'file') }}",
"userId": "{{ dag_run.conf['execution_context']['userId'] }}"
}
# Get environment variables
@@ -52,13 +53,16 @@ if data_service_to_use:
operator_kwargs = {| K8S_POD_OPERATOR_KWARGS or {} |}
retry_configuration = {| OPERATOR_RETRY_CONFIG or {} |}
with DAG(
DAG_NAME,
DAG_NAME,
default_args=default_args,
schedule_interval=None
) as dag:
update_status_running = UpdateStatusOperator(
task_id="update_status_running",
**retry_configuration
)
csv_parser = KubernetesPodOperator(
@@ -69,11 +73,13 @@ with DAG(
arguments=[dumps(params)],
is_delete_operator_pod=True,
image=DOCKER_IMAGE,
**operator_kwargs)
**operator_kwargs,
**retry_configuration)
update_status_finished = UpdateStatusOperator(
task_id="update_status_finished",
trigger_rule="all_done"
trigger_rule="all_done",
**retry_configuration
)
update_status_running >> csv_parser >> update_status_finished # pylint: disable=pointless-statement
Loading