Skip to content

Commit

Permalink
Stop making microbatch batches with filters that will never have any …
Browse files Browse the repository at this point in the history
…rows (#10826)
  • Loading branch information
QMalcolm authored Oct 8, 2024
1 parent 5db0b81 commit f6cdacc
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 3 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241004-133630.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Handle edge cases when a specified `--event-time-end` is equal to the batch
start time truncated to the batch size
time: 2024-10-04T13:36:30.357936-05:00
custom:
Author: QMalcolm
Issue: "10824"
12 changes: 9 additions & 3 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,15 @@ def build_start_time(self, checkpoint: Optional[datetime]):
return MicrobatchBuilder.truncate_timestamp(self.model.config.begin, batch_size)

lookback = self.model.config.lookback
start = MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)

return start
# If the checkpoint is equivalent to itself truncated then the checkpoint straddles
# the batch line. In this case the last batch will end with the checkpoint, but start
# should be the previous hour/day/month/year. Thus we need to increase the lookback by
# 1 to get this effect properly.
if checkpoint == MicrobatchBuilder.truncate_timestamp(checkpoint, batch_size):
lookback += 1

return MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)

def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:
"""
Expand All @@ -81,7 +87,7 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:
)

batches: List[BatchType] = [(curr_batch_start, curr_batch_end)]
while curr_batch_end <= end:
while curr_batch_end < end:
curr_batch_start = curr_batch_end
curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
batches.append((curr_batch_start, curr_batch_end))
Expand Down
125 changes: 125 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,70 @@ def test_build_end_time(
1,
datetime(2024, 9, 5, 7, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
BatchSize.hour,
0,
datetime(2024, 9, 4, 23, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
BatchSize.hour,
1,
datetime(2024, 9, 4, 22, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
BatchSize.day,
0,
datetime(2024, 9, 4, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
BatchSize.day,
1,
datetime(2024, 9, 3, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.month,
0,
datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.month,
1,
datetime(2024, 7, 1, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.year,
0,
datetime(2023, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.year,
1,
datetime(2022, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
],
)
def test_build_start_time(
Expand Down Expand Up @@ -351,6 +415,67 @@ def test_build_start_time(
),
],
),
# Test when event_time_end matches the truncated batch size
(
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.year,
[
(
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2026, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.month,
[
(
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 11, 1, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC),
BatchSize.day,
[
(
datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 7, 0, 0, 0, 0, pytz.UTC),
),
],
),
(
datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC),
BatchSize.hour,
[
(
datetime(2024, 9, 5, 1, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 5, 2, 0, 0, 0, pytz.UTC),
datetime(2024, 9, 5, 3, 0, 0, 0, pytz.UTC),
),
],
),
],
)
def test_build_batches(self, microbatch_model, start, end, batch_size, expected_batches):
Expand Down

0 comments on commit f6cdacc

Please sign in to comment.