From 3d606fa36b35365049abf128b18358699c2c4f8e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 20 Sep 2024 14:13:15 -0500 Subject: [PATCH] Add `BatchResults` object to `BaseResult` and begin tracking during microbatch runs --- core/dbt/artifacts/schemas/results.py | 24 +++++++++++-- core/dbt/artifacts/schemas/run/v5/run.py | 2 ++ core/dbt/task/base.py | 2 ++ core/dbt/task/build.py | 1 + core/dbt/task/clone.py | 1 + core/dbt/task/compile.py | 1 + core/dbt/task/freshness.py | 2 ++ core/dbt/task/run.py | 44 +++++++++++++++++------- core/dbt/task/run_operation.py | 1 + core/dbt/task/runnable.py | 1 + core/dbt/task/show.py | 1 + core/dbt/task/test.py | 2 ++ tests/unit/task/test_run.py | 7 +++- tests/unit/test_events.py | 3 +- 14 files changed, 75 insertions(+), 17 deletions(-) diff --git a/core/dbt/artifacts/schemas/results.py b/core/dbt/artifacts/schemas/results.py index 00746c87885..10451dad78b 100644 --- a/core/dbt/artifacts/schemas/results.py +++ b/core/dbt/artifacts/schemas/results.py @@ -1,6 +1,8 @@ -from dataclasses import dataclass +from __future__ import annotations + +from dataclasses import dataclass, field from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from dbt.contracts.graph.nodes import ResultNode from dbt_common.dataclass_schema import StrEnum, dbtClassMixin @@ -83,6 +85,21 @@ class FreshnessStatus(StrEnum): RuntimeErr = NodeStatus.RuntimeErr +BatchType = Tuple[Optional[datetime], datetime] + + +@dataclass +class BatchResults(dbtClassMixin): + successful: List[BatchType] = field(default_factory=list) + failed: List[BatchType] = field(default_factory=list) + + def __add__(self, other: BatchResults) -> BatchResults: + return BatchResults( + successful=self.successful + other.successful, + failed=self.failed + other.failed, + ) + + @dataclass class BaseResult(dbtClassMixin): status: Union[RunStatus, TestStatus, FreshnessStatus] @@ -92,6 +109,7 @@ class BaseResult(dbtClassMixin): adapter_response: Dict[str, Any] message: Optional[str] failures: Optional[int] + batch_results: Optional[BatchResults] @classmethod def __pre_deserialize__(cls, data): @@ -100,6 +118,8 @@ def __pre_deserialize__(cls, data): data["message"] = None if "failures" not in data: data["failures"] = None + if "batch_results" not in data: + data["batch_results"] = None return data def to_msg_dict(self): diff --git a/core/dbt/artifacts/schemas/run/v5/run.py b/core/dbt/artifacts/schemas/run/v5/run.py index 33a5859ccc7..ab358bfadb5 100644 --- a/core/dbt/artifacts/schemas/run/v5/run.py +++ b/core/dbt/artifacts/schemas/run/v5/run.py @@ -51,6 +51,7 @@ def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]): node=node, adapter_response={}, failures=None, + batch_results=None, ) @@ -82,6 +83,7 @@ def process_run_result(result: RunResult) -> RunResultOutput: message=result.message, adapter_response=result.adapter_response, failures=result.failures, + batch_results=result.batch_results, compiled=result.node.compiled if compiled else None, # type:ignore compiled_code=result.node.compiled_code if compiled else None, # type:ignore relation_name=result.node.relation_name if compiled else None, # type:ignore diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index b7a3a6a29ab..a6b5831c634 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -227,6 +227,7 @@ def _build_run_result( agate_table=None, adapter_response=None, failures=None, + batch_results=None, ): execution_time = time.time() - start_time thread_id = threading.current_thread().name @@ -242,6 +243,7 @@ def _build_run_result( agate_table=agate_table, adapter_response=adapter_response, failures=failures, + batch_results=None, ) def error_result(self, node, message, start_time, timing_info): diff --git a/core/dbt/task/build.py b/core/dbt/task/build.py index 625a4498d6a..bb76f1f03ea 100644 --- a/core/dbt/task/build.py +++ b/core/dbt/task/build.py @@ -53,6 +53,7 @@ def execute(self, compiled_node, manifest): message="NO-OP", adapter_response={}, failures=0, + batch_results=None, agate_table=None, ) diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 09a7942aa31..68012861907 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -43,6 +43,7 @@ def _build_run_model_result(self, model, context): message=message, adapter_response=adapter_response, failures=None, + batch_results=None, ) def compile(self, manifest: Manifest): diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index fcb4c69d4d4..50ecdca5f3d 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -35,6 +35,7 @@ def execute(self, compiled_node, manifest): message=None, adapter_response={}, failures=None, + batch_results=None, ) def compile(self, manifest: Manifest): diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index eb1508acb36..9ac4bb20bf9 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -98,6 +98,7 @@ def _build_run_result(self, node, start_time, status, timing_info, message): node=node, adapter_response={}, failures=None, + batch_results=None, ) def from_run_result(self, result, start_time, timing_info): @@ -160,6 +161,7 @@ def execute(self, compiled_node, manifest): message=None, adapter_response=adapter_response or {}, failures=None, + batch_results=None, **freshness, ) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 9ed50e25578..320d18cdbee 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -17,6 +17,8 @@ from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.results import ( BaseResult, + BatchResults, + BatchType, NodeStatus, RunningStatus, RunStatus, @@ -302,25 +304,31 @@ def _build_run_model_result(self, model, context): message=str(result.response), adapter_response=adapter_response, failures=result.get("failures"), + batch_results=None, ) def _build_run_microbatch_model_result( self, model: ModelNode, batch_run_results: List[RunResult] ) -> RunResult: - failures = sum([result.failures for result in batch_run_results if result.failures]) - successes = len(batch_run_results) - failures - - if failures == 0: + batch_results = BatchResults() + for result in batch_run_results: + if result.batch_results is not None: + batch_results += result.batch_results + else: + # TODO: Should we raise an error here? + continue + + if len(batch_results.failed) == 0: status = RunStatus.Success - elif successes == 0: + elif len(batch_results.successful) == 0: status = RunStatus.Error else: status = RunStatus.PartialSuccess - if failures == 0: - msg = f"Batches: {successes} successful" + if len(batch_results.failed) == 0: + msg = f"Batches: {len(batch_results.successful)} successful" else: - msg = f"Batches: {successes} succeeded, {failures} failed" + msg = f"Batches: {len(batch_results.successful)} succeeded, {batch_results.failed} failed" return RunResult( node=model, @@ -331,10 +339,18 @@ def _build_run_microbatch_model_result( execution_time=0, message=msg, adapter_response={}, - failures=failures, + failures=len(batch_results.failed), + batch_results=batch_results, ) - def _build_failed_run_batch_result(self, model: ModelNode) -> RunResult: + def _build_succesful_run_batch_result( + self, model: ModelNode, context: Dict[str, Any], batch: BatchType + ) -> RunResult: + run_result = self._build_run_model_result(model, context) + run_result.batch_results = BatchResults(successful=[batch]) + return run_result + + def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult: return RunResult( node=model, status=RunStatus.Error, @@ -344,6 +360,7 @@ def _build_failed_run_batch_result(self, model: ModelNode) -> RunResult: message="ERROR", adapter_response={}, failures=1, + batch_results=BatchResults(failed=[batch]), ) def _materialization_relations(self, result: Any, model) -> List[BaseRelation]: @@ -488,14 +505,14 @@ def _execute_microbatch_materialization( for relation in self._materialization_relations(result, model): self.adapter.cache_added(relation.incorporate(dbt_created=True)) - # Build result fo executed batch - batch_run_result = self._build_run_model_result(model, context) + # Build result of executed batch + batch_run_result = self._build_succesful_run_batch_result(model, context, batch) # Update context vars for future batches context["is_incremental"] = lambda: True context["should_full_refresh"] = lambda: False except Exception as e: exception = e - batch_run_result = self._build_failed_run_batch_result(model) + batch_run_result = self._build_failed_run_batch_result(model, batch) self.print_batch_result_line( batch_run_result, batch[0], batch_idx + 1, len(batches), exception @@ -635,6 +652,7 @@ def safe_run_hooks( adapter_response={}, execution_time=0, failures=1, + batch_results=None, ) ) diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index 6f7cd7b64c0..793ba81fb01 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -106,6 +106,7 @@ def run(self) -> RunResultsArtifact: ), thread_id=threading.current_thread().name, timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)], + batch_results=None, ) results = RunResultsArtifact.from_execution_results( diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 19a2a968df8..b86d623af74 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -251,6 +251,7 @@ def call_runner(self, runner: BaseRunner) -> RunResult: adapter_response={}, message=msg, failures=None, + batch_results=None, node=runner.node, ) diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index 0fb6551bf94..3dee429f3d9 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -52,6 +52,7 @@ def execute(self, compiled_node, manifest): adapter_response=adapter_response.to_dict(), agate_table=execute_result, failures=None, + batch_results=None, ) diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 356328a4263..d5fb7c1df2e 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -318,6 +318,7 @@ def build_test_run_result(self, test: TestNode, result: TestResultData) -> RunRe message=message, adapter_response=result.adapter_response, failures=failures, + batch_results=None, ) return run_result @@ -343,6 +344,7 @@ def build_unit_test_run_result( message=message, adapter_response=result.adapter_response, failures=failures, + batch_results=None, ) def after_execute(self, result) -> None: diff --git a/tests/unit/task/test_run.py b/tests/unit/task/test_run.py index b4ba84ca10c..077d25a3e18 100644 --- a/tests/unit/task/test_run.py +++ b/tests/unit/task/test_run.py @@ -1,11 +1,12 @@ import threading from argparse import Namespace +from datetime import datetime, timedelta from unittest.mock import MagicMock, patch import pytest from dbt.adapters.postgres import PostgresAdapter -from dbt.artifacts.schemas.results import RunStatus +from dbt.artifacts.schemas.results import BatchResults, RunStatus from dbt.artifacts.schemas.run import RunResult from dbt.config.runtime import RuntimeConfig from dbt.contracts.graph.manifest import Manifest @@ -94,6 +95,7 @@ def run_result(self, table_model: ModelNode) -> RunResult: adapter_response={}, message="It did it", failures=None, + batch_results=None, node=table_model, ) @@ -131,6 +133,7 @@ def test_execute( def test__build_run_microbatch_model_result( self, table_model: ModelNode, model_runner: ModelRunner ) -> None: + batch = (datetime.now() - timedelta(days=1), datetime.now()) only_successes = [ RunResult( node=table_model, @@ -141,6 +144,7 @@ def test__build_run_microbatch_model_result( message="SUCCESS", adapter_response={}, failures=0, + batch_results=BatchResults(successful=[batch]), ) ] only_failures = [ @@ -153,6 +157,7 @@ def test__build_run_microbatch_model_result( message="ERROR", adapter_response={}, failures=1, + batch_results=BatchResults(failed=[batch]), ) ] mixed_results = only_failures + only_successes diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 17e2e2d90f8..68a3edc2614 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -558,7 +558,8 @@ def test_single_run_error(): node=None, adapter_response=dict(), message="oh no!", - failures=[], + failures=1, + batch_results=None, ) print_run_result_error(error_result)