Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-364] Add group info to RunResultError, RunResultFailure, RunResultWarning log lines #10535

Merged
merged 10 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20240807-155652.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"
2 changes: 1 addition & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
# this is used by some adapters
table.original_abspath = os.path.abspath(path)
table.original_abspath = os.path.abspath(path) # type: ignore

Check warning on line 978 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L978

Added line #L978 was not covered by tests
return table

@contextproperty()
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,13 @@
def resource_class(cls) -> Type[GroupResource]:
return GroupResource

def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {

Check warning on line 1471 in core/dbt/contracts/graph/nodes.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/contracts/graph/nodes.py#L1471

Added line #L1471 was not covered by tests
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}


# ====================================
# SemanticModel node
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1809,12 +1809,19 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}

message Group {
aranke marked this conversation as resolved.
Show resolved Hide resolved
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}

message RunResultWarningMsg {
Expand All @@ -1828,6 +1835,7 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}

message RunResultFailureMsg {
Expand All @@ -1849,6 +1857,7 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}

message RunResultErrorMsg {
Expand Down
204 changes: 105 additions & 99 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

29 changes: 22 additions & 7 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict
from typing import Dict, Optional

from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
Expand Down Expand Up @@ -68,7 +69,9 @@
fire_event(StatsLine(stats=stats))


def print_run_result_error(result, newline: bool = True, is_warning: bool = False) -> None:
def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
) -> None:
# set node_info for logging events
node_info = None
if hasattr(result, "node") and result.node:
Expand All @@ -77,29 +80,36 @@
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None

Check warning on line 83 in core/dbt/task/printer.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/printer.py#L83

Added line #L83 was not covered by tests
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
else:
group_dict = group.to_logging_dict() if group else None

Check warning on line 94 in core/dbt/task/printer.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/printer.py#L94

Added line #L94 was not covered by tests
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)

if result.message:
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(

Check warning on line 110 in core/dbt/task/printer.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/printer.py#L109-L110

Added lines #L109 - L110 were not covered by tests
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))

Expand All @@ -119,10 +129,13 @@
elif result.message is not None:
if newline:
fire_event(Formatting(""))
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))


def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
Expand All @@ -144,9 +157,11 @@
)

for error in errors:
print_run_result_error(error, is_warning=False)
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is neat, iiuc this is printing the summary of all model errors with the group info for those models

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these summary logs have a different code (like how individual model failures have "Z002")?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes:

  1. RunResultWarning – Z021
  2. RunResultFailure – Z022
  3. RunResultError – Z024

print_run_result_error(error, is_warning=False, group=group)

Check warning on line 161 in core/dbt/task/printer.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/printer.py#L160-L161

Added lines #L160 - L161 were not covered by tests

for warning in warnings:
print_run_result_error(warning, is_warning=True)
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)

Check warning on line 165 in core/dbt/task/printer.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/printer.py#L164-L165

Added lines #L164 - L165 were not covered by tests

print_run_status_line(results)
13 changes: 12 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@
def get_runner_type(self, _):
return ModelRunner

def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}

Check warning on line 480 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L479-L480

Added lines #L479 - L480 were not covered by tests

return {

Check warning on line 482 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L482

Added line #L482 was not covered by tests
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}

def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])

Check warning on line 488 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L488

Added line #L488 was not covered by tests

if results:
print_run_end_messages(results)
print_run_end_messages(results, groups=groups)

Check warning on line 491 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L491

Added line #L491 was not covered by tests
188 changes: 164 additions & 24 deletions tests/functional/logging/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dbt.events.types import InvalidOptionYAML
from dbt.tests.util import get_manifest, read_file, run_dbt
from dbt_common.events import EventLevel
from dbt_common.events.functions import fire_event

my_model_sql = """
Expand Down Expand Up @@ -103,37 +104,176 @@ def test_invalid_event_value(project, logs_dir):
assert str(excinfo.value) == "[InvalidOptionYAML]: Unable to parse dict {'option_name': 1}"


class TestNodeInfo:
groups_yml = """
groups:
- name: my_group
owner:
name: my_name
email: [email protected]
slack: my_slack
other_property: something_else
models:
- name: my_model
group: my_group
access: public
"""


class TestRunResultErrorNodeInfo:
@pytest.fixture(scope="class")
def models(self):
return {"my_model.sql": "select not_found as id"}
return {
"my_model.sql": "select not_found as id",
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
# get log file

log_file = read_file(logs_dir, "dbt.log")
task_printer_events = [
"RunResultWarning",
"RunResultFailure",
"RunResultWarningMessage",
"RunResultError",
"RunResultErrorNoMessage",
"SQLCompiledPath",
"CheckNodeTestFailure",
]
count = 0

for log_line in log_file.split("\n"):
# skip empty lines
if len(log_line) == 0:
if not log_line:
continue
# The adapter logging also shows up, so skip non-json lines
if "[debug]" in log_line:

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
log_dct = json.loads(log_line)
log_data = log_dct["data"]
log_event = log_dct["info"]["name"]
if log_event in task_printer_events:
assert "node_info" in log_data
count += 1
assert count > 0

if log_json["info"]["name"] == "RunResultError":
assert "node_info" in log_json["data"]
assert log_json["data"]["node_info"]["unique_id"] == "model.test.my_model"
assert "Database Error" in log_json["data"]["msg"]


def assert_group_data(group_data):
assert group_data["name"] == "my_group"
assert group_data["owner"] == {
"name": "my_name",
"email": "[email protected]",
"slack": "my_slack",
"other_property": "something_else",
}


class TestRunResultErrorGroup:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": "select not_found as id",
"groups.yml": groups_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1

log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1

assert run_result_error_count == 1


class TestRunResultFailureGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"], expect_pass=False)
assert len(results) == 2

log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
run_result_failure_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1

if log_json["info"]["name"] == "RunResultFailure":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_failure_count += 1

assert run_result_error_count == 1
assert run_result_failure_count == 1


class TestRunResultWarningGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null:
config:
severity: warn
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"])
assert len(results) == 2

log_file = read_file(logs_dir, "dbt.log")
run_result_warning_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultWarning":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_warning_count += 1

assert run_result_warning_count == 1
Loading