base microbatch support + tests (#300)
MichelleArk authored Sep 11, 2024
1 parent 1edeb74 commit 629f22f
Showing 6 changed files with 114 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-135404.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Default microbatch strategy implementation and base tests
time: 2024-09-11T13:54:04.231977-04:00
custom:
Author: michelleark
Issue: "302"
@@ -0,0 +1,91 @@
import os
from pprint import pformat
from unittest import mock

import pytest
from freezegun import freeze_time

from dbt.tests.util import relation_from_name, run_dbt

_input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
union all
select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
union all
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class BaseMicrobatch:
    @pytest.fixture(scope="class")
    def microbatch_model_sql(self) -> str:
        """
        This is the SQL that defines the microbatch model, including any {{ config(..) }}
        """
        return _microbatch_model_sql

    @pytest.fixture(scope="class")
    def input_model_sql(self) -> str:
        """
        This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}.
        event_time is a required configuration of this input
        """
        return _input_model_sql

    @pytest.fixture(scope="class")
    def insert_two_rows_sql(self, project) -> str:
        test_schema_relation = project.adapter.Relation.create(
            database=project.database, schema=project.test_schema
        )
        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')"

    @pytest.fixture(scope="class")
    def models(self, microbatch_model_sql, input_model_sql):
        return {
            "input_model.sql": input_model_sql,
            "microbatch_model.sql": microbatch_model_sql,
        }

    def assert_row_count(self, project, relation_name: str, expected_row_count: int):
        relation = relation_from_name(project.adapter, relation_name)
        result = project.run_sql(f"select * from {relation}", fetch="all")

        assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}"

    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_run_with_event_time(self, project, insert_two_rows_sql):
        # initial run -- backfills all data
        with freeze_time("2020-01-03 13:57:00"):
            run_dbt(["run"])
        self.assert_row_count(project, "microbatch_model", 3)

        # our partition grain is "day", so re-running within the same day with no new data should produce the same results
        with freeze_time("2020-01-03 14:57:00"):
            run_dbt(["run"])
        self.assert_row_count(project, "microbatch_model", 3)

        # add the next two days of data
        project.run_sql(insert_two_rows_sql)

        self.assert_row_count(project, "input_model", 5)

        # re-run without advancing the current time => no new rows inserted
        with freeze_time("2020-01-03 14:57:00"):
            run_dbt(["run", "--select", "microbatch_model"])
        self.assert_row_count(project, "microbatch_model", 3)

        # re-run after advancing the current time by one day => 1 new row inserted
        with freeze_time("2020-01-04 14:57:00"):
            run_dbt(["run", "--select", "microbatch_model"])
        self.assert_row_count(project, "microbatch_model", 4)

        # re-run after advancing the current time by one more day => 1 more row inserted
        with freeze_time("2020-01-05 14:57:00"):
            run_dbt(["run", "--select", "microbatch_model"])
        self.assert_row_count(project, "microbatch_model", 5)
2 changes: 1 addition & 1 deletion dbt/adapters/__about__.py
@@ -1 +1 @@
version = "1.5.0"
version = "1.6.0a"
2 changes: 1 addition & 1 deletion dbt/adapters/base/impl.py
@@ -1572,7 +1572,7 @@ def valid_incremental_strategies(self):
return ["append"]

def builtin_incremental_strategies(self):
return ["append", "delete+insert", "merge", "insert_overwrite"]
return ["append", "delete+insert", "merge", "insert_overwrite", "microbatch"]

@available.parse_none
def get_incremental_strategy_macro(self, model_context, strategy: str):
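
builtin_incremental_strategies now advertises microbatch as a built-in, but dbt only accepts a strategy for a given adapter if it also appears in that adapter's valid_incremental_strategies; get_incremental_strategy_macro then resolves it to a get_incremental_<strategy>_sql macro. A minimal sketch of the opt-in, assuming a hypothetical adapter subclass (MyAdapter is illustrative, not part of this diff):

from typing import List

from dbt.adapters.base.impl import BaseAdapter


class MyAdapter(BaseAdapter):
    def valid_incremental_strategies(self) -> List[str]:
        # listing "microbatch" here lets dbt dispatch
        # get_incremental_microbatch_sql for this adapter
        return ["append", "microbatch"]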
4 changes: 2 additions & 2 deletions dbt/adapters/base/relation.py
@@ -37,8 +37,8 @@
 SerializableIterable = Union[Tuple, FrozenSet]
 
 
-@dataclass(frozen=True, eq=False, repr=False)
-class EventTimeFilter(FakeAPIObject, Hashable):
+@dataclass
+class EventTimeFilter(FakeAPIObject):
     field_name: str
     start: Optional[datetime] = None
     end: Optional[datetime] = None
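
For context, an EventTimeFilter carries the field name plus optional start/end bounds that microbatch uses to restrict an upstream ref to a single batch window. A minimal sketch of how such a filter might be rendered into a SQL predicate (event_time_predicate is an illustrative helper, not part of this diff):

from datetime import datetime
from typing import List, Optional


def event_time_predicate(field_name: str, start: Optional[datetime], end: Optional[datetime]) -> str:
    # build "field >= start and field < end", dropping whichever bound is unset
    clauses: List[str] = []
    if start is not None:
        clauses.append(f"{field_name} >= '{start.isoformat()}'")
    if end is not None:
        clauses.append(f"{field_name} < '{end.isoformat()}'")
    return " and ".join(clauses) or "1 = 1"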
@@ -66,6 +66,19 @@
 {% endmacro %}
 
 
+{% macro get_incremental_microbatch_sql(arg_dict) %}
+
+  {{ return(adapter.dispatch('get_incremental_microbatch_sql', 'dbt')(arg_dict)) }}
+
+{% endmacro %}
+
+
+{% macro default__get_incremental_microbatch_sql(arg_dict) %}
+
+  {{ exceptions.raise_not_implemented('microbatch materialization strategy not implemented for adapter ' + adapter.type()) }}
+
+{% endmacro %}
+
+
 {% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %}
 
 {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
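
The default__ implementation raises on purpose: each adapter that supports the strategy must provide its own dispatch target. A minimal sketch of what an override might look like, in the same Jinja style as the macros above, assuming an adapter that replaces each batch via dbt's existing get_delete_insert_merge_sql helper (the postgres__ prefix and the delegation are illustrative assumptions, not what any adapter necessarily ships):

{% macro postgres__get_incremental_microbatch_sql(arg_dict) %}
  {#-- Replace the current batch: delete rows in the batch window, then insert the new batch. --#}
  {{ return(get_delete_insert_merge_sql(
      arg_dict["target_relation"],
      arg_dict["temp_relation"],
      arg_dict["unique_key"],
      arg_dict["dest_columns"],
      arg_dict["incremental_predicates"]
  )) }}
{% endmacro %}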
