Issue with Airflow scheduler after airflow-2.8.1 migration

Background

  • Azure has migrated from airflow-2.4.3 to airflow-2.8.1
  • The migration also moved from python-3.8 to python-3.11

Related to

Issue Observed

  • The CSV-parser, segy-to-zgy-conversion, osdu-ingest and a few other integration test DAGs have no issues.

  • Parallel runs and load testing of dummy DAGs with 1000 runs showed no issues (see parallel_dags.py).

  • The issue with eds-ingest and eds-scheduler was observed in both the dev and dev-secondary environments: the airflow scheduler pod goes into a continuous CrashLoopBackOff, affecting the whole environment.

[2024-07-09T10:27:04.507+0000] {scheduler_job_runner.py:1751} WARNING - Failing (128) jobs without heartbeat after 2024-07-09 10:22:04.481136+00:00
[2024-07-09T10:27:04.508+0000] {task_context_logger.py:91} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/eds_ingestion_dags.zip/src_dags_fetch_ingest_scheduler_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'eds_ingest', 'Task Id': 'fetch_client', 'Run Id': '77daa34d-0b75-409d-95f2-c7509429c726', 'Hostname': 'airflow2-worker-0.airflow2-worker.airflow2.svc.cluster.local', 'External Executor Id': 'be7a2837-b86d-47b4-84d8-95673a16cdb4'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7202ea8ecbd0>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2024-07-09T10:27:04.524+0000] {scheduler_job_runner.py:872} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/task_context_logger.py", line 101, in _log
    task_handler.set_context(ti, identifier=self.component_name)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/microsoft/azure/log/wasb_task_handler.py", line 89, in set_context
    super().set_context(ti, identifier=identifier)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py", line 218, in set_context
    local_loc = self._init_file(ti, identifier=identifier)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py", line 511, in _init_file
    local_relative_path = self._render_filename(ti, ti.try_number)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py", line 271, in _render_filename
    return render_template_to_string(jinja_tpl, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/helpers.py", line 289, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/helpers.py", line 284, in render_template
    return "".join(nodes)
           ^^^^^^^^^^^^^^
  File "<template>", line 30, in root
  File "/home/airflow/.local/lib/python3.11/site-packages/jinja2/environment.py", line 485, in getattr
    return getattr(obj, attribute)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/jinja2/runtime.py", line 859, in __getattr__
    return self._fail_with_undefined_error()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/jinja2/runtime.py", line 852, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'dag_run' is undefined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 855, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1001, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/event_scheduler.py", line 40, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1766, in _find_zombies
    self._task_context_logger.error(log_message, ti=ti)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/task_context_logger.py", line 137, in error
    self._log(logging.ERROR, msg, *args, ti=ti)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/log/task_context_logger.py", line 110, in _log
    task_handler.close()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/microsoft/azure/log/wasb_task_handler.py", line 120, in close
    with open(local_loc) as logfile:
         ^^^^^^^^^^^^^^^
IsADirectoryError: [Errno 21] Is a directory: '/opt/airflow/logs/'
[2024-07-09T10:27:05.530+0000] {process_utils.py:131} INFO - Sending 15 to group 71. PIDs of all processes in the group: [71]
[2024-07-09T10:27:05.530+0000] {process_utils.py:86} INFO - Sending the signal 15 to group 71
[2024-07-09T10:27:05.743+0000] {process_utils.py:79} INFO - Process psutil.Process(pid=71, status='terminated', exitcode=0, started='10:26:52') (71) terminated with exit code 0
[2024-07-09T10:27:05.744+0000] {scheduler_job_runner.py:884} INFO - Exited execute loop
[2024-07-09T10:27:05.773+0000] {scheduler_command.py:54} ERROR - Exception when running scheduler job
Traceback (most recent call last):
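
The two chained exceptions in the log above suggest the following sequence: set_context fails while rendering the log filename ('dag_run' is undefined), and the handler's close() is then still called and tries to open the bare log base directory. The sketch below reproduces only that pattern with the standard library. It is not Airflow code: SketchHandler, log_with_context and the filename template are hypothetical stand-ins, and it assumes the handler's relative log path is still empty when close() runs. It also assumes a POSIX system, where opening a directory raises IsADirectoryError.

```python
import os
import tempfile


class SketchHandler:
    """Hypothetical stand-in for a task log handler; not the Airflow class."""

    def __init__(self, base_dir):
        self.base_dir = base_dir
        # Assumption: the relative path is only filled in by set_context.
        self.relative_path = ""

    def set_context(self, context):
        # Stands in for log filename rendering; a missing 'dag_run' key
        # raises KeyError here (jinja2 raises UndefinedError in the log).
        self.relative_path = "{dag_run}/attempt.log".format(**context)

    def close(self):
        # With an empty relative path this is just the base directory.
        local_loc = os.path.join(self.base_dir, self.relative_path)
        with open(local_loc) as logfile:
            return logfile.read()


def log_with_context(handler, context):
    """Call close() even when set_context() fails, as the traceback implies."""
    try:
        handler.set_context(context)
    finally:
        handler.close()


if __name__ == "__main__":
    base = tempfile.mkdtemp()
    try:
        log_with_context(SketchHandler(base), {})  # no 'dag_run' in context
    except OSError as exc:
        # The second error hides the first; the original is kept as context.
        print(type(exc).__name__, "caused after", type(exc.__context__).__name__)
```

If this reading is right, the error that reaches the scheduler loop is the second one, which would explain why a single zombie task is enough to terminate the scheduler rather than only failing to log.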

Next steps identified

  • Downgrade from airflow-2.8.1 to airflow-2.7.3 (similar to GC) and test in the glab and staging environments to unblock M24.