diff --git a/tests/functional/iceberg/test_incremental_models.py b/tests/functional/iceberg/test_incremental_models.py
index f8f1d6b89..35ccdcd89 100644
--- a/tests/functional/iceberg/test_incremental_models.py
+++ b/tests/functional/iceberg/test_incremental_models.py
@@ -3,7 +3,7 @@
 
 from pathlib import Path
 
-from dbt.tests.util import run_dbt, run_dbt_and_capture
+from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
 
 
 _SEED_INCREMENTAL_STRATEGIES = """
@@ -32,6 +32,7 @@
     incremental_strategy='{strategy}',
     unique_key="world_id",
     external_volume = "s3_iceberg_snow",
+    on_schema_change = "sync_all_columns"
   ) }}}}
 select * from {{{{ ref('upstream_table') }}}}
 """
@@ -123,3 +124,46 @@ def test_incremental_strategies_with_update(self, project, setup_class):
         self.__check_correct_operations(self.append, rows_affected=2)
         self.__check_correct_operations("merge", rows_affected=1)
         self.__check_correct_operations("delete_insert", rows_affected=1)
+
+
+class TestIcebergIncrementalOnSchemaChangeMutatesRelations:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"flags": {"enable_iceberg_materializations": True}}
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "seed.csv": _SEED_INCREMENTAL_STRATEGIES,
+        }
+
+    @pytest.fixture(scope="function", autouse=True)
+    def setup_class(self, project):
+        run_dbt(["seed"])
+        run_dbt(["run"])
+        yield
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "upstream_table.sql": _MODEL_BASIC_TABLE_MODEL,
+            "merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE,
+        }
+
+    def test_sync_and_append_semantics(self, project, setup_class):
+        model_file = project.project_root / Path("models") / Path("merge.sql")
+        sql = f"show columns in {project.database}.{project.test_schema}.merge;"
+        column_names = [column[2] for column in project.run_sql(sql, fetch="all")]
+        assert len(column_names) == 3
+
+        write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE.replace("*", "*, 1 as new_column"), model_file)
+        run_dbt()
+        column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")]
+        assert len(column_names) == 4
+        assert "new_column" in column_names
+
+        write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE, model_file)
+        run_dbt()
+        column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")]
+        assert len(column_names) == 3
+        assert "new_column" not in column_names