Skip to content

Commit

Permalink
fix: unrecognized _partitiontime for time ingestion partitioned models
Browse files Browse the repository at this point in the history
  • Loading branch information
malikfm authored and Malik Fajar committed Jul 1, 2024
1 parent a87aefc commit f8867cc
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 1 deletion.
1 change: 1 addition & 0 deletions dbt_dry_run/models/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PartitionBy(BaseModel):
field: str
data_type: Literal["timestamp", "date", "datetime", "int64"]
range: Optional[IntPartitionRange]
time_ingestion_partitioning: Optional[bool]

@root_validator(pre=True)
def lower_data_type(cls, values: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
34 changes: 33 additions & 1 deletion dbt_dry_run/node_runner/incremental_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dbt_dry_run import flags
from dbt_dry_run.exception import SchemaChangeException, UpstreamFailedException
from dbt_dry_run.literals import insert_dependant_sql_literals
from dbt_dry_run.models import Table
from dbt_dry_run.models import BigQueryFieldMode, BigQueryFieldType, Table, TableField
from dbt_dry_run.models.manifest import Node, OnSchemaChange
from dbt_dry_run.node_runner import NodeRunner
from dbt_dry_run.results import DryRunResult, DryRunStatus
Expand Down Expand Up @@ -175,6 +175,32 @@ def _get_full_refresh_config(self, node: Node) -> bool:
return node.config.full_refresh
return flags.FULL_REFRESH

def _is_time_ingestion_partitioned(self, node: Node) -> bool:
if node.config.partition_by:
if node.config.partition_by.time_ingestion_partitioning is True:
return True
return False

def _replace_partition_with_time_ingestion_column(
self, dry_run_result: DryRunResult
) -> DryRunResult:
if not dry_run_result.table:
return dry_run_result

if not dry_run_result.node.config.partition_by:
return dry_run_result

new_partition_field = TableField(
name="_PARTITIONTIME",
type=BigQueryFieldType.TIMESTAMP,
mode=BigQueryFieldMode.NULLABLE,
)

final_fields = [field for field in dry_run_result.table.fields]
final_fields.append(new_partition_field)

return dry_run_result.replace_table(Table(fields=final_fields))

def run(self, node: Node) -> DryRunResult:
try:
sql_with_literals = insert_dependant_sql_literals(node, self._results)
Expand Down Expand Up @@ -202,4 +228,10 @@ def run(self, node: Node) -> DryRunResult:
if result.status == DryRunStatus.SUCCESS:
result = handler(result, target_table)

if (
result.status == DryRunStatus.SUCCESS
and self._is_time_ingestion_partitioned(node)
):
result = self._replace_partition_with_time_ingestion_column(result)

return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{
config(
materialized="incremental",
partition_by={
"field": "executed_at",
"data_type": "date",
"time_ingestion_partitioning": true
}
)
}}

SELECT
executed_at,
col_1,
col_2
FROM (SELECT DATE('2024-06-06') as executed_at, "foo" as col_1, "bar" as col_2)
18 changes: 18 additions & 0 deletions integration/projects/test_incremental/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,21 @@ def test_sql_header_and_max_partition(
assert_report_node_has_columns_in_order(
report_node, ["snapshot_date", "my_string", "my_func_output"]
)


def test_partition_by_time_ingestion(
compiled_project: ProjectContext,
):
node_id = "model.test_incremental.partition_by_time_ingestion"
manifest_node = compiled_project.manifest.nodes[node_id]
columns = ["executed_at", "col_1 STRING", "col_2 STRING"]
with compiled_project.create_state(manifest_node, columns, "_PARTITIONTIME", False):
run_result = compiled_project.dry_run()
assert_report_produced(run_result)
report_node = get_report_node_by_id(
run_result.report,
node_id,
)
assert_report_node_has_columns_in_order(
report_node, ["executed_at", "col_1", "col_2", "_PARTITIONTIME"]
)

0 comments on commit f8867cc

Please sign in to comment.