Skip to content

Commit

Permalink
Slow down DAG processor refreshing bundles (apache#46870)
Browse files Browse the repository at this point in the history
Right now we check if any bundles need to be refreshed every loop,
which can be very frequent (e.g. 10 times per second or more). This
introduces a new config, [dag_processor] bundle_refresh_check_interval,
that allows for slowing down how often we do it. Since we send
queries to the db, having some control here is helpful.

---------

Co-authored-by: Daniel Standish <[email protected]>
  • Loading branch information
jedcunningham and dstandish authored Feb 20, 2025
1 parent c505b24 commit 43e3f6a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 0 deletions.
10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2669,6 +2669,16 @@ dag_processor:
type: integer
example: ~
default: "30"
bundle_refresh_check_interval:
description: |
How often the DAG processor should check if any DAG bundles are ready for a refresh, either by hitting
the bundles refresh_interval or because another DAG processor has seen a newer version of the bundle.
A low value means we check more frequently, and have a smaller window of time where DAG processors are
out of sync with each other, parsing different versions of the same bundle.
version_added: ~
type: integer
example: ~
default: "5"
fastapi:
description: Configuration for the Fastapi webserver.
options:
Expand Down
1 change: 1 addition & 0 deletions airflow/config_templates/unit_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ scheduler_heartbeat_sec = 5

[dag_processor]
parsing_processes = 2
bundle_refresh_check_interval = 0

[triggerer]
# Those values are set so that during unit tests things run faster than usual.
Expand Down
19 changes: 19 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class DagFileProcessorManager(LoggingMixin):
base_log_dir: str = attrs.field(factory=_config_get_factory("scheduler", "CHILD_PROCESS_LOG_DIRECTORY"))
_latest_log_symlink_date: datetime = attrs.field(factory=datetime.today, init=False)

bundle_refresh_check_interval: int = attrs.field(
factory=_config_int_factory("dag_processor", "bundle_refresh_check_interval")
)
_bundles_last_refreshed: float = attrs.field(default=0, init=False)
"""Last time we checked if any bundles are ready to be refreshed"""

def register_exit_signals(self):
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
Expand Down Expand Up @@ -447,6 +453,19 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
"""Refresh DAG bundles, if required."""
now = timezone.utcnow()

# we don't need to check if it's time to refresh every loop - that is way too often
next_check = self._bundles_last_refreshed + self.bundle_refresh_check_interval
now_seconds = time.monotonic()
if now_seconds < next_check:
self.log.debug(
"Not time to check if DAG Bundles need refreshed yet - skipping. "
"Next check in %.2f seconds",
next_check - now_seconds,
)
return

self._bundles_last_refreshed = now_seconds

for bundle in self._dag_bundles:
# TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
# What if the cloning/refreshing took too long(longer than the dag processor timeout)
Expand Down
30 changes: 30 additions & 0 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,36 @@ def _update_bundletwo_version():
manager.run()
assert bundletwo.refresh.call_count == 2

def test_bundle_refresh_check_interval(self):
"""Ensure dag processor doesn't refresh bundles every loop."""
config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

bundleone = MagicMock()
bundleone.name = "bundleone"
bundleone.path = "/dev/null"
bundleone.refresh_interval = 0
bundleone.get_current_version.return_value = None

with conf_vars(
{
("dag_processor", "dag_bundle_config_list"): json.dumps(config),
("dag_processor", "bundle_refresh_check_interval"): "10",
}
):
DagBundlesManager().sync_bundles_to_db()
manager = DagFileProcessorManager(max_runs=2)
manager._dag_bundles = [bundleone]
manager._refresh_dag_bundles({})
assert bundleone.refresh.call_count == 1
manager._refresh_dag_bundles({})
assert bundleone.refresh.call_count == 1 # didn't fresh the second time

def test_bundles_versions_are_stored(self, session):
config = [
{
Expand Down

0 comments on commit 43e3f6a

Please sign in to comment.