Skip to content

Commit

Permalink
Fix bug where bundles wouldnt be refreshed at least once during first…
Browse files Browse the repository at this point in the history
… loop
  • Loading branch information
jedcunningham committed Jan 11, 2025
1 parent c5e18d1 commit 25da62c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
6 changes: 4 additions & 2 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,10 @@ def _refresh_dag_bundles(self):
).total_seconds()
current_version = bundle.get_current_version()
if (
not elapsed_time_since_refresh > bundle.refresh_interval
) and bundle_model.latest_version == current_version:
elapsed_time_since_refresh < bundle.refresh_interval
and bundle_model.latest_version == current_version
and bundle.name in self._bundle_versions
):
self.log.info("Not time to refresh %s", bundle.name)
continue

Expand Down
30 changes: 22 additions & 8 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,9 @@ def test_bundles_are_refreshed(self):
"""
Ensure bundles are refreshed by the manager, when necessary.
- always refresh all bundles when starting the manager
- refresh if the bundle hasn't been refreshed in the refresh_interval
- when the latest_version in the db doesn't match the version this parser knows about
- when the latest_version in the db doesn't match the version this manager knows about
"""
config = [
{
Expand Down Expand Up @@ -863,23 +864,36 @@ def test_bundles_are_refreshed(self):
) as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None, "bundletwo": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone, bundletwo]
manager = DagFileProcessorManager(max_runs=1)

# We should refresh bundleone twice, but bundletwo only once - it has a long refresh_interval
manager = DagFileProcessorManager(max_runs=2)
manager.run()
bundleone.refresh.assert_called_once()
assert bundleone.refresh.call_count == 2
bundletwo.refresh.assert_called_once()

# Now, we should only refresh bundleone, as haven't hit the refresh_interval for bundletwo
# Now, we should refresh both bundles, regardless of the refresh_interval
# as we are starting up a fresh manager
bundleone.reset_mock()
bundletwo.reset_mock()
manager = DagFileProcessorManager(max_runs=2)
manager.run()
bundleone.refresh.assert_called_once()
bundletwo.refresh.assert_not_called()
assert bundleone.refresh.call_count == 2
bundletwo.refresh.assert_called_once()

# however, if the version doesn't match, we should still refresh
bundletwo.reset_mock()
bundletwo.get_current_version.return_value = "123"

def _update_bundletwo_version():
# We will update the bundle version in the db, so the next manager loop
# will believe another processor had seen a new version
with create_session() as session:
bundletwo_model = session.get(DagBundleModel, "bundletwo")
bundletwo_model.latest_version = "123"

bundletwo.refresh.side_effect = _update_bundletwo_version
manager = DagFileProcessorManager(max_runs=2)
manager.run()
bundletwo.refresh.assert_called_once()
assert bundletwo.refresh.call_count == 2

def test_bundles_versions_are_stored(self):
config = [
Expand Down

0 comments on commit 25da62c

Please sign in to comment.