Skip to content

Commit

Permalink
[CORE-364] Add group info to RunResultError, RunResultFailure, …
Browse files Browse the repository at this point in the history
…`RunResultWarning` log lines (#10535)
  • Loading branch information
aranke authored Aug 19, 2024
1 parent 9ca1bc5 commit 3695698
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 132 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20240807-155652.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"
2 changes: 1 addition & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ def load_agate_table(self) -> "agate.Table":
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
# this is used by some adapters
table.original_abspath = os.path.abspath(path)
table.original_abspath = os.path.abspath(path) # type: ignore
return table

@contextproperty()
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,13 @@ class Group(GroupResource, BaseNode):
def resource_class(cls) -> Type[GroupResource]:
return GroupResource

def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}


# ====================================
# SemanticModel node
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1809,12 +1809,19 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}

message RunResultWarningMsg {
Expand All @@ -1828,6 +1835,7 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}

message RunResultFailureMsg {
Expand All @@ -1849,6 +1857,7 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}

message RunResultErrorMsg {
Expand Down
204 changes: 105 additions & 99 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

29 changes: 22 additions & 7 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict
from typing import Dict, Optional

from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
Expand Down Expand Up @@ -68,7 +69,9 @@ def print_run_status_line(results) -> None:
fire_event(StatsLine(stats=stats))


def print_run_result_error(result, newline: bool = True, is_warning: bool = False) -> None:
def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
) -> None:
# set node_info for logging events
node_info = None
if hasattr(result, "node") and result.node:
Expand All @@ -77,29 +80,36 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)

if result.message:
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))

Expand All @@ -119,10 +129,13 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
elif result.message is not None:
if newline:
fire_event(Formatting(""))
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))


def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
Expand All @@ -144,9 +157,11 @@ def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
)

for error in errors:
print_run_result_error(error, is_warning=False)
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)

for warning in warnings:
print_run_result_error(warning, is_warning=True)
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)

print_run_status_line(results)
13 changes: 12 additions & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@ def get_node_selector(self) -> ResourceTypeSelector:
def get_runner_type(self, _):
return ModelRunner

def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}

return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}

def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])

if results:
print_run_end_messages(results)
print_run_end_messages(results, groups=groups)
188 changes: 164 additions & 24 deletions tests/functional/logging/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dbt.events.types import InvalidOptionYAML
from dbt.tests.util import get_manifest, read_file, run_dbt
from dbt_common.events import EventLevel
from dbt_common.events.functions import fire_event

my_model_sql = """
Expand Down Expand Up @@ -103,37 +104,176 @@ def test_invalid_event_value(project, logs_dir):
assert str(excinfo.value) == "[InvalidOptionYAML]: Unable to parse dict {'option_name': 1}"


class TestNodeInfo:
groups_yml = """
groups:
- name: my_group
owner:
name: my_name
email: [email protected]
slack: my_slack
other_property: something_else
models:
- name: my_model
group: my_group
access: public
"""


class TestRunResultErrorNodeInfo:
@pytest.fixture(scope="class")
def models(self):
return {"my_model.sql": "select not_found as id"}
return {
"my_model.sql": "select not_found as id",
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
# get log file

log_file = read_file(logs_dir, "dbt.log")
task_printer_events = [
"RunResultWarning",
"RunResultFailure",
"RunResultWarningMessage",
"RunResultError",
"RunResultErrorNoMessage",
"SQLCompiledPath",
"CheckNodeTestFailure",
]
count = 0

for log_line in log_file.split("\n"):
# skip empty lines
if len(log_line) == 0:
if not log_line:
continue
# The adapter logging also shows up, so skip non-json lines
if "[debug]" in log_line:

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
log_dct = json.loads(log_line)
log_data = log_dct["data"]
log_event = log_dct["info"]["name"]
if log_event in task_printer_events:
assert "node_info" in log_data
count += 1
assert count > 0

if log_json["info"]["name"] == "RunResultError":
assert "node_info" in log_json["data"]
assert log_json["data"]["node_info"]["unique_id"] == "model.test.my_model"
assert "Database Error" in log_json["data"]["msg"]


def assert_group_data(group_data):
assert group_data["name"] == "my_group"
assert group_data["owner"] == {
"name": "my_name",
"email": "[email protected]",
"slack": "my_slack",
"other_property": "something_else",
}


class TestRunResultErrorGroup:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": "select not_found as id",
"groups.yml": groups_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1

log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1

assert run_result_error_count == 1


class TestRunResultFailureGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"], expect_pass=False)
assert len(results) == 2

log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
run_result_failure_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1

if log_json["info"]["name"] == "RunResultFailure":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_failure_count += 1

assert run_result_error_count == 1
assert run_result_failure_count == 1


class TestRunResultWarningGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null:
config:
severity: warn
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}

def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"])
assert len(results) == 2

log_file = read_file(logs_dir, "dbt.log")
run_result_warning_count = 0

for log_line in log_file.split("\n"):
if not log_line:
continue

log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue

if log_json["info"]["name"] == "RunResultWarning":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_warning_count += 1

assert run_result_warning_count == 1

0 comments on commit 3695698

Please sign in to comment.