diff --git a/.changes/unreleased/Fixes-20230810-154613.yaml b/.changes/unreleased/Fixes-20230810-154613.yaml new file mode 100644 index 000000000..e1794e899 --- /dev/null +++ b/.changes/unreleased/Fixes-20230810-154613.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: change target_lag type to allow for downstream as a option +time: 2023-08-10T15:46:13.896057-05:00 +custom: + Author: McKnight-42 + Issue: "734" diff --git a/.changes/unreleased/Fixes-20230810-163232.yaml b/.changes/unreleased/Fixes-20230810-163232.yaml new file mode 100644 index 000000000..80643d93f --- /dev/null +++ b/.changes/unreleased/Fixes-20230810-163232.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: update snowflake_warehouse field for dynamic tables to be more accounted for +time: 2023-08-10T16:32:32.417917-05:00 +custom: + Author: McKnight-42 + Issue: "735" diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index e53604526..e5ceabe49 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -2,14 +2,10 @@ SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableWarehouseConfigChange, + SnowflakeDynamicTableTargetLagConfigChange, ) from dbt.adapters.snowflake.relation_configs.policies import ( SnowflakeIncludePolicy, SnowflakeQuotePolicy, SnowflakeRelationType, ) -from dbt.adapters.snowflake.relation_configs.target_lag import ( - SnowflakeDynamicTableTargetLagConfig, - SnowflakeDynamicTableTargetLagConfigChange, - SnowflakeDynamicTableTargetLagPeriod, -) diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 36be68f56..6caa7211e 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -7,10 +7,6 @@ from dbt.contracts.relation import ComponentName from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase -from dbt.adapters.snowflake.relation_configs.target_lag import ( - SnowflakeDynamicTableTargetLagConfig, - SnowflakeDynamicTableTargetLagConfigChange, -) @dataclass(frozen=True, eq=True, unsafe_hash=True) @@ -32,7 +28,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): schema_name: str database_name: str query: str - target_lag: SnowflakeDynamicTableTargetLagConfig + target_lag: str snowflake_warehouse: str @classmethod @@ -44,14 +40,10 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": ComponentName.Database, config_dict.get("database_name") ), "query": config_dict.get("query"), + "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), } - if target_lag := config_dict.get("target_lag"): - kwargs_dict.update( - {"target_lag": SnowflakeDynamicTableTargetLagConfig.from_dict(target_lag)} - ) - dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) # type: ignore return dynamic_table @@ -62,14 +54,10 @@ def parse_model_node(cls, model_node: ModelNode) -> dict: "schema_name": model_node.schema, "database_name": model_node.database, "query": model_node.compiled_code, + "target_lag": model_node.config.extra.get("target_lag"), "snowflake_warehouse": model_node.config.extra.get("snowflake_warehouse"), } - if model_node.config.extra.get("target_lag"): - config_dict.update( - {"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_model_node(model_node)} - ) - return config_dict @classmethod @@ -81,21 +69,22 @@ def parse_relation_results(cls, relation_results: RelationResults) -> dict: "schema_name": dynamic_table.get("schema_name"), "database_name": dynamic_table.get("database_name"), "query": dynamic_table.get("text"), + "target_lag": dynamic_table.get("target_lag"), "snowflake_warehouse": dynamic_table.get("warehouse"), } - if dynamic_table.get("target_lag"): - config_dict.update( - { - "target_lag": SnowflakeDynamicTableTargetLagConfig.parse_relation_results( - dynamic_table - ) - } - ) - return config_dict +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableTargetLagConfigChange(RelationConfigChange): + context: Optional[str] = None + + @property + def requires_full_refresh(self) -> bool: + return False + + @dataclass(frozen=True, eq=True, unsafe_hash=True) class SnowflakeDynamicTableWarehouseConfigChange(RelationConfigChange): context: Optional[str] = None diff --git a/dbt/adapters/snowflake/relation_configs/target_lag.py b/dbt/adapters/snowflake/relation_configs/target_lag.py deleted file mode 100644 index 9744e3b00..000000000 --- a/dbt/adapters/snowflake/relation_configs/target_lag.py +++ /dev/null @@ -1,111 +0,0 @@ -from dataclasses import dataclass -from typing import Any, Dict, Optional, Union - -import agate -from dbt.adapters.relation_configs import RelationConfigChange -from dbt.contracts.graph.nodes import ModelNode -from dbt.dataclass_schema import StrEnum - -from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase - - -class SnowflakeDynamicTableTargetLagPeriod(StrEnum): - seconds = "seconds" - minutes = "minutes" - hours = "hours" - days = "days" - second = "second" - minute = "minute" - hour = "hour" - day = "day" - - -@dataclass(frozen=True, eq=True, unsafe_hash=True) -class SnowflakeDynamicTableTargetLagConfig(SnowflakeRelationConfigBase): - """ - This config follow the specs found here: - https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table - - The following parameters are configurable by dbt: - - duration: the numeric part of the lag - - period: the scale part of the lag - - There are currently no non-configurable parameters. - """ - - duration: int - period: SnowflakeDynamicTableTargetLagPeriod - - def __str__(self) -> str: - return f"{self.duration} {self.period}" - - @classmethod - def from_dict(cls, config_dict) -> "SnowflakeDynamicTableTargetLagConfig": - kwargs_dict: Dict[str, Union[int, SnowflakeDynamicTableTargetLagPeriod]] = {} - - if duration := config_dict.get("duration"): - kwargs_dict.update({"duration": int(duration)}) - - if period := config_dict.get("period"): - kwargs_dict.update({"period": SnowflakeDynamicTableTargetLagPeriod(period)}) - - target_lag: "SnowflakeDynamicTableTargetLagConfig" = super().from_dict(kwargs_dict) # type: ignore - return target_lag - - @classmethod - def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: - """ - Translate ModelNode objects from the user-provided config into a standard dictionary. - - Args: - model_node: the description of the target lag from the user in this format: - - { - "target_lag": "int any("seconds", "minutes", "hours", "days")" - } - - Returns: a standard dictionary describing this `SnowflakeDynamicTableTargetLagConfig` instance - """ - target_lag: str = model_node.config.extra["target_lag"] - return cls._parse_target_lag_string(target_lag) - - @classmethod - def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]: - """ - Translate agate objects from the database into a standard dictionary. - - Args: - relation_results_entry: the description of the target lag from the database in this format: - - agate.Row({ - "target_lag": "int any("seconds", "minutes", "hours", "days")" - }) - - Returns: a standard dictionary describing this `SnowflakeDynamicTableTargetLagConfig` instance - """ - target_lag: str = relation_results_entry["target_lag"] - return cls._parse_target_lag_string(target_lag) - - @staticmethod - def _parse_target_lag_string(target_lag: str) -> Dict[str, Union[Optional[Union[int, str]]]]: - try: - # Snowflake supports strings like `1 \n minutes` despite the docs not suggesting that - duration_str, *_, period = target_lag.split(" ") - duration = int(duration_str) - except (AttributeError, IndexError): - duration, period = None, None - - config_dict = { - "duration": duration, - "period": period, - } - return config_dict - - -@dataclass(frozen=True, eq=True, unsafe_hash=True) -class SnowflakeDynamicTableTargetLagConfigChange(RelationConfigChange): - context: Optional[SnowflakeDynamicTableTargetLagConfig] = None - - @property - def requires_full_refresh(self) -> bool: - return False diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py index 5d206dc9c..84fa22a0c 100644 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py +++ b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py @@ -43,12 +43,26 @@ def change_config_via_alter(project, dynamic_table): ) set_model_file(project, dynamic_table, new_model) + @staticmethod + def change_config_via_alter_downstream(project, dynamic_table): + initial_model = get_model_file(project, dynamic_table) + new_model = initial_model.replace( + "target_lag='120 seconds'", "target_lag='downstream'" + ) + set_model_file(project, dynamic_table, new_model) + @staticmethod def check_state_alter_change_is_applied(adapter, dynamic_table): # see above assert query_target_lag(adapter, dynamic_table) == "5 minutes" assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING" + @staticmethod + def check_state_alter_change_is_applied_downstream(adapter, dynamic_table): + # see above + assert query_target_lag(adapter, dynamic_table) == "downstream" + assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING" + @staticmethod def change_config_via_replace(project, dynamic_table): # dbt-snowflake does not currently monitor any changes that trigger a full refresh @@ -127,6 +141,20 @@ def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table): assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs) assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False) + def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynamic_table): + """ + See above about the two commented assertions. In the meantime, these have been validated manually. + """ + # self.check_start_state(adapter, my_dynamic_table) + + self.change_config_via_alter_downstream(project, my_dynamic_table) + _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) + + # self.check_state_alter_change_is_applied_downstream(adapter, my_dynamic_table) + + assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs) + assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False) + @pytest.mark.skip( "dbt-snowflake does not currently monitor any changes the trigger a full refresh" )