Skip to content

Commit

Permalink
Track batch execution time for microbatch models (#10828)
Browse files (browse the repository at this point in the history)
* Begin testing that microbatch execution times are being tracked and set

* Begin tracking the execution time of batches for microbatch models

* Add changie doc

* Additional assertions in microbatch testing
  • Loading branch information
QMalcolm authored Oct 8, 2024
1 parent fc8eb82 commit 5db0b81
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 13 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241004-163908.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Begin tracking execution time of microbatch model batches
time: 2024-10-04T16:39:08.464064-05:00
custom:
Author: QMalcolm
Issue: "10825"
34 changes: 25 additions & 9 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import os
import threading
import time
from copy import deepcopy
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -284,7 +285,7 @@ def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
self.print_result_line(result)

def _build_run_model_result(self, model, context):
def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
result = context["load_result"]("main")
if not result:
raise DbtRuntimeError("main is not being called during running model")
Expand All @@ -296,7 +297,7 @@ def _build_run_model_result(self, model, context):
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
execution_time=elapsed_time,
message=str(result.response),
adapter_response=adapter_response,
failures=result.get("failures"),
Expand Down Expand Up @@ -340,7 +341,8 @@ def _build_run_microbatch_model_result(
status=status,
timing=[],
thread_id=threading.current_thread().name,
# TODO -- why isn't this getting propagated to logs?
# The execution_time here doesn't get propagated to logs because
# `safe_run_hooks` handles the elapsed time at the node level
execution_time=0,
message=msg,
adapter_response={},
Expand All @@ -349,19 +351,28 @@ def _build_run_microbatch_model_result(
)

def _build_succesful_run_batch_result(
self, model: ModelNode, context: Dict[str, Any], batch: BatchType
self,
model: ModelNode,
context: Dict[str, Any],
batch: BatchType,
elapsed_time: float = 0.0,
) -> RunResult:
run_result = self._build_run_model_result(model, context)
run_result = self._build_run_model_result(model, context, elapsed_time)
run_result.batch_results = BatchResults(successful=[batch])
return run_result

def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult:
def _build_failed_run_batch_result(
self,
model: ModelNode,
batch: BatchType,
elapsed_time: float = 0.0,
) -> RunResult:
return RunResult(
node=model,
status=RunStatus.Error,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
execution_time=elapsed_time,
message="ERROR",
adapter_response={},
failures=1,
Expand Down Expand Up @@ -504,6 +515,7 @@ def _execute_microbatch_materialization(
self.print_batch_start_line(batch[0], batch_idx + 1, len(batches))

exception = None
start_time = time.perf_counter()
try:
# Set start/end in context prior to re-compiling
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
Expand All @@ -530,13 +542,17 @@ def _execute_microbatch_materialization(
self.adapter.cache_added(relation.incorporate(dbt_created=True))

# Build result of executed batch
batch_run_result = self._build_succesful_run_batch_result(model, context, batch)
batch_run_result = self._build_succesful_run_batch_result(
model, context, batch, time.perf_counter() - start_time
)
# Update context vars for future batches
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(model, batch)
batch_run_result = self._build_failed_run_batch_result(
model, batch, time.perf_counter() - start_time
)

self.print_batch_result_line(
batch_run_result, batch[0], batch_idx + 1, len(batches), exception
Expand Down
7 changes: 4 additions & 3 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextvars import ContextVar, copy_context
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
from unittest import mock

import pytz
Expand All @@ -17,7 +17,7 @@
from dbt.contracts.graph.manifest import Manifest
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt_common.context import _INVOCATION_CONTEXT_VAR, InvocationContext
from dbt_common.events.base_types import EventLevel
from dbt_common.events.base_types import EventLevel, EventMsg
from dbt_common.events.functions import (
capture_stdout_logs,
fire_event,
Expand Down Expand Up @@ -76,6 +76,7 @@
def run_dbt(
args: Optional[List[str]] = None,
expect_pass: bool = True,
callbacks: Optional[List[Callable[[EventMsg], None]]] = None,
):
# reset global vars
reset_metadata_vars()
Expand All @@ -93,7 +94,7 @@ def run_dbt(
args.extend(["--project-dir", project_dir])
if profiles_dir and "--profiles-dir" not in args:
args.extend(["--profiles-dir", profiles_dir])
dbt = dbtRunner()
dbt = dbtRunner(callbacks=callbacks)

res = dbt.invoke(args)

Expand Down
16 changes: 15 additions & 1 deletion tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from dbt.events.types import LogModelResult
from dbt.tests.util import (
get_artifact,
patch_microbatch_end_time,
Expand All @@ -12,6 +13,7 @@
run_dbt_and_capture,
write_file,
)
from tests.utils import EventCatcher

input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
Expand Down Expand Up @@ -186,10 +188,22 @@ class TestMicrobatchCLI(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run without --event-time-start or --event-time-end - 3 expected rows in output
catcher = EventCatcher(event_to_catch=LogModelResult)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
run_dbt(["run"], callbacks=[catcher.catch])
self.assert_row_count(project, "microbatch_model", 3)

assert len(catcher.caught_events) == 5
batch_creation_events = 0
for caught_event in catcher.caught_events:
if "batch 2020" in caught_event.data.description:
batch_creation_events += 1
assert caught_event.data.execution_time > 0
# 3 batches should have been run, so there should be 3 batch
# creation events
assert batch_creation_events == 3

# build model >= 2020-01-02
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"])
Expand Down

0 comments on commit 5db0b81

Please sign in to comment.