Skip to content

Commit

Permalink
fix: unrecognized _partitiontime for time ingestion partitioned models
Browse files Browse the repository at this point in the history
  • Loading branch information
malikfm authored and Malik Fajar committed Jun 25, 2024
1 parent a87aefc commit a5f6b44
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 11 deletions.
1 change: 1 addition & 0 deletions dbt_dry_run/models/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PartitionBy(BaseModel):
field: str
data_type: Literal["timestamp", "date", "datetime", "int64"]
range: Optional[IntPartitionRange]
time_ingestion_partitioning: Optional[bool]

@root_validator(pre=True)
def lower_data_type(cls, values: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
60 changes: 49 additions & 11 deletions dbt_dry_run/node_runner/incremental_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dbt_dry_run import flags
from dbt_dry_run.exception import SchemaChangeException, UpstreamFailedException
from dbt_dry_run.literals import insert_dependant_sql_literals
from dbt_dry_run.models import Table
from dbt_dry_run.models import BigQueryFieldMode, BigQueryFieldType, Table, TableField
from dbt_dry_run.models.manifest import Node, OnSchemaChange
from dbt_dry_run.node_runner import NodeRunner
from dbt_dry_run.results import DryRunResult, DryRunStatus
Expand Down Expand Up @@ -175,6 +175,38 @@ def _get_full_refresh_config(self, node: Node) -> bool:
return node.config.full_refresh
return flags.FULL_REFRESH

def _is_time_ingestion_partitioned(self, node: Node) -> bool:
if node.config.partition_by:
if node.config.partition_by.time_ingestion_partitioning is True:
return True
return False

def _replace_partition_with_time_ingestion_column(
self, dry_run_result: DryRunResult
) -> DryRunResult:
if not dry_run_result.table:
return dry_run_result

if not dry_run_result.node.config.partition_by:
return dry_run_result

existing_partition_field = dry_run_result.node.config.partition_by.field

new_partition_field = TableField(
name="_PARTITIONTIME",
type=BigQueryFieldType.TIMESTAMP,
mode=BigQueryFieldMode.NULLABLE,
)

final_fields = [
field
for field in dry_run_result.table.fields
if field.name != existing_partition_field
]
final_fields.append(new_partition_field)

return dry_run_result.replace_table(Table(fields=final_fields))

def run(self, node: Node) -> DryRunResult:
try:
sql_with_literals = insert_dependant_sql_literals(node, self._results)
Expand All @@ -191,15 +223,21 @@ def run(self, node: Node) -> DryRunResult:

full_refresh = self._get_full_refresh_config(node)

if result.status == DryRunStatus.SUCCESS and not full_refresh:
target_table = self._sql_runner.get_node_schema(node)
if target_table:
on_schema_change = node.config.on_schema_change or OnSchemaChange.IGNORE
handler = ON_SCHEMA_CHANGE_TABLE_HANDLER[on_schema_change]
result = self._verify_merge_type_compatibility(
node, sql_with_literals, result, target_table
)
if result.status == DryRunStatus.SUCCESS:
result = handler(result, target_table)
if result.status == DryRunStatus.SUCCESS:
if self._is_time_ingestion_partitioned(node):
result = self._replace_partition_with_time_ingestion_column(result)

if not full_refresh:
target_table = self._sql_runner.get_node_schema(node)
if target_table:
on_schema_change = (
node.config.on_schema_change or OnSchemaChange.IGNORE
)
handler = ON_SCHEMA_CHANGE_TABLE_HANDLER[on_schema_change]
result = self._verify_merge_type_compatibility(
node, sql_with_literals, result, target_table
)
if result.status == DryRunStatus.SUCCESS:
result = handler(result, target_table)

return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{
config(
materialized="incremental",
partition_by={
"field": "executed_at",
"data_type": "date",
"time_ingestion_partitioning": true
}
)
}}

SELECT
executed_at,
col_1,
col_2
FROM (SELECT DATE('2024-06-06') as executed_at, "foo" as col_1, "bar" as col_2)
18 changes: 18 additions & 0 deletions integration/projects/test_incremental/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,21 @@ def test_sql_header_and_max_partition(
assert_report_node_has_columns_in_order(
report_node, ["snapshot_date", "my_string", "my_func_output"]
)


def test_partition_by_time_ingestion(
compiled_project: ProjectContext,
):
node_id = "model.test_incremental.partition_by_time_ingestion"
manifest_node = compiled_project.manifest.nodes[node_id]
columns = ["col_1 STRING", "col_2 STRING"]
with compiled_project.create_state(manifest_node, columns, "_PARTITIONTIME", False):
run_result = compiled_project.dry_run()
assert_report_produced(run_result)
report_node = get_report_node_by_id(
run_result.report,
node_id,
)
assert_report_node_has_columns_in_order(
report_node, ["col_1", "col_2", "_PARTITIONTIME"]
)

0 comments on commit a5f6b44

Please sign in to comment.