Skip to content

Commit

Permalink
[ADAP-631] Convert Target_lag to str type to add Downstream option (d…
Browse files Browse the repository at this point in the history
…bt-labs#732)

* update RELEASE_BRANCH env

* initial push for ADAP-631 to convert target_lag to a str type

* readd SnowflakeDynamicTableTargetLagConfigChange class as part of dynamic_table.py

* pull in ADAP-774 and add changelog

* add missing changelog entry for pr 727

* update to main, and add basic test case for passing downstream via alter
  • Loading branch information
McKnight-42 authored and philippe-boyd-maxa committed Nov 27, 2023
1 parent 674defc commit 8b5b6cf
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 140 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230810-154613.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: change target_lag type to allow for downstream as a option
time: 2023-08-10T15:46:13.896057-05:00
custom:
Author: McKnight-42
Issue: "734"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230810-163232.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: update snowflake_warehouse field for dynamic tables to be more accounted for
time: 2023-08-10T16:32:32.417917-05:00
custom:
Author: McKnight-42
Issue: "735"
6 changes: 1 addition & 5 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableTargetLagPeriod,
)
37 changes: 13 additions & 24 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
from dbt.contracts.relation import ComponentName

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -32,7 +28,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
schema_name: str
database_name: str
query: str
target_lag: SnowflakeDynamicTableTargetLagConfig
target_lag: str
snowflake_warehouse: str

@classmethod
Expand All @@ -44,14 +40,10 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
ComponentName.Database, config_dict.get("database_name")
),
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
}

if target_lag := config_dict.get("target_lag"):
kwargs_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.from_dict(target_lag)}
)

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) # type: ignore
return dynamic_table

Expand All @@ -62,14 +54,10 @@ def parse_model_node(cls, model_node: ModelNode) -> dict:
"schema_name": model_node.schema,
"database_name": model_node.database,
"query": model_node.compiled_code,
"target_lag": model_node.config.extra.get("target_lag"),
"snowflake_warehouse": model_node.config.extra.get("snowflake_warehouse"),
}

if model_node.config.extra.get("target_lag"):
config_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_model_node(model_node)}
)

return config_dict

@classmethod
Expand All @@ -81,21 +69,22 @@ def parse_relation_results(cls, relation_results: RelationResults) -> dict:
"schema_name": dynamic_table.get("schema_name"),
"database_name": dynamic_table.get("database_name"),
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
}

if dynamic_table.get("target_lag"):
config_dict.update(
{
"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_relation_results(
dynamic_table
)
}
)

return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableTargetLagConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return False


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableWarehouseConfigChange(RelationConfigChange):
context: Optional[str] = None
Expand Down
111 changes: 0 additions & 111 deletions dbt/adapters/snowflake/relation_configs/target_lag.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,26 @@ def change_config_via_alter(project, dynamic_table):
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def change_config_via_alter_downstream(project, dynamic_table):
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace(
"target_lag='120 seconds'", "target_lag='downstream'"
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def check_state_alter_change_is_applied(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "5 minutes"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"

@staticmethod
def check_state_alter_change_is_applied_downstream(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "downstream"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"

@staticmethod
def change_config_via_replace(project, dynamic_table):
# dbt-snowflake does not currently monitor any changes that trigger a full refresh
Expand Down Expand Up @@ -127,6 +141,20 @@ def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table):
assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs)
assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False)

def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)

self.change_config_via_alter_downstream(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied_downstream(adapter, my_dynamic_table)

assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs)
assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False)

@pytest.mark.skip(
"dbt-snowflake does not currently monitor any changes the trigger a full refresh"
)
Expand Down

0 comments on commit 8b5b6cf

Please sign in to comment.