
base microbatch support + tests #300

Merged
merged 10 commits on Sep 11, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-135404.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Default microbatch strategy implementation and base tests
time: 2024-09-11T13:54:04.231977-04:00
custom:
Author: michelleark
Issue: "302"
@@ -0,0 +1,91 @@
import os
from pprint import pformat
from unittest import mock

import pytest
from freezegun import freeze_time

from dbt.tests.util import relation_from_name, run_dbt

_input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
union all
select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
union all
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class BaseMicrobatch:
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
"""
This is the SQL that defines the microbatch model, including any {{ config(..) }}
"""
return _microbatch_model_sql

@pytest.fixture(scope="class")
def input_model_sql(self) -> str:
"""
This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}.
event_time is a required configuration of this input
"""
return _input_model_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')"

@pytest.fixture(scope="class")
def models(self, microbatch_model_sql, input_model_sql):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_sql,
}

def assert_row_count(self, project, relation_name: str, expected_row_count: int):
relation = relation_from_name(project.adapter, relation_name)
result = project.run_sql(f"select * from {relation}", fetch="all")

assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}"

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project, insert_two_rows_sql):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# add next two days of data
project.run_sql(insert_two_rows_sql)

self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run after advancing the current time by one day => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run after advancing the current time by one more day => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)
2 changes: 1 addition & 1 deletion dbt/adapters/__about__.py
@@ -1 +1 @@
version = "1.5.0"
version = "1.6.0a"
2 changes: 1 addition & 1 deletion dbt/adapters/base/impl.py
@@ -1572,7 +1572,7 @@ def valid_incremental_strategies(self):
return ["append"]

def builtin_incremental_strategies(self):
return ["append", "delete+insert", "merge", "insert_overwrite"]
return ["append", "delete+insert", "merge", "insert_overwrite", "microbatch"]

@available.parse_none
def get_incremental_strategy_macro(self, model_context, strategy: str):
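Since "microbatch" becomes a builtin strategy name here while valid_incremental_strategies still defaults to ["append"], each adapter must advertise support explicitly. A hedged sketch of an adapter opting in (the class is illustrative, not part of this PR):

from dbt.adapters.base.impl import BaseAdapter


class MyAdapter(BaseAdapter):
    # Illustrative override: listing "microbatch" here is what lets the
    # strategy pass validation for this adapter.
    def valid_incremental_strategies(self):
        return ["append", "microbatch"]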
4 changes: 2 additions & 2 deletions dbt/adapters/base/relation.py
@@ -37,8 +37,8 @@
SerializableIterable = Union[Tuple, FrozenSet]


-@dataclass(frozen=True, eq=False, repr=False)
-class EventTimeFilter(FakeAPIObject, Hashable):
+@dataclass
+class EventTimeFilter(FakeAPIObject):
field_name: str
start: Optional[datetime] = None
end: Optional[datetime] = None
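For context, EventTimeFilter carries the event-time column name and a batch's time bounds. Note that a plain @dataclass generates __eq__ and therefore sets __hash__ to None, which is why the Hashable base is dropped alongside frozen=True. A minimal construction sketch, with illustrative values:

from datetime import datetime

# Illustrative only: a filter covering the 2020-01-03 daily batch.
batch_filter = EventTimeFilter(
    field_name="event_time",
    start=datetime(2020, 1, 3),
    end=datetime(2020, 1, 4),
)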
@@ -66,6 +66,19 @@
{% endmacro %}


{% macro get_incremental_microbatch_sql(arg_dict) %}

{{ return(adapter.dispatch('get_incremental_microbatch_sql', 'dbt')(arg_dict)) }}

{% endmacro %}

{% macro default__get_incremental_microbatch_sql(arg_dict) %}

{{ exceptions.raise_not_implemented('microbatch materialization strategy not implemented for adapter ' + adapter.type()) }}

{% endmacro %}


{% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
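The default implementation raises until an adapter dispatches its own override. A hedged sketch of what an adapter-side macro could look like; the body delegates to the existing delete+insert strategy purely for illustration and is not this PR's implementation:

{% macro postgres__get_incremental_microbatch_sql(arg_dict) %}
  {#-- Illustrative only: reuse an existing strategy's SQL for each batch.
       A real adapter would build batch-scoped SQL from arg_dict. --#}
  {{ return(get_incremental_delete_insert_sql(arg_dict)) }}
{% endmacro %}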