Write microbatch compiled + run code to separate target files #10743

Merged · 8 commits · Sep 24, 2024 (changes shown from all commits)
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240920-172419.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Write microbatch compiled/run targets to separate files, one per batch
+time: 2024-09-20T17:24:19.219556+01:00
+custom:
+  Author: michelleark
+  Issue: "10714"
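
For context: with these changes, a daily microbatch model (e.g. microbatch_model in package test) gets one compiled file and one run file per batch, nested under a directory named after the model, while the unfiltered compiled model stays at its usual top-level path. A sketch of the resulting layout, with illustrative dates, as pinned down by the functional tests below:

    target/
        compiled/test/models/microbatch_model.sql          <- unfiltered model, usual path
        compiled/test/models/microbatch_model/
            microbatch_model_2020-01-01.sql                <- one compiled file per batch
            microbatch_model_2020-01-02.sql
        run/test/models/microbatch_model/
            microbatch_model_2020-01-01.sql                <- one run file per batch
            microbatch_model_2020-01-02.sql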
11 changes: 8 additions & 3 deletions core/dbt/compilation.py
@@ -521,7 +521,9 @@ def write_graph_file(self, linker: Linker, manifest: Manifest):
        linker.write_graph(graph_path, manifest)

    # writes the "compiled_code" into the target/compiled directory
-    def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
+    def _write_node(
+        self, node: ManifestSQLNode, split_suffix: Optional[str] = None
+    ) -> ManifestSQLNode:
        if not node.extra_ctes_injected or node.resource_type in (
            NodeType.Snapshot,
            NodeType.Seed,
@@ -530,7 +532,9 @@ def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
        fire_event(WritingInjectedSQLForNode(node_info=get_node_info()))

        if node.compiled_code:
-            node.compiled_path = node.get_target_write_path(self.config.target_path, "compiled")
+            node.compiled_path = node.get_target_write_path(
+                self.config.target_path, "compiled", split_suffix
+            )
            node.write_node(self.config.project_root, node.compiled_path, node.compiled_code)
        return node

@@ -540,6 +544,7 @@ def compile_node(
        manifest: Manifest,
        extra_context: Optional[Dict[str, Any]] = None,
        write: bool = True,
+        split_suffix: Optional[str] = None,
    ) -> ManifestSQLNode:
        """This is the main entry point into this code. It's called by
        CompileRunner.compile, GenericRPCRunner.compile, and
@@ -562,7 +567,7 @@ def compile_node(

        node, _ = self._recursively_prepend_ctes(node, manifest, extra_context)
        if write:
-            self._write_node(node)
+            self._write_node(node, split_suffix=split_suffix)
        return node
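
The new split_suffix parameter is threaded straight through: compile_node forwards it to _write_node, which forwards it to get_target_write_path, so a caller (the microbatch runner in run.py below) can fan each batch's compiled SQL out to its own file. When split_suffix is None, the write path is unchanged.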


17 changes: 16 additions & 1 deletion core/dbt/context/providers.py
@@ -51,6 +51,7 @@
    Exposure,
    Macro,
    ManifestNode,
+    ModelNode,
    Resource,
    SeedNode,
    SemanticModel,
@@ -77,6 +78,7 @@
    SecretEnvVarLocationError,
    TargetNotFoundError,
)
+from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.node_types import ModelLanguage, NodeType
from dbt.utils import MultiDict, args_to_dict
from dbt_common.clients.jinja import MacroProtocol
@@ -972,7 +974,20 @@
        # macros/source defs aren't 'writeable'.
        if isinstance(self.model, (Macro, SourceDefinition)):
            raise MacrosSourcesUnWriteableError(node=self.model)
-        self.model.build_path = self.model.get_target_write_path(self.config.target_path, "run")
+
+        split_suffix = None
+        if (
+            isinstance(self.model, ModelNode)
+            and self.model.config.get("incremental_strategy") == "microbatch"
+        ):
+            split_suffix = MicrobatchBuilder.format_batch_start(
+                self.model.config.get("__dbt_internal_microbatch_event_time_start"),
+                self.model.config.batch_size,
+            )
+
+        self.model.build_path = self.model.get_target_write_path(
+            self.config.target_path, "run", split_suffix=split_suffix
+        )
        self.model.write_node(self.config.project_root, self.model.build_path, payload)
        return ""
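
Note the guard: only ModelNode instances configured with incremental_strategy == "microbatch" get a suffix; every other node keeps split_suffix = None and writes its run file to the usual path. The batch start itself comes from the __dbt_internal_microbatch_event_time_start config entry that the microbatch runner sets before each per-batch compile (see run.py below).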

14 changes: 13 additions & 1 deletion core/dbt/contracts/graph/nodes.py
@@ -2,6 +2,7 @@
import os
from dataclasses import dataclass, field
from datetime import datetime
+from pathlib import Path
from typing import (
    Any,
    Dict,
@@ -243,14 +244,25 @@ def clear_event_status(self):

@dataclass
class ParsedNode(ParsedResource, NodeInfoMixin, ParsedNodeMandatory, SerializableType):
-    def get_target_write_path(self, target_path: str, subdirectory: str):
+    def get_target_write_path(
+        self, target_path: str, subdirectory: str, split_suffix: Optional[str] = None
+    ):
        # This is called for both the "compiled" subdirectory of "target" and the "run" subdirectory
        if os.path.basename(self.path) == os.path.basename(self.original_file_path):
            # One-to-one relationship of nodes to files.
            path = self.original_file_path
        else:
            # Many-to-one relationship of nodes to files.
            path = os.path.join(self.original_file_path, self.path)
+
+        if split_suffix:
+            pathlib_path = Path(path)
+            path = str(
+                pathlib_path.parent
+                / pathlib_path.stem
+                / (pathlib_path.stem + f"_{split_suffix}" + pathlib_path.suffix)
+            )
+
        target_write_path = os.path.join(target_path, subdirectory, self.package_name, path)
        return target_write_path
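
The suffix logic nests the per-batch file in a directory named after the model file's stem. A standalone sketch of just this transformation (the helper name is hypothetical; it mirrors the pathlib code above):

    from pathlib import Path

    def split_write_path(path: str, split_suffix: str) -> str:
        # models/microbatch_model.sql + "2020-01-01"
        #   -> models/microbatch_model/microbatch_model_2020-01-01.sql
        p = Path(path)
        return str(p.parent / p.stem / f"{p.stem}_{split_suffix}{p.suffix}")

The full write path then gets the usual target/<subdirectory>/<package_name>/ prefix, as exercised by the unit tests at the bottom of this diff.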

11 changes: 11 additions & 0 deletions core/dbt/materializations/incremental/microbatch.py
@@ -162,3 +162,14 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize):
            truncated = datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc)

        return truncated
+
+    @staticmethod
+    def format_batch_start(
+        batch_start: Optional[datetime], batch_size: BatchSize
+    ) -> Optional[str]:
+        if batch_start is None:
+            return batch_start
+
+        return str(
+            batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
+        )
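
Per the unit tests below, the helper renders coarse batches (year/month/day) as a bare date and hourly batches as a full timestamp, so two batches on the same day cannot collide. A quick usage sketch (imports shown for context; both modules appear in this diff):

    from datetime import datetime
    from dbt.artifacts.resources.types import BatchSize
    from dbt.materializations.incremental.microbatch import MicrobatchBuilder

    MicrobatchBuilder.format_batch_start(datetime(2020, 1, 1, 1), BatchSize.day)
    # -> "2020-01-01"
    MicrobatchBuilder.format_batch_start(datetime(2020, 1, 1, 1), BatchSize.hour)
    # -> "2020-01-01 01:00:00"
    MicrobatchBuilder.format_batch_start(None, BatchSize.day)
    # -> None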
17 changes: 11 additions & 6 deletions core/dbt/task/run.py
@@ -14,7 +14,6 @@
)
from dbt.adapters.exceptions import MissingMaterializationError
from dbt.artifacts.resources import Hook
-from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.results import (
    BaseResult,
    NodeStatus,
@@ -197,11 +196,10 @@ def describe_node(self) -> str:

    def describe_batch(self, batch_start: Optional[datetime]) -> str:
        # Only visualize date if batch_start year/month/day
-        formatted_batch_start = (
-            batch_start.date()
-            if (batch_start and self.node.config.batch_size != BatchSize.hour)
-            else batch_start
+        formatted_batch_start = MicrobatchBuilder.format_batch_start(
+            batch_start, self.node.config.batch_size
        )
+
        return f"batch {formatted_batch_start} of {self.get_node_representation()}"

    def print_start_line(self):
@@ -463,7 +461,14 @@ def _execute_microbatch_materialization(
            model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

            # Recompile node to re-resolve refs with event time filters rendered, update context
-            self.compiler.compile_node(model, manifest, {})
+            self.compiler.compile_node(
+                model,
+                manifest,
+                {},
+                split_suffix=MicrobatchBuilder.format_batch_start(
+                    batch[0], model.config.batch_size
+                ),
+            )
            context["model"] = model
            context["sql"] = model.compiled_code
            context["compiled_code"] = model.compiled_code
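
Before this change, every batch recompiled to the same target file, so only the last batch's rendered SQL survived a run; passing the formatted batch start as split_suffix preserves each batch's compiled code, with its event-time filters rendered, side by side.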
76 changes: 76 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
@@ -5,6 +5,7 @@

from dbt.tests.util import (
    patch_microbatch_end_time,
+    read_file,
    relation_from_name,
    run_dbt,
    run_dbt_and_capture,
@@ -442,3 +443,78 @@ def test_run_with_event_time(self, project):
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            run_dbt(["run", "--event-time-start", "2020-01-01"])
        self.assert_row_count(project, "microbatch_model", 2)
+
+
+class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_run_with_event_time(self, project):
+        # run all partitions from start - 2 expected rows in output, one failed
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            run_dbt(["run", "--event-time-start", "2020-01-01"])
+
+        # Compiled paths - compiled model without filter only

    [Inline review comment from the PR author (michelleark), attached to the line above:
    "After discussion with @graciegoheen, we decided it could still be useful to see the
    non-batched (no filters applied) model file, and at the top-level is where users would
    expect it. I've added a test to formalize this expected behaviour, but it's something
    that would be easy to change if we get beta feedback!"]

+        assert read_file(
+            project.project_root,
+            "target",
+            "compiled",
+            "test",
+            "models",
+            "microbatch_model.sql",
+        )
+
+        # Compiled paths - batch compilations
+        assert read_file(
+            project.project_root,
+            "target",
+            "compiled",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-01.sql",
+        )
+        assert read_file(
+            project.project_root,
+            "target",
+            "compiled",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-02.sql",
+        )
+        assert read_file(
+            project.project_root,
+            "target",
+            "compiled",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-03.sql",
+        )
+
+        assert read_file(
+            project.project_root,
+            "target",
+            "run",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-01.sql",
+        )
+        assert read_file(
+            project.project_root,
+            "target",
+            "run",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-02.sql",
+        )
+        assert read_file(
+            project.project_root,
+            "target",
+            "run",
+            "test",
+            "models",
+            "microbatch_model",
+            "microbatch_model_2020-01-03.sql",
+        )
34 changes: 33 additions & 1 deletion tests/unit/graph/test_nodes.py
@@ -15,7 +15,7 @@
)
from dbt.artifacts.resources.v1.semantic_model import NodeRelation
from dbt.contracts.graph.model_config import TestConfig
-from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, SemanticModel
+from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, ParsedNode, SemanticModel
from dbt.node_types import NodeType
from dbt_common.contracts.constraints import (
    ColumnLevelConstraint,
@@ -391,3 +391,35 @@ def test_disabled_unique_combo_multiple():

def assertSameContents(list1, list2):
    assert sorted(list1) == sorted(list2)
+
+
+class TestParsedNode:
+    @pytest.fixture(scope="class")
+    def parsed_node(self) -> ParsedNode:
+        return ParsedNode(
+            resource_type=NodeType.Model,
+            unique_id="model.test_package.test_name",
+            name="test_name",
+            package_name="test_package",
+            schema="test_schema",
+            alias="test_alias",
+            fqn=["models", "test_name"],
+            original_file_path="test_original_file_path",
+            checksum=FileHash.from_contents("checksum"),
+            path="test_path.sql",
+            database=None,
+        )
+
+    def test_get_target_write_path(self, parsed_node):
+        write_path = parsed_node.get_target_write_path("target_path", "subdirectory")
+        assert (
+            write_path
+            == "target_path/subdirectory/test_package/test_original_file_path/test_path.sql"
+        )
+
+    def test_get_target_write_path_split(self, parsed_node):
+        write_path = parsed_node.get_target_write_path("target_path", "subdirectory", "split")
+        assert (
+            write_path
+            == "target_path/subdirectory/test_package/test_original_file_path/test_path/test_path_split.sql"
+        )
16 changes: 16 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
@@ -444,3 +444,19 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestamp):
    )
    def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp):
        assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp
+
+    @pytest.mark.parametrize(
+        "batch_size,batch_start,expected_formatted_batch_start",
+        [
+            (None, None, None),
+            (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
+            (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
+            (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
+            (BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
+        ],
+    )
+    def test_format_batch_start(self, batch_size, batch_start, expected_formatted_batch_start):
+        assert (
+            MicrobatchBuilder.format_batch_start(batch_start, batch_size)
+            == expected_formatted_batch_start
+        )