Skip to content

Commit

Permalink
merge in main
Browse files Browse the repository at this point in the history
  • Loading branch information
DevonFulcher committed Oct 29, 2024
2 parents 6da0bc6 + 6b5db17 commit b7ffa49
Show file tree
Hide file tree
Showing 35 changed files with 1,037 additions and 670 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241028-173419.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Emit warning when microbatch model has no input with `event_time` config
time: 2024-10-28T17:34:19.195209-04:00
custom:
Author: michelleark
Issue: "10926"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241017-134857.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Change `lookback` default from `0` to `1` to ensure better data completeness
time: 2024-10-17T13:48:57.805205-07:00
custom:
Author: QMalcolm
Issue: "10867"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241022-222927.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Implement partial parsing for all-yaml snapshots
time: 2024-10-22T22:29:27.396378-04:00
custom:
Author: gshank
Issue: "10903"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241023-152054.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Restore source quoting behaviour when quoting config provided in dbt_project.yml
time: 2024-10-23T15:20:54.766893-04:00
custom:
Author: michelleark
Issue: "10892"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241024-104938.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix bug when referencing deprecated models
time: 2024-10-24T10:49:38.352328-06:00
custom:
Author: dbeatty10 danlsn
Issue: "10915"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241028-132751.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: 'Fix ''model'' jinja context variable type to dict '
time: 2024-10-28T13:27:51.604093-04:00
custom:
Author: michelleark
Issue: "10927"
2 changes: 1 addition & 1 deletion core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class NodeConfig(NodeAndTestConfig):
materialized: str = "view"
incremental_strategy: Optional[str] = None
batch_size: Any = None
lookback: Any = 0
lookback: Any = 1
begin: Any = None
persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,15 @@ def resolve(self, source_name: str, table_name: str):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)

# Source quoting does _not_ respect global configs in dbt_project.yml, as documented here:
# https://docs.getdbt.com/reference/project-configs/quoting
# Use an object with an empty quoting field to bypass any settings in self.
class SourceQuotingBaseConfig:
quoting: Dict[str, Any] = {}

return self.Relation.create_from(
self.config,
SourceQuotingBaseConfig(),
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_source),
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
snapshots: List[str] = field(default_factory=list)
# The following field will no longer be used. Leaving
# here to avoid breaking existing projects. To be removed
# later if possible.
Expand Down
9 changes: 6 additions & 3 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SeedNode,
SemanticModel,
SingularTestNode,
SnapshotNode,
SourceDefinition,
UnitTestDefinition,
UnitTestFileFixture,
Expand Down Expand Up @@ -1600,12 +1601,14 @@ def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=Non
if isinstance(node, GenericTestNode):
assert test_from
source_file.add_test(node.unique_id, test_from)
if isinstance(node, Metric):
elif isinstance(node, Metric):
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
elif isinstance(node, Exposure):
source_file.exposures.append(node.unique_id)
if isinstance(node, Group):
elif isinstance(node, Group):
source_file.groups.append(node.unique_id)
elif isinstance(node, SnapshotNode):
source_file.snapshots.append(node.unique_id)
elif isinstance(source_file, FixtureSourceFile):
pass
else:
Expand Down
11 changes: 11 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,17 @@ message FreshnessConfigProblemMsg {
}


// I074
message MicrobatchModelNoEventTimeInputs {
string model_name = 1;
}

message MicrobatchModelNoEventTimeInputsMsg {
CoreEventInfo info = 1;
MicrobatchModelNoEventTimeInputs data = 2;
}


// M - Deps generation


Expand Down
1,173 changes: 587 additions & 586 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,19 @@ def message(self) -> str:
return self.msg


class MicrobatchModelNoEventTimeInputs(WarnLevel):
def code(self) -> str:
return "I074"

def message(self) -> str:
msg = (
f"The microbatch model '{self.model_name}' has no 'ref' or 'source' input with an 'event_time' configuration. "
"\nThis means no filtering can be applied and can result in unexpected duplicate records in the resulting microbatch model."
)

return warning_tag(msg)


# =======================================================
# M - Deps generation
# =======================================================
Expand Down
22 changes: 21 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import List, Optional
from typing import Any, Dict, List, Optional

import pytz

Expand Down Expand Up @@ -99,6 +99,26 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

return batches

def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
Assumes self.model has been (re)-compiled with necessary batch filters applied.
"""
batch_context: Dict[str, Any] = {}

# Microbatch model properties
batch_context["model"] = self.model.to_dict()
batch_context["sql"] = self.model.compiled_code
batch_context["compiled_code"] = self.model.compiled_code

# Add incremental context variables for batches running incrementally
if incremental_batch:
batch_context["is_incremental"] = lambda: True
batch_context["should_full_refresh"] = lambda: False

return batch_context

@staticmethod
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
"""Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size.
Expand Down
17 changes: 12 additions & 5 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
DeprecatedModel,
DeprecatedReference,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
ParsedFileLoadFailed,
ParsePerfInfoPath,
Expand Down Expand Up @@ -590,7 +591,7 @@ def check_for_model_deprecations(self):
# Get the child_nodes and check for deprecations.
child_nodes = self.manifest.child_map[node.unique_id]
for child_unique_id in child_nodes:
child_node = self.manifest.nodes[child_unique_id]
child_node = self.manifest.nodes.get(child_unique_id)
if not isinstance(child_node, ModelNode):
continue
if node.is_past_deprecation_date:
Expand Down Expand Up @@ -1442,13 +1443,19 @@ def check_valid_microbatch_config(self):
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time and not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,14 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
key_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if key_diff["changed"]:
for elem in key_diff["changed"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)
if key_diff["deleted"]:
for elem in key_diff["deleted"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
if key_diff["added"]:
for elem in key_diff["added"]:
Expand All @@ -673,6 +677,8 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)

Expand Down Expand Up @@ -828,6 +834,8 @@ def delete_schema_mssa_links(self, schema_file, dict_key, elem):
# remove elem node and remove unique_id from node_patches
for elem_unique_id in elem_unique_ids:
# might have been already removed
# For all-yaml snapshots, we don't do this, since the node
# should have already been removed.
if (
elem_unique_id in self.saved_manifest.nodes
or elem_unique_id in self.saved_manifest.disabled
Expand Down Expand Up @@ -868,6 +876,19 @@ def remove_tests(self, schema_file, dict_key, name):
self.saved_manifest.nodes.pop(test_unique_id)
schema_file.remove_tests(dict_key, name)

def delete_yaml_snapshot(self, schema_file, snapshot_dict):
snapshot_name = snapshot_dict["name"]
snapshots = schema_file.snapshots.copy()
for unique_id in snapshots:
if unique_id in self.saved_manifest.nodes:
snapshot = self.saved_manifest.nodes[unique_id]
if snapshot.name == snapshot_name:
self.saved_manifest.nodes.pop(unique_id)
schema_file.snapshots.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
schema_file.snapshots.remove(unique_id)

def delete_schema_source(self, schema_file, source_dict):
# both patches, tests, and source nodes
source_name = source_dict["name"]
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ def _add_yaml_snapshot_nodes_to_manifest(
snapshot_node.raw_code = "select * from {{ " + snapshot["relation"] + " }}"

# Add our new node to the manifest, and note that ref lookup collections
# will need to be rebuilt.
self.manifest.add_node_nofile(snapshot_node)
# will need to be rebuilt. This adds the node unique_id to the "snapshots"
# list in the SchemaSourceFile.
self.manifest.add_node(block.file, snapshot_node)
rebuild_refs = True

if rebuild_refs:
Expand Down
49 changes: 31 additions & 18 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time
from copy import deepcopy
from dataclasses import asdict
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

Expand Down Expand Up @@ -100,7 +101,14 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str
return status, message


def track_model_run(index, num_nodes, run_model_result):
def _get_adapter_info(adapter, run_model_result) -> Dict[str, Any]:
"""Each adapter returns a dataclass with a flexible dictionary for
adapter-specific fields. Only the non-'model_adapter_details' fields
are guaranteed cross adapter."""
return asdict(adapter.get_adapter_run_info(run_model_result.node.config)) if adapter else {}


def track_model_run(index, num_nodes, run_model_result, adapter=None):
if tracking.active_user is None:
raise DbtInternalError("cannot track model run with no active user")
invocation_id = get_invocation_id()
Expand All @@ -116,6 +124,7 @@ def track_model_run(index, num_nodes, run_model_result):
contract_enforced = False
versioned = False
incremental_strategy = None

tracking.track_model_run(
{
"invocation_id": invocation_id,
Expand All @@ -135,6 +144,7 @@ def track_model_run(index, num_nodes, run_model_result):
"contract_enforced": contract_enforced,
"access": access,
"versioned": versioned,
"adapter_info": _get_adapter_info(adapter, run_model_result),
}
)

Expand Down Expand Up @@ -283,7 +293,7 @@ def before_execute(self) -> None:
self.print_start_line()

def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
track_model_run(self.node_index, self.num_nodes, result, adapter=self.adapter)
self.print_result_line(result)

def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
Expand Down Expand Up @@ -489,28 +499,29 @@ def _execute_microbatch_materialization(
materialization_macro: MacroProtocol,
) -> List[RunResult]:
batch_results: List[RunResult] = []
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
# Indicates whether current batch should be run incrementally
incremental_batch = False

# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batch_info.failed
# if there is batch info, then don't run as full_refresh and do force is_incremental
# If there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
incremental_batch = True

# iterate over each batch, calling materialization_macro to get a batch-level run result
for batch_idx, batch in enumerate(batches):
Expand All @@ -532,9 +543,11 @@ def _execute_microbatch_materialization(
batch[0], model.config.batch_size
),
)
context["model"] = model
context["sql"] = model.compiled_code
context["compiled_code"] = model.compiled_code
# Update jinja context with batch context members
batch_context = microbatch_builder.build_batch_context(
incremental_batch=incremental_batch
)
context.update(batch_context)

# Materialize batch and cache any materialized relations
result = MacroGenerator(
Expand All @@ -547,9 +560,9 @@ def _execute_microbatch_materialization(
batch_run_result = self._build_succesful_run_batch_result(
model, context, batch, time.perf_counter() - start_time
)
# Update context vars for future batches
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
# At least one batch has been inserted successfully!
incremental_batch = True

except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(
Expand Down
Loading

0 comments on commit b7ffa49

Please sign in to comment.