Skip to content

Commit

Permalink
Merge pull request #44 from airflow-laminar/tkp/enabled
Browse files Browse the repository at this point in the history
Small fixes to dag enablement
  • Loading branch information
timkpaine authored Feb 15, 2025
2 parents 58fd67b + 6d68439 commit d7109f2
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 7 deletions.
6 changes: 3 additions & 3 deletions airflow_config/configuration/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ class DagArgs(BaseModel):
# fail_stop (bool) – Fails currently running tasks when task in DAG fails. Warning: A fail stop dag can only have tasks with the default trigger rule (“all_success”). An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
dag_display_name: Optional[str] = Field(default=None, description="The display name of the DAG which appears on the UI.")

# Extras
enabled: Optional[bool] = Field(default=None, description="Whether the DAG is enabled")

class Dag(DagArgs):
enabled: Optional[bool] = Field(default=True, description="Whether the DAG is enabled")

class Dag(DagArgs):
dag_id: Optional[str] = Field(
default=None, description="The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)"
)
default_args: Optional[TaskArgs] = Field(default=None, description="Default arguments for tasks in the DAG")

tasks: Optional[Dict[str, Task]] = Field(default_factory=list, description="List of tasks in the DAG")
12 changes: 8 additions & 4 deletions airflow_config/configuration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def pre_apply(self, dag, dag_kwargs):
# first try to see if per-dag options have default_args for subtasks
per_dag_kwargs = self.dags[dag_kwargs["dag_id"]]

# if dag is disabled, quit right away
if per_dag_kwargs.enabled is False:
# if dag is disabled directly, quit right away
if per_dag_kwargs.enabled is False or (per_dag_kwargs.enabled is None and self.default_dag_args.enabled is False):
sys.exit(0)

default_args = per_dag_kwargs.default_args
Expand All @@ -121,7 +121,7 @@ def pre_apply(self, dag, dag_kwargs):
dag_kwargs["default_args"][attr] = getattr(default_args, attr)

for attr in DagArgs.model_fields:
if attr == "default_args":
if attr in ("default_args", "enabled"):
# skip
continue
if attr == "dag_id":
Expand All @@ -134,12 +134,16 @@ def pre_apply(self, dag, dag_kwargs):
if attr not in dag_kwargs and val:
dag_kwargs[attr] = val

elif self.default_dag_args.enabled is False:
# if dag has no per-dag-config, but default dag args is disabled, quit right away
sys.exit(0)

for attr in TaskArgs.model_fields:
if attr not in dag_kwargs["default_args"] and getattr(self.default_args, attr, None) is not None:
dag_kwargs["default_args"][attr] = getattr(self.default_args, attr)

for attr in DagArgs.model_fields:
if attr not in dag_kwargs:
if attr not in dag_kwargs and attr not in ("enabled",):
val = getattr(self.default_dag_args, attr, None)
if attr not in dag_kwargs and val is not None:
dag_kwargs[attr] = val
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.TaskArgs
owner: test
email: [[email protected]]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false

default_dag_args:
_target: airflow_config.DagArgs
schedule: "01:00"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
enabled: false

dags:
example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
max_active_tasks: 1
max_active_runs: 1
enabled: true

example_dag2:
default_args:
owner: "custom_owner2"
schedule: "0 4 * * *"
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from unittest.mock import patch

from airflow_config import DAG, load_config


def test_create_dag_from_config():
with patch("sys.exit") as sys_exit:
conf = load_config("config", "test")
DAG(dag_id="testdag", config=conf)
assert sys_exit.call_count == 1
DAG(dag_id="example_dag", config=conf)
assert sys_exit.call_count == 1
DAG(dag_id="example_da2", config=conf)
assert sys_exit.call_count == 2
32 changes: 32 additions & 0 deletions airflow_config/tests/setups/good/dag-disable/config/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.TaskArgs
owner: test
email: [[email protected]]
email_on_failure: false
email_on_retry: false
retries: 0
depends_on_past: false

default_dag_args:
_target: airflow_config.DagArgs
schedule: "01:00"
start_date: "2024-01-01"
catchup: false
tags: ["utility", "test"]
enabled: false

dags:
example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
max_active_tasks: 1
max_active_runs: 1

example_dag2:
default_args:
owner: "custom_owner2"
schedule: "0 4 * * *"
12 changes: 12 additions & 0 deletions airflow_config/tests/setups/good/dag-disable/test_dag_disable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from unittest.mock import patch

from airflow_config import DAG, load_config


def test_create_dag_from_config():
with patch("sys.exit") as sys_exit:
conf = load_config("config", "test")
DAG(dag_id="testdag", config=conf)
assert sys_exit.call_count == 1
DAG(dag_id="example_dag", config=conf)
assert sys_exit.call_count == 2

0 comments on commit d7109f2

Please sign in to comment.