
Commit

Merge branch 'main' into snapshot_column_names
gshank committed Sep 16, 2024
2 parents 220107b + aeaaedc commit 3c03f29
Showing 6 changed files with 39 additions and 38 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240913-213312.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Add test utility patch_microbatch_end_time for adapters testing
+time: 2024-09-13T21:33:12.482336-04:00
+custom:
+  Author: michelleark
+  Issue: "10713"
2 changes: 1 addition & 1 deletion .github/workflows/schema-check.yml
@@ -30,7 +30,7 @@ env:

 jobs:
   checking-schemas:
-    name: "Checking schemas"
+    name: "Post-merge schema changes required"
     runs-on: ubuntu-latest

     steps:
2 changes: 1 addition & 1 deletion core/dbt/artifacts/README.md
@@ -46,9 +46,9 @@ These types of minor, non-breaking changes are tested by [tests/unit/artifacts/t

#### Updating [schemas.getdbt.com](https://schemas.getdbt.com)
Non-breaking changes to artifact schemas require an update to the corresponding jsonschemas published to [schemas.getdbt.com](https://schemas.getdbt.com), which are defined in https://github.com/dbt-labs/schemas.getdbt.com. To do so:
Note: this must be done AFTER the core pull request is merged; otherwise we may end up with unresolvable conflicts and schemas that are invalid before the base pull request is merged. You may open the schemas.getdbt.com pull request before merging the base pull request, but do not merge it until afterward.
1. Create a PR in https://github.com/dbt-labs/schemas.getdbt.com which reflects the schema changes to the artifact. The schema can be updated in-place for non-breaking changes. Example PR: https://github.com/dbt-labs/schemas.getdbt.com/pull/39
2. Merge the https://github.com/dbt-labs/schemas.getdbt.com PR
3. Observe the `Artifact Schema Check` CI check pass on the `dbt-core` PR that updates the artifact schemas, and merge the `dbt-core` PR!

Note: Although `jsonschema` validation using the schemas in [schemas.getdbt.com](https://schemas.getdbt.com) is not encouraged or formally supported, `jsonschema` validation should still continue to work once the schemas are updated because they are forward-compatible and can therefore be used to validate previous minor versions of the schema.
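
As an illustration of that forward-compatibility note, here is a minimal sketch of validating a locally built manifest against the published schema; the `requests` and `jsonschema` packages and the v12 manifest schema URL are assumptions for illustration, not part of this commit:

import json

import jsonschema
import requests

# Assumed schema URL; substitute the manifest version your dbt-core release writes.
schema = requests.get("https://schemas.getdbt.com/dbt/manifest/v12.json").json()

with open("target/manifest.json") as f:
    manifest = json.load(f)

# Raises jsonschema.exceptions.ValidationError if the artifact does not conform.
jsonschema.validate(instance=manifest, schema=schema)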

7 changes: 7 additions & 0 deletions core/dbt/tests/util.py
@@ -6,12 +6,14 @@
 from datetime import datetime
 from io import StringIO
 from typing import Any, Dict, List, Optional
+from unittest import mock

 import yaml

 from dbt.adapters.base.relation import BaseRelation
 from dbt.adapters.factory import Adapter
 from dbt.cli.main import dbtRunner
+from dbt.context.providers import BaseResolver
 from dbt.contracts.graph.manifest import Manifest
 from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext
 from dbt_common.events.base_types import EventLevel
@@ -640,3 +642,8 @@ def safe_set_invocation_context():
     if invocation_var is None:
         invocation_var = _INVOCATION_CONTEXT_VAR
     invocation_var.set(InvocationContext(os.environ))
+
+
+def patch_microbatch_end_time(dt_str: str):
+    dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
+    return mock.patch.object(BaseResolver, "_build_end_time", return_value=dt)
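
The new helper is used as a context manager; a minimal usage sketch, mirroring the functional tests below (the timestamp must use the "%Y-%m-%d %H:%M:%S" format):

from dbt.tests.util import patch_microbatch_end_time, run_dbt

# Pin the end time that microbatch batch boundaries are computed from.
with patch_microbatch_end_time("2020-01-03 13:57:00"):
    run_dbt(["run"])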
42 changes: 23 additions & 19 deletions tests/functional/microbatch/test_microbatch.py
@@ -2,9 +2,13 @@
from unittest import mock

import pytest
-from freezegun import freeze_time

-from dbt.tests.util import relation_from_name, run_dbt, write_file
+from dbt.tests.util import (
+    patch_microbatch_end_time,
+    relation_from_name,
+    run_dbt,
+    write_file,
+)

input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
@@ -133,12 +137,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

@@ -152,17 +156,17 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run by advancing time by one day changing current time => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
with patch_microbatch_end_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run by advancing time by one more day changing current time => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
with patch_microbatch_end_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

@@ -198,12 +202,12 @@ def test_run_with_event_time(self, project):
run_dbt(["seed"])

# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

@@ -217,17 +221,17 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "raw_source", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run by advancing time by one day changing current time => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
with patch_microbatch_end_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run by advancing time by one more day changing current time => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
with patch_microbatch_end_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

@@ -253,12 +257,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

@@ -272,7 +276,7 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => INSERT BECAUSE INPUT MODEL ISN'T BEING FILTERED
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

@@ -298,12 +302,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

@@ -317,7 +321,7 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

@@ -327,6 +331,6 @@ def test_run_with_event_time(self, project):
)

# re-run without changing current time => INSERT because .render() skips filtering
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)
18 changes: 1 addition & 17 deletions tests/functional/utils.py
@@ -1,11 +1,8 @@
 import os
 from contextlib import contextmanager
-from dataclasses import dataclass, field
 from datetime import datetime
 from pathlib import Path
-from typing import List, Optional
-
-from dbt_common.events.base_types import BaseEvent, EventMsg
+from typing import Optional


@contextmanager
@@ -20,16 +17,3 @@ def up_one(return_path: Optional[Path] = None):

def is_aware(dt: datetime) -> bool:
return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None


-@dataclass
-class EventCatcher:
-    event_to_catch: BaseEvent
-    caught_events: List[EventMsg] = field(default_factory=list)
-
-    def catch(self, event: EventMsg):
-        if event.info.name == self.event_to_catch.__name__:
-            self.caught_events.append(event)
-
-    def flush(self) -> None:
-        self.caught_events = []
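
For reference, the removed EventCatcher implemented the standard dbtRunner callback pattern; a minimal sketch of that pattern, with "LogModelResult" as an illustrative event name:

from dbt.cli.main import dbtRunner
from dbt_common.events.base_types import EventMsg

caught_events: list[EventMsg] = []

def catch(event: EventMsg) -> None:
    # Keep only the events whose name matches the one under test.
    if event.info.name == "LogModelResult":
        caught_events.append(event)

# dbtRunner calls each registered callback once per emitted event.
dbtRunner(callbacks=[catch]).invoke(["run"])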
