diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index a66788dc8fbe3..0b831d238d1c3 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -661,8 +661,10 @@ def _refresh_dag_bundles(self): ).total_seconds() current_version = bundle.get_current_version() if ( - not elapsed_time_since_refresh > bundle.refresh_interval - ) and bundle_model.latest_version == current_version: + elapsed_time_since_refresh < bundle.refresh_interval + and bundle_model.latest_version == current_version + and bundle.name in self._bundle_versions + ): self.log.info("Not time to refresh %s", bundle.name) continue diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 88d5e7bd2283a..1f10ed3eb1b67 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -831,8 +831,9 @@ def test_bundles_are_refreshed(self): """ Ensure bundles are refreshed by the manager, when necessary. + - always refresh all bundles when starting the manager - refresh if the bundle hasn't been refreshed in the refresh_interval - - when the latest_version in the db doesn't match the version this parser knows about + - when the latest_version in the db doesn't match the version this manager knows about """ config = [ { @@ -863,23 +864,36 @@ def test_bundles_are_refreshed(self): ) as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"bundleone": None, "bundletwo": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone, bundletwo] - manager = DagFileProcessorManager(max_runs=1) + + # We should refresh bundleone twice, but bundletwo only once - it has a long refresh_interval + manager = DagFileProcessorManager(max_runs=2) manager.run() - bundleone.refresh.assert_called_once() + assert bundleone.refresh.call_count == 2 bundletwo.refresh.assert_called_once() - # Now, we should only refresh bundleone, as haven't hit the refresh_interval for bundletwo + # Now, we should refresh both bundles, regardless of the refresh_interval + # as we are starting up a fresh manager bundleone.reset_mock() bundletwo.reset_mock() + manager = DagFileProcessorManager(max_runs=2) manager.run() - bundleone.refresh.assert_called_once() - bundletwo.refresh.assert_not_called() + assert bundleone.refresh.call_count == 2 + bundletwo.refresh.assert_called_once() # however, if the version doesn't match, we should still refresh bundletwo.reset_mock() - bundletwo.get_current_version.return_value = "123" + + def _update_bundletwo_version(): + # We will update the bundle version in the db, so the next manager loop + # will believe another processor had seen a new version + with create_session() as session: + bundletwo_model = session.get(DagBundleModel, "bundletwo") + bundletwo_model.latest_version = "123" + + bundletwo.refresh.side_effect = _update_bundletwo_version + manager = DagFileProcessorManager(max_runs=2) manager.run() - bundletwo.refresh.assert_called_once() + assert bundletwo.refresh.call_count == 2 def test_bundles_versions_are_stored(self): config = [