Skip to content

Commit

Permalink
Add BatchResults object to BaseResult and begin tracking during m…
Browse files Browse the repository at this point in the history
…icrobatch runs
  • Loading branch information
QMalcolm committed Sep 20, 2024
1 parent 35963b4 commit 3d606fa
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 17 deletions.
24 changes: 22 additions & 2 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

from dbt.contracts.graph.nodes import ResultNode
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
Expand Down Expand Up @@ -83,6 +85,21 @@ class FreshnessStatus(StrEnum):
RuntimeErr = NodeStatus.RuntimeErr


BatchType = Tuple[Optional[datetime], datetime]


@dataclass
class BatchResults(dbtClassMixin):
successful: List[BatchType] = field(default_factory=list)
failed: List[BatchType] = field(default_factory=list)

def __add__(self, other: BatchResults) -> BatchResults:
return BatchResults(
successful=self.successful + other.successful,
failed=self.failed + other.failed,
)


@dataclass
class BaseResult(dbtClassMixin):
status: Union[RunStatus, TestStatus, FreshnessStatus]
Expand All @@ -92,6 +109,7 @@ class BaseResult(dbtClassMixin):
adapter_response: Dict[str, Any]
message: Optional[str]
failures: Optional[int]
batch_results: Optional[BatchResults]

@classmethod
def __pre_deserialize__(cls, data):
Expand All @@ -100,6 +118,8 @@ def __pre_deserialize__(cls, data):
data["message"] = None
if "failures" not in data:
data["failures"] = None
if "batch_results" not in data:
data["batch_results"] = None

Check warning on line 122 in core/dbt/artifacts/schemas/results.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/artifacts/schemas/results.py#L122

Added line #L122 was not covered by tests
return data

def to_msg_dict(self):
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/run/v5/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]):
node=node,
adapter_response={},
failures=None,
batch_results=None,
)


Expand Down Expand Up @@ -82,6 +83,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
message=result.message,
adapter_response=result.adapter_response,
failures=result.failures,
batch_results=result.batch_results,
compiled=result.node.compiled if compiled else None, # type:ignore
compiled_code=result.node.compiled_code if compiled else None, # type:ignore
relation_name=result.node.relation_name if compiled else None, # type:ignore
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def _build_run_result(
agate_table=None,
adapter_response=None,
failures=None,
batch_results=None,
):
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
Expand All @@ -242,6 +243,7 @@ def _build_run_result(
agate_table=agate_table,
adapter_response=adapter_response,
failures=failures,
batch_results=None,
)

def error_result(self, node, message, start_time, timing_info):
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def execute(self, compiled_node, manifest):
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)

Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def _build_run_model_result(self, model, context):
message=message,
adapter_response=adapter_response,
failures=None,
batch_results=None,
)

def compile(self, manifest: Manifest):
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def execute(self, compiled_node, manifest):
message=None,
adapter_response={},
failures=None,
batch_results=None,
)

def compile(self, manifest: Manifest):
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def _build_run_result(self, node, start_time, status, timing_info, message):
node=node,
adapter_response={},
failures=None,
batch_results=None,
)

def from_run_result(self, result, start_time, timing_info):
Expand Down Expand Up @@ -160,6 +161,7 @@ def execute(self, compiled_node, manifest):
message=None,
adapter_response=adapter_response or {},
failures=None,
batch_results=None,
**freshness,
)

Expand Down
44 changes: 31 additions & 13 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.results import (
BaseResult,
BatchResults,
BatchType,
NodeStatus,
RunningStatus,
RunStatus,
Expand Down Expand Up @@ -302,25 +304,31 @@ def _build_run_model_result(self, model, context):
message=str(result.response),
adapter_response=adapter_response,
failures=result.get("failures"),
batch_results=None,
)

def _build_run_microbatch_model_result(
self, model: ModelNode, batch_run_results: List[RunResult]
) -> RunResult:
failures = sum([result.failures for result in batch_run_results if result.failures])
successes = len(batch_run_results) - failures

if failures == 0:
batch_results = BatchResults()
for result in batch_run_results:
if result.batch_results is not None:
batch_results += result.batch_results
else:
# TODO: Should we raise an error here?
continue

Check warning on line 319 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L319

Added line #L319 was not covered by tests

if len(batch_results.failed) == 0:
status = RunStatus.Success
elif successes == 0:
elif len(batch_results.successful) == 0:
status = RunStatus.Error
else:
status = RunStatus.PartialSuccess

if failures == 0:
msg = f"Batches: {successes} successful"
if len(batch_results.failed) == 0:
msg = f"Batches: {len(batch_results.successful)} successful"
else:
msg = f"Batches: {successes} succeeded, {failures} failed"
msg = f"Batches: {len(batch_results.successful)} succeeded, {batch_results.failed} failed"

return RunResult(
node=model,
Expand All @@ -331,10 +339,18 @@ def _build_run_microbatch_model_result(
execution_time=0,
message=msg,
adapter_response={},
failures=failures,
failures=len(batch_results.failed),
batch_results=batch_results,
)

def _build_failed_run_batch_result(self, model: ModelNode) -> RunResult:
def _build_succesful_run_batch_result(
self, model: ModelNode, context: Dict[str, Any], batch: BatchType
) -> RunResult:
run_result = self._build_run_model_result(model, context)
run_result.batch_results = BatchResults(successful=[batch])
return run_result

def _build_failed_run_batch_result(self, model: ModelNode, batch: BatchType) -> RunResult:
return RunResult(
node=model,
status=RunStatus.Error,
Expand All @@ -344,6 +360,7 @@ def _build_failed_run_batch_result(self, model: ModelNode) -> RunResult:
message="ERROR",
adapter_response={},
failures=1,
batch_results=BatchResults(failed=[batch]),
)

def _materialization_relations(self, result: Any, model) -> List[BaseRelation]:
Expand Down Expand Up @@ -488,14 +505,14 @@ def _execute_microbatch_materialization(
for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))

# Build result fo executed batch
batch_run_result = self._build_run_model_result(model, context)
# Build result of executed batch
batch_run_result = self._build_succesful_run_batch_result(model, context, batch)
# Update context vars for future batches
context["is_incremental"] = lambda: True
context["should_full_refresh"] = lambda: False
except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(model)
batch_run_result = self._build_failed_run_batch_result(model, batch)

self.print_batch_result_line(
batch_run_result, batch[0], batch_idx + 1, len(batches), exception
Expand Down Expand Up @@ -635,6 +652,7 @@ def safe_run_hooks(
adapter_response={},
execution_time=0,
failures=1,
batch_results=None,
)
)

Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def run(self) -> RunResultsArtifact:
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
batch_results=None,
)

results = RunResultsArtifact.from_execution_results(
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
adapter_response={},
message=msg,
failures=None,
batch_results=None,
node=runner.node,
)

Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def execute(self, compiled_node, manifest):
adapter_response=adapter_response.to_dict(),
agate_table=execute_result,
failures=None,
batch_results=None,
)


Expand Down
2 changes: 2 additions & 0 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def build_test_run_result(self, test: TestNode, result: TestResultData) -> RunRe
message=message,
adapter_response=result.adapter_response,
failures=failures,
batch_results=None,
)
return run_result

Expand All @@ -343,6 +344,7 @@ def build_unit_test_run_result(
message=message,
adapter_response=result.adapter_response,
failures=failures,
batch_results=None,
)

def after_execute(self, result) -> None:
Expand Down
7 changes: 6 additions & 1 deletion tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import threading
from argparse import Namespace
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

import pytest

from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.results import BatchResults, RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -94,6 +95,7 @@ def run_result(self, table_model: ModelNode) -> RunResult:
adapter_response={},
message="It did it",
failures=None,
batch_results=None,
node=table_model,
)

Expand Down Expand Up @@ -131,6 +133,7 @@ def test_execute(
def test__build_run_microbatch_model_result(
self, table_model: ModelNode, model_runner: ModelRunner
) -> None:
batch = (datetime.now() - timedelta(days=1), datetime.now())
only_successes = [
RunResult(
node=table_model,
Expand All @@ -141,6 +144,7 @@ def test__build_run_microbatch_model_result(
message="SUCCESS",
adapter_response={},
failures=0,
batch_results=BatchResults(successful=[batch]),
)
]
only_failures = [
Expand All @@ -153,6 +157,7 @@ def test__build_run_microbatch_model_result(
message="ERROR",
adapter_response={},
failures=1,
batch_results=BatchResults(failed=[batch]),
)
]
mixed_results = only_failures + only_successes
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ def test_single_run_error():
node=None,
adapter_response=dict(),
message="oh no!",
failures=[],
failures=1,
batch_results=None,
)

print_run_result_error(error_result)
Expand Down

0 comments on commit 3d606fa

Please sign in to comment.