Skip to content

Commit

Permalink
Ensure teardown tasks are executed when DAG run is set to failed (#45530
Browse files Browse the repository at this point in the history
)

* Ensure teardown tasks are executed when DAG run is set to failed

* Also handle the case of setting DAG to success

* Add some documentation to behavior changes

* Add some documentation to behavior changes
  • Loading branch information
jscheffl authored Jan 11, 2025
1 parent 13f5bd6 commit 1e8977a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 18 deletions.
41 changes: 24 additions & 17 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,18 @@ def set_dag_run_state_to_success(
return []
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")

# Mark all task instances of the dag run to success - except for teardown as they need to complete work.
normal_tasks = [task for task in dag.tasks if not task.is_teardown]

# Mark the dag run to success.
if commit:
if commit and len(normal_tasks) == len(dag.tasks):
_set_dag_run_state(dag.dag_id, run_id, DagRunState.SUCCESS, session)

# Mark all task instances of the dag run to success.
for task in dag.tasks:
for task in normal_tasks:
task.dag = dag
return set_state(
tasks=dag.tasks,
tasks=normal_tasks,
run_id=run_id,
state=TaskInstanceState.SUCCESS,
commit=commit,
Expand Down Expand Up @@ -280,10 +283,6 @@ def set_dag_run_state_to_failed(
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")

# Mark the dag run to failed.
if commit:
_set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)

running_states = (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
Expand All @@ -292,25 +291,26 @@ def set_dag_run_state_to_failed(

# Mark only RUNNING task instances.
task_ids = [task.task_id for task in dag.tasks]
tis = session.scalars(
running_tis: list[TaskInstance] = session.scalars(
select(TaskInstance).where(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == run_id,
TaskInstance.task_id.in_(task_ids),
TaskInstance.state.in_(running_states),
)
)
).all()

task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
# Do not kill teardown tasks
task_ids_of_running_tis = [ti.task_id for ti in running_tis if not dag.task_dict[ti.task_id].is_teardown]

tasks = []
running_tasks = []
for task in dag.tasks:
if task.task_id in task_ids_of_running_tis:
task.dag = dag
tasks.append(task)
running_tasks.append(task)

# Mark non-finished tasks as SKIPPED.
tis = session.scalars(
pending_tis: list[TaskInstance] = session.scalars(
select(TaskInstance).filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == run_id,
Expand All @@ -324,12 +324,19 @@ def set_dag_run_state_to_failed(
)
).all()

# Do not skip teardown tasks
pending_normal_tis = [ti for ti in pending_tis if not dag.task_dict[ti.task_id].is_teardown]

if commit:
for ti in tis:
for ti in pending_normal_tis:
ti.set_state(TaskInstanceState.SKIPPED)

return tis + set_state(
tasks=tasks,
# Mark the dag run to failed if there is no pending teardown (else this would not be scheduled later).
if not any(dag.task_dict[ti.task_id].is_teardown for ti in (running_tis + pending_tis)):
_set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)

return pending_normal_tis + set_state(
tasks=running_tasks,
run_id=run_id,
state=TaskInstanceState.FAILED,
commit=commit,
Expand Down
8 changes: 7 additions & 1 deletion docs/apache-airflow/howto/setup-and-teardown.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ Key features of setup and teardown tasks:

* If you clear a task, its setups and teardowns will be cleared.
* By default, teardown tasks are ignored for the purpose of evaluating dag run state.
* A teardown task will run if its setup was successful, even if its work tasks failed.
* A teardown task will run if its setup was successful, even if its work tasks failed. However, it will be skipped if its setup was skipped.
* Teardown tasks are ignored when setting dependencies against task groups.
* Teardown tasks will also be executed if the DAG run is manually set to "failed" or "success", to ensure that resources are cleaned up.

How setup and teardown works
""""""""""""""""""""""""""""
Expand Down Expand Up @@ -231,3 +232,8 @@ Trigger rule behavior for teardowns
"""""""""""""""""""""""""""""""""""

Teardowns use a (non-configurable) trigger rule called ALL_DONE_SETUP_SUCCESS. With this rule, as long as all upstreams are done and at least one directly connected setup is successful, the teardown will run. If all of a teardown's setups were skipped or failed, those states will propagate to the teardown.

Side-effect on manual DAG state changes
"""""""""""""""""""""""""""""""""""""""

Because teardown tasks are often used to clean up resources, they also need to run if the DAG run is manually terminated. To terminate a DAG run early, a user can manually mark it as "success" or "failed", which kills all tasks before completion. If the DAG contains teardown tasks, they will still be executed. As a side effect of allowing the teardown tasks to be scheduled, the DAG run will not be set to a terminal state immediately when the user requests it.
12 changes: 12 additions & 0 deletions newsfragments/45530.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Ensure teardown tasks are executed when DAG run is set to failed

Previously, when a DAG run was manually set to the "failed" or "success" state, the terminal state was applied to all tasks.
This left a gap for DAGs that define setup and teardown tasks: if a teardown was used to clean up infrastructure
or other resources, it was skipped as well, and the resources could stay allocated.

Now, if setup tasks have already been executed and the DAG run is manually set to "failed" or "success", the teardown
tasks are still executed. Teardown tasks are skipped if the setup was also skipped.

As a side effect, if the DAG contains teardown tasks, manually marking the DAG run as "failed" or "success"
keeps the DAG run in the running state so that the teardown tasks can be scheduled. They would not be scheduled
if the DAG run were directly set to "failed" or "success".
76 changes: 76 additions & 0 deletions tests/api/common/test_mark_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from airflow.api.common.mark_tasks import set_dag_run_state_to_failed, set_dag_run_state_to_success
from airflow.operators.empty import EmptyOperator
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

from tests_common.pytest_plugin import DagMaker

pytestmark = pytest.mark.db_test


def test_set_dag_run_state_to_failed(dag_maker: DagMaker):
    """Check which task instances ``set_dag_run_state_to_failed`` reports as changed.

    The DAG has one teardown task wrapping two work tasks. One work task is put into
    RUNNING before the call. The call is expected to return exactly the two work task
    instances (the running one as FAILED, the other as SKIPPED) and not the teardown.
    """
    with dag_maker("TEST_DAG_1"):
        with EmptyOperator(task_id="teardown").as_teardown():
            EmptyOperator(task_id="running")
            EmptyOperator(task_id="pending")
    dag_run = dag_maker.create_dagrun()

    # Only the task named "running" is moved to RUNNING; all others are left untouched.
    for task_instance in dag_run.get_task_instances():
        if task_instance.task_id != "running":
            continue
        task_instance.set_state(TaskInstanceState.RUNNING)
    dag_maker.session.flush()

    # The DAG object must be attached to the run before it is handed to the function under test.
    assert dag_run.dag

    changed: list[TaskInstance] = set_dag_run_state_to_failed(
        dag=dag_run.dag,
        run_id=dag_run.run_id,
        commit=True,
        session=dag_maker.session,
    )
    assert len(changed) == 2

    state_by_task_id = {changed_ti.task_id: changed_ti.state for changed_ti in changed}
    assert state_by_task_id["running"] == TaskInstanceState.FAILED
    assert state_by_task_id["pending"] == TaskInstanceState.SKIPPED
    assert "teardown" not in state_by_task_id


def test_set_dag_run_state_to_success(dag_maker: DagMaker):
    """Check which task instances ``set_dag_run_state_to_success`` reports as changed.

    The DAG has one teardown task wrapping two work tasks. One work task is put into
    RUNNING before the call. The call is expected to return exactly the two work task
    instances, both in SUCCESS, and not the teardown.
    """
    with dag_maker("TEST_DAG_1"):
        with EmptyOperator(task_id="teardown").as_teardown():
            EmptyOperator(task_id="running")
            EmptyOperator(task_id="pending")
    dag_run = dag_maker.create_dagrun()

    # Only the task named "running" is moved to RUNNING; all others are left untouched.
    for task_instance in dag_run.get_task_instances():
        if task_instance.task_id != "running":
            continue
        task_instance.set_state(TaskInstanceState.RUNNING)
    dag_maker.session.flush()

    # The DAG object must be attached to the run before it is handed to the function under test.
    assert dag_run.dag

    changed: list[TaskInstance] = set_dag_run_state_to_success(
        dag=dag_run.dag,
        run_id=dag_run.run_id,
        commit=True,
        session=dag_maker.session,
    )
    assert len(changed) == 2

    state_by_task_id = {changed_ti.task_id: changed_ti.state for changed_ti in changed}
    assert state_by_task_id["running"] == TaskInstanceState.SUCCESS
    assert state_by_task_id["pending"] == TaskInstanceState.SUCCESS
    assert "teardown" not in state_by_task_id

0 comments on commit 1e8977a

Please sign in to comment.