Skip to content

Commit

Permalink
Alter functional microbatch tests to work with updated `event_time_st…
Browse files Browse the repository at this point in the history
…art/end` reqs

We made it such that when `event_time_start` is specified, `event_time_end` must also
be specified (and vice versa). This broke numerous tests, in a few different ways:

1. There were tests that used `--event-time-start` without `--event-time-end` but
were using event_time_start essentially as the `begin` time for models being initially
built or full refreshed. These tests could simply drop the `--event-time-start` and
instead rely on the `begin` value.

2. There was a test that was trying to load a subset of the data _excluding_ some
data which would be captured by using `begin`. In this test we added an appropriate
`--event-time-end` as the `--event-time-start` was necessary to satisfy what the
test was testing.

3. There was a test which was trying to ensure that two microbatch models would be
given the same "current" time. Because we wanted to ensure the "current" time code
path was used, we couldn't add `--event-time-end` to resolve the problem, thus we
needed to remove the `--event-time-start` that was being used. However, this led to
the test being incredibly slow. This was resolved by switching the relevant microbatch
models from having `batch_size`s of `day` to instead have `year`. This solution should
be good enough for roughly ~40 years? We'll figure out a better solution then, so see ya
in 2064. Assuming I haven't died before my 70th birthday, feel free to ping me to get
this taken care of.
  • Loading branch information
QMalcolm committed Oct 21, 2024
1 parent 74b7c09 commit 917d8d8
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import os
from datetime import datetime
from unittest import mock

import pytest
import pytz

from dbt.events.types import LogModelResult
from dbt.tests.util import (
Expand Down Expand Up @@ -42,8 +40,13 @@
select * from {{ ref('input_model') }}
"""

microbatch_model_downstream_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
microbatch_yearly_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

microbatch_yearly_model_downstream_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('microbatch_model') }}
"""

Expand Down Expand Up @@ -208,15 +211,6 @@ def test_run_with_event_time(self, project):
# creation events
assert batch_creation_events == 3

# build model >= 2020-01-02
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 2)

# build model < 2020-01-03
run_dbt(["run", "--event-time-end", "2020-01-03", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 2)

# build model between 2020-01-02 >= event_time < 2020-01-03
run_dbt(
[
Expand Down Expand Up @@ -416,7 +410,7 @@ def models(self):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time_logs(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])
_, logs = run_dbt_and_capture(["run"])

assert "start: 2020-01-01 00:00:00+00:00" in logs
assert "end: 2020-01-02 00:00:00+00:00" in logs
Expand Down Expand Up @@ -450,7 +444,7 @@ def models(self):
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"], expect_pass=False)
run_dbt(["run"], expect_pass=False)
self.assert_row_count(project, "microbatch_model", 2)

run_results = get_artifact(project.project_root, "target", "run_results.json")
Expand All @@ -475,7 +469,7 @@ def models(self):
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])
_, console_output = run_dbt_and_capture(["run"])

assert "PARTIAL SUCCESS (2/3)" in console_output
assert "Completed with 1 partial success" in console_output
Expand Down Expand Up @@ -515,7 +509,7 @@ def models(self):
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])
_, console_output = run_dbt_and_capture(["run"])

assert "PARTIAL SUCCESS (2/3)" in console_output
assert "Completed with 1 partial success" in console_output
Expand Down Expand Up @@ -562,7 +556,7 @@ def models(self):
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"])
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 2)


Expand All @@ -571,7 +565,7 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"])
run_dbt(["run"])

# Compiled paths - compiled model without filter only
assert read_file(
Expand Down Expand Up @@ -654,7 +648,15 @@ def models(self):
def test_run_with_event_time(self, project):
# run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02"])
run_dbt(
[
"run",
"--event-time-start",
"2020-01-02",
"--event-time-end",
"2020-01-03 13:57:00",
]
)
self.assert_row_count(project, "microbatch_model", 2)

# re-running shouldn't change what it's in the data set because there is nothing new
Expand Down Expand Up @@ -683,14 +685,13 @@ class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest):
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_sql,
"second_microbatch_model.sql": microbatch_model_downstream_sql,
"microbatch_model.sql": microbatch_yearly_model_sql,
"second_microbatch_model.sql": microbatch_yearly_model_downstream_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_microbatch(self, project) -> None:
current_time = datetime.now(pytz.UTC)
run_dbt(["run", "--event-time-start", current_time.strftime("%Y-%m-%d")])
run_dbt(["run"])

run_results = get_artifact(project.project_root, "target", "run_results.json")
microbatch_model_last_batch = run_results["results"][1]["batch_results"]["successful"][-1]
Expand Down

0 comments on commit 917d8d8

Please sign in to comment.