Skip to content

Commit

Permalink
revert default iceberg to false, update tests to test both scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Sep 23, 2024
1 parent 610d379 commit d9b8a6b
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _behavior_flags(self) -> List[BehaviorFlag]:
return [
{
"name": "enable_iceberg_materializations",
"default": True,
"default": False,
"description": (
"Enabling Iceberg materializations introduces latency to metadata queries, "
"specifically within the list_relations_without_caching macro. Since Iceberg "
Expand Down
22 changes: 19 additions & 3 deletions tests/functional/relation_tests/dynamic_table_tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,25 @@


class TestBasic:
iceberg: bool = False

@pytest.fixture(scope="class", autouse=True)
def seeds(self):
return {"my_seed.csv": models.SEED}

@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {
my_models = {
"my_dynamic_table.sql": models.DYNAMIC_TABLE,
"my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM,
"my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE,
}
if self.iceberg:
my_models.update(
{
"my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE,
}
)
yield my_models

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
Expand All @@ -29,4 +36,13 @@ def test_dynamic_table_full_refresh(self, project):
run_dbt(["run", "--full-refresh"])
assert query_relation_type(project, "my_dynamic_table") == "dynamic_table"
assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table"
assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table"
if self.iceberg:
assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table"


class TestBasicIcebergOn(TestBasic):
iceberg = True

@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,26 @@


class Changes:
iceberg: bool = False

@pytest.fixture(scope="class", autouse=True)
def seeds(self):
yield {"my_seed.csv": models.SEED}

@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {
my_models = {
"dynamic_table_alter.sql": models.DYNAMIC_TABLE,
"dynamic_table_replace.sql": models.DYNAMIC_TABLE,
"dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE,
"dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE,
}
if self.iceberg:
my_models.update(
{
"dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE,
"dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE,
}
)
yield my_models

@pytest.fixture(scope="function", autouse=True)
def setup_class(self, project):
Expand All @@ -35,20 +42,23 @@ def setup_method(self, project, setup_class):

update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER)
update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE)
update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER)
update_model(
project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE
)
if self.iceberg:
update_model(
project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER
)
update_model(
project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE
)

yield

update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE)
update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE)
update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE)
update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE)
if self.iceberg:
update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE)
update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE)

@staticmethod
def assert_changes_are_applied(project):
def assert_changes_are_applied(self, project):
altered = describe_dynamic_table(project, "dynamic_table_alter")
assert altered.snowflake_warehouse == "DBT_TESTING"
assert altered.target_lag == "5 minutes" # this updated
Expand All @@ -59,18 +69,18 @@ def assert_changes_are_applied(project):
assert replaced.target_lag == "2 minutes"
assert replaced.refresh_mode == "FULL" # this updated

altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "5 minutes" # this updated
assert altered_iceberg.refresh_mode == "INCREMENTAL"
if self.iceberg:
altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "5 minutes" # this updated
assert altered_iceberg.refresh_mode == "INCREMENTAL"

replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert replaced_iceberg.refresh_mode == "FULL" # this updated
replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert replaced_iceberg.refresh_mode == "FULL" # this updated

@staticmethod
def assert_changes_are_not_applied(project):
def assert_changes_are_not_applied(self, project):
altered = describe_dynamic_table(project, "dynamic_table_alter")
assert altered.snowflake_warehouse == "DBT_TESTING"
assert altered.target_lag == "2 minutes" # this would have updated, but didn't
Expand All @@ -81,17 +91,18 @@ def assert_changes_are_not_applied(project):
assert replaced.target_lag == "2 minutes"
assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't

altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't
assert altered_iceberg.refresh_mode == "INCREMENTAL"
if self.iceberg:
altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't
assert altered_iceberg.refresh_mode == "INCREMENTAL"

replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert (
replaced_iceberg.refresh_mode == "INCREMENTAL"
) # this would have updated, but didn't
replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert (
replaced_iceberg.refresh_mode == "INCREMENTAL"
) # this would have updated, but didn't

def test_full_refresh_is_always_successful(self, project):
# this always passes and always changes the configuration, regardless of on_configuration_change
Expand All @@ -111,6 +122,17 @@ def test_changes_are_applied(self, project):
self.assert_changes_are_applied(project)


class TestChangesApplyIcebergOn(TestChangesApply):
iceberg = True

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "apply"},
"flags": {"enable_iceberg_materializations": True},
}


class TestChangesContinue(Changes):
@pytest.fixture(scope="class")
def project_config_update(self):
Expand All @@ -122,6 +144,17 @@ def test_changes_are_not_applied(self, project):
self.assert_changes_are_not_applied(project)


class TestChangesContinueIcebergOn(TestChangesContinue):
iceberg = True

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "continue"},
"flags": {"enable_iceberg_materializations": True},
}


class TestChangesFail(Changes):
@pytest.fixture(scope="class")
def project_config_update(self):
Expand All @@ -131,3 +164,14 @@ def test_changes_are_not_applied(self, project):
# this fails and does not change the configuration
run_dbt(["run"], expect_pass=False)
self.assert_changes_are_not_applied(project)


class TestChangesFailIcebergOn(TestChangesFail):
iceberg = True

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "fail"},
"flags": {"enable_iceberg_materializations": True},
}
40 changes: 33 additions & 7 deletions tests/functional/relation_tests/test_relation_type_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,52 @@ def error_message(self) -> str:

class TestRelationTypeChange:

@staticmethod
def include(scenario) -> bool:
return (
scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg"
)

@pytest.fixture(scope="class", autouse=True)
def seeds(self):
return {"my_seed.csv": models.SEED}

@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios}
yield {
f"{scenario.name}.sql": scenario.initial.model
for scenario in scenarios
if self.include(scenario)
}

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])
for scenario in scenarios:
update_model(project, scenario.name, scenario.final.model)
if self.include(scenario):
update_model(project, scenario.name, scenario.final.model)
run_dbt(["run"])

@pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios])
def test_replace(self, project, scenario):
relation_type = query_relation_type(project, scenario.name)
assert relation_type == scenario.final.relation_type, scenario.error_message
if relation_type == "dynamic_table":
dynamic_table = describe_dynamic_table(project, scenario.name)
assert dynamic_table.catalog.table_format == scenario.final.table_format
if self.include(scenario):
relation_type = query_relation_type(project, scenario.name)
assert relation_type == scenario.final.relation_type, scenario.error_message
if relation_type == "dynamic_table":
dynamic_table = describe_dynamic_table(project, scenario.name)
assert dynamic_table.catalog.table_format == scenario.final.table_format
else:
pytest.skip()


class TestRelationTypeChangeIcebergOn(TestRelationTypeChange):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}

@staticmethod
def include(scenario) -> bool:
return (
scenario.initial.table_format == "iceberg" or scenario.final.table_format == "iceberg"
)

0 comments on commit d9b8a6b

Please sign in to comment.