From f1dddaa6e96b743498440925a55e66a7d5cdacfa Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Sat, 29 Apr 2023 00:01:06 +0200 Subject: [PATCH] Ignore parent tests added edges for build selection (#7431) --- .../unreleased/Fixes-20230421-172428.yaml | 6 ++++ core/dbt/compilation.py | 2 +- core/dbt/graph/graph.py | 17 ++++++++-- tests/functional/build/fixtures.py | 21 ++++++++++++ tests/functional/build/test_build.py | 32 +++++++++++++++++++ .../defer_state/test_run_results_state.py | 11 +++---- 6 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230421-172428.yaml diff --git a/.changes/unreleased/Fixes-20230421-172428.yaml b/.changes/unreleased/Fixes-20230421-172428.yaml new file mode 100644 index 00000000000..82c489918b1 --- /dev/null +++ b/.changes/unreleased/Fixes-20230421-172428.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: dbt build selection of tests' descendants +time: 2023-04-21T17:24:28.335866975+02:00 +custom: + Author: b-luu + Issue: "7289" diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 3cb4285fe21..a2002069257 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -491,7 +491,7 @@ def add_test_edges(self, linker: Linker, manifest: Manifest) -> None: # is a subset of all upstream nodes of the current node, # add an edge from the upstream test to the current node. if test_depends_on.issubset(upstream_nodes): - linker.graph.add_edge(upstream_test, node_id) + linker.graph.add_edge(upstream_test, node_id, edge_type="parent_test") def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph: self.initialize() diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index 29f24cae734..69a2f21258a 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -28,16 +28,29 @@ def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[Uniq """Returns all nodes having a path to `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") + filtered_graph = self.exclude_edge_type("parent_test") return { child - for _, child in nx.bfs_edges(self.graph, node, reverse=True, depth_limit=max_depth) + for _, child in nx.bfs_edges(filtered_graph, node, reverse=True, depth_limit=max_depth) } def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]: """Returns all nodes reachable from `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") - return {child for _, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth)} + filtered_graph = self.exclude_edge_type("parent_test") + return {child for _, child in nx.bfs_edges(filtered_graph, node, depth_limit=max_depth)} + + def exclude_edge_type(self, edge_type_to_exclude): + return nx.restricted_view( + self.graph, + nodes=[], + edges=( + (a, b) + for a, b in self.graph.edges + if self.graph[a][b].get("edge_type") == edge_type_to_exclude + ), + ) def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]: ancestors_for = self.select_children(selected) | selected diff --git a/tests/functional/build/fixtures.py b/tests/functional/build/fixtures.py index 7c4d93e6186..e6f8dd4f0b3 100644 --- a/tests/functional/build/fixtures.py +++ b/tests/functional/build/fixtures.py @@ -206,6 +206,27 @@ - not_null """ +models_triple_blocking__test_yml = """ +version: 2 + +models: + - name: model_a + columns: + - name: id + tests: + - not_null + - name: model_b + columns: + - name: id + tests: + - not_null + - name: model_c + columns: + - name: id + tests: + - not_null +""" + models_interdependent__model_a_sql = """ select 1 as id """ diff --git a/tests/functional/build/test_build.py b/tests/functional/build/test_build.py index eb9529be102..fb909d69f4b 100644 --- a/tests/functional/build/test_build.py +++ b/tests/functional/build/test_build.py @@ -18,6 +18,7 @@ models_simple_blocking__model_a_sql, models_simple_blocking__model_b_sql, models_simple_blocking__test_yml, + models_triple_blocking__test_yml, models_interdependent__test_yml, models_interdependent__model_a_sql, models_interdependent__model_b_sql, @@ -196,3 +197,34 @@ def test_interdependent_models_fail(self, project): actual = [str(r.status) for r in results] expected = ["error"] * 4 + ["skipped"] * 7 + ["pass"] * 2 + ["success"] * 3 assert sorted(actual) == sorted(expected) + + +class TestDownstreamSelection: + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": models_simple_blocking__model_a_sql, + "model_b.sql": models_simple_blocking__model_b_sql, + "test.yml": models_simple_blocking__test_yml, + } + + def test_downstream_selection(self, project): + """Ensure that selecting test+ does not select model_a's other children""" + results = run_dbt(["build", "--select", "model_a not_null_model_a_id+"], expect_pass=True) + assert len(results) == 2 + + +class TestLimitedUpstreamSelection: + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": models_interdependent__model_a_sql, + "model_b.sql": models_interdependent__model_b_sql, + "model_c.sql": models_interdependent__model_c_sql, + "test.yml": models_triple_blocking__test_yml, + } + + def test_limited_upstream_selection(self, project): + """Ensure that selecting 1+model_c only selects up to model_b (+ tests of both)""" + results = run_dbt(["build", "--select", "1+model_c"], expect_pass=True) + assert len(results) == 4 diff --git a/tests/functional/defer_state/test_run_results_state.py b/tests/functional/defer_state/test_run_results_state.py index aa1dc549272..69dc77a1dd3 100644 --- a/tests/functional/defer_state/test_run_results_state.py +++ b/tests/functional/defer_state/test_run_results_state.py @@ -216,9 +216,9 @@ def test_build_run_results_state(self, project): results = run_dbt( ["build", "--select", "result:fail+", "--state", "./state"], expect_pass=False ) - assert len(results) == 2 + assert len(results) == 1 nodes = set([elem.node.name for elem in results]) - assert nodes == {"table_model", "unique_view_model_id"} + assert nodes == {"unique_view_model_id"} results = run_dbt(["ls", "--select", "result:fail+", "--state", "./state"]) assert len(results) == 1 @@ -240,9 +240,9 @@ def test_build_run_results_state(self, project): results = run_dbt( ["build", "--select", "result:warn+", "--state", "./state"], expect_pass=True ) - assert len(results) == 2 # includes table_model to be run + assert len(results) == 1 nodes = set([elem.node.name for elem in results]) - assert nodes == {"table_model", "unique_view_model_id"} + assert nodes == {"unique_view_model_id"} results = run_dbt(["ls", "--select", "result:warn+", "--state", "./state"]) assert len(results) == 1 @@ -483,12 +483,11 @@ def test_concurrent_selectors_build_run_results_state(self, project): ], expect_pass=False, ) - assert len(results) == 5 + assert len(results) == 4 nodes = set([elem.node.name for elem in results]) assert nodes == { "error_model", "downstream_of_error_model", "table_model_modified_example", - "table_model", "unique_view_model_id", }