From 30fd161ea8f9abfda0ae80bc54b418ddfc597a38 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 16 May 2024 15:21:31 +0530 Subject: [PATCH] Mock _executor_names --- python-sdk/src/astro/sql/operators/cleanup.py | 2 ++ python-sdk/tests/sql/operators/test_cleanup.py | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/python-sdk/src/astro/sql/operators/cleanup.py b/python-sdk/src/astro/sql/operators/cleanup.py index bfcb3681b..e1acdbbbb 100644 --- a/python-sdk/src/astro/sql/operators/cleanup.py +++ b/python-sdk/src/astro/sql/operators/cleanup.py @@ -224,6 +224,8 @@ def _get_executor_from_job_id(job_id: int) -> str | None: with create_session() as session: job = session.get(Job, job_id) + if job.executor_class is None and job.executor: + return type(job.executor).__name__ return job.executor_class if job else None def get_all_task_outputs(self, context: Context) -> list[BaseTable]: diff --git a/python-sdk/tests/sql/operators/test_cleanup.py b/python-sdk/tests/sql/operators/test_cleanup.py index 0849a7083..a0b59c099 100644 --- a/python-sdk/tests/sql/operators/test_cleanup.py +++ b/python-sdk/tests/sql/operators/test_cleanup.py @@ -113,11 +113,12 @@ def test_error_raised_with_blocking_op_executors( (None, "SequentialExecutor", True), ], ) -def test_single_worker_mode_backfill(executor_in_job, executor_in_cfg, expected_val): +def test_single_worker_mode_backfill(monkeypatch, executor_in_job, executor_in_cfg, expected_val): """Test that if we run Backfill Job it should be marked as single worker node""" from airflow.jobs.backfill_job_runner import BackfillJobRunner from airflow.jobs.job import Job + monkeypatch.setattr("airflow.executors.executor_loader._executor_names", []) dag = DAG("test_single_worker_mode_backfill", start_date=datetime(2022, 1, 1)) dr = DagRun(dag_id=dag.dag_id) @@ -181,11 +182,12 @@ def test_single_worker_mode_backfill_airflow_2_5(executor_in_job, executor_in_cf (None, "SequentialExecutor", True), ], ) -def test_single_worker_mode_scheduler_job(executor_in_job, executor_in_cfg, expected_val): +def test_single_worker_mode_scheduler_job(monkeypatch, executor_in_job, executor_in_cfg, expected_val): """Test that if we run Scheduler Job it should be marked as single worker node""" from airflow.jobs.job import Job from airflow.jobs.scheduler_job_runner import SchedulerJobRunner + monkeypatch.setattr("airflow.executors.executor_loader._executor_names", []) dag = DAG("test_single_worker_mode_scheduler_job", start_date=datetime(2022, 1, 1)) dr = DagRun(dag_id=dag.dag_id)