diff --git a/dbt_dry_run/execution.py b/dbt_dry_run/execution.py index 1d2085f..b76dd54 100644 --- a/dbt_dry_run/execution.py +++ b/dbt_dry_run/execution.py @@ -24,6 +24,7 @@ def should_check_columns(node: Node) -> bool: check_column = node.get_combined_metadata("dry_run.check_columns") + if check_column is not None: return bool(check_column) diff --git a/dbt_dry_run/node_runner/incremental_runner.py b/dbt_dry_run/node_runner/incremental_runner.py index 0a090e4..e864ddb 100644 --- a/dbt_dry_run/node_runner/incremental_runner.py +++ b/dbt_dry_run/node_runner/incremental_runner.py @@ -108,7 +108,7 @@ def get_merge_sql( USING ( {select_statement} ) - ON True + ON False WHEN NOT MATCHED THEN INSERT ({values_csv}) VALUES ({values_csv}) diff --git a/integration/conftest.py b/integration/conftest.py index 11bb7e9..5b7cc67 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -37,15 +37,26 @@ def __init__( @contextmanager def create_state( - self, node: Node, columns: Iterable[str] + self, + node: Node, + columns: Iterable[str], + partition_by: Optional[str] = None, + require_partition_by: bool = False, ) -> Generator[None, None, None]: node_name = node.to_table_ref_literal() schema_csv = ",\n".join(columns) + partition_by_clause = f""" + PARTITION BY {partition_by} + OPTIONS (require_partition_filter = {require_partition_by}) + """ + if not partition_by: + partition_by_clause = "" create_ddl = f""" - CREATE OR REPLACE TABLE {node_name} + CREATE OR REPLACE TABLE {node_name} ( {schema_csv} - ); + ) + {partition_by_clause}; """ client: Client = self._project.get_connection().handle client.query(create_ddl) diff --git a/integration/projects/test_incremental/models/required_partition_filter.sql b/integration/projects/test_incremental/models/required_partition_filter.sql new file mode 100644 index 0000000..565ce94 --- /dev/null +++ b/integration/projects/test_incremental/models/required_partition_filter.sql @@ -0,0 +1,16 @@ +{{ + config( + materialized="incremental", + on_schema_change="append_new_columns", + require_partition_filter = true, + partition_by={'field': 'snapshot_date', 'data_type': 'date'} + ) +}} + +SELECT + col_1, + col_2, + snapshot_date +FROM (SELECT "foo" as col_1, "bar" as col_2, DATE("2023-01-04") as snapshot_date) + + diff --git a/integration/projects/test_incremental/test_incremental.py b/integration/projects/test_incremental/test_incremental.py index 1836bd6..b35ee33 100644 --- a/integration/projects/test_incremental/test_incremental.py +++ b/integration/projects/test_incremental/test_incremental.py @@ -174,3 +174,21 @@ def test_column_order_preserved_on_schema_change_append_new_columns( node_id, ) assert_report_node_has_columns_in_order(report_node, ["col_2", "col_1"]) + + +def test_required_partition_filter( + compiled_project: ProjectContext, +): + node_id = "model.test_incremental.required_partition_filter" + manifest_node = compiled_project.manifest.nodes[node_id] + columns = ["col_1 STRING", "col_2 STRING", "snapshot_date DATE"] + with compiled_project.create_state(manifest_node, columns, "snapshot_date", True): + run_result = compiled_project.dry_run() + assert_report_produced(run_result) + report_node = get_report_node_by_id( + run_result.report, + node_id, + ) + assert_report_node_has_columns_in_order( + report_node, ["col_1", "col_2", "snapshot_date"] + )