diff --git a/core/dbt/events/eventmgr.py b/core/dbt/events/eventmgr.py index e52dae706b5..c3444103207 100644 --- a/core/dbt/events/eventmgr.py +++ b/core/dbt/events/eventmgr.py @@ -13,7 +13,7 @@ from dbt.events.format import timestamp_to_datetime_string from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg - +import dbt.utils # A Filter is a function which takes a BaseEvent and returns True if the event # should be logged, False otherwise. @@ -175,7 +175,7 @@ def create_line(self, msg: EventMsg) -> str: from dbt.events.functions import msg_to_dict msg_dict = msg_to_dict(msg) - raw_log_line = json.dumps(msg_dict, sort_keys=True) + raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder) line = self.scrubber(raw_log_line) # type: ignore return line diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 9a1c2340a2d..6a2b7f4fe97 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -13,6 +13,7 @@ import uuid from google.protobuf.json_format import MessageToDict +import dbt.utils LOG_VERSION = 3 metadata_vars: Optional[Dict[str, str]] = None @@ -200,7 +201,7 @@ def stop_capture_stdout_logs(): # the message may contain secrets which must be scrubbed at the usage site. def msg_to_json(msg: EventMsg) -> str: msg_dict = msg_to_dict(msg) - raw_log_line = json.dumps(msg_dict, sort_keys=True) + raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder) return raw_log_line diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 9ddebcaf71a..26a97778d38 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -16,8 +16,6 @@ from pathlib import PosixPath, WindowsPath from contextlib import contextmanager -from dbt.exceptions import ConnectionError, DuplicateAliasError -from dbt.events.functions import fire_event from dbt.events.types import RetryExternalCall, RecordRetryException from dbt import flags from enum import Enum @@ -40,6 +38,7 @@ Sequence, ) +import dbt.events.functions import dbt.exceptions DECIMALS: Tuple[Type[Any], ...] @@ -337,15 +336,18 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, DECIMALS): return float(obj) - if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() - if isinstance(obj, jinja2.Undefined): + elif isinstance(obj, jinja2.Undefined): return "" - if hasattr(obj, "to_dict"): + elif isinstance(obj, Exception): + return repr(obj) + elif hasattr(obj, "to_dict"): # if we have a to_dict we should try to serialize the result of # that! return obj.to_dict(omit_none=True) - return super().default(obj) + else: + return super().default(obj) class ForgivingJSONEncoder(JSONEncoder): @@ -369,7 +371,7 @@ def translate_mapping(self, kwargs: Mapping[str, Any]) -> Dict[str, Any]: for key, value in kwargs.items(): canonical_key = self.aliases.get(key, key) if canonical_key in result: - raise DuplicateAliasError(kwargs, self.aliases, canonical_key) + raise dbt.exceptions.DuplicateAliasError(kwargs, self.aliases, canonical_key) result[canonical_key] = self.translate_value(value) return result @@ -389,9 +391,7 @@ def translate(self, value: Mapping[str, Any]) -> Dict[str, Any]: return self.translate_mapping(value) except RuntimeError as exc: if "maximum recursion depth exceeded" in str(exc): - raise dbt.exceptions.RecursionError( - "Cycle detected in a value passed to translate!" - ) + raise RecursionError("Cycle detected in a value passed to translate!") raise @@ -603,12 +603,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0): ReadError, ) as exc: if attempt <= max_attempts - 1: - fire_event(RecordRetryException(exc=str(exc))) - fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) + dbt.events.functions.fire_event(RecordRetryException(exc=str(exc))) + dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) time.sleep(1) return _connection_exception_retry(fn, max_attempts, attempt + 1) else: - raise ConnectionError("External connection exception occurred: " + str(exc)) + raise dbt.exceptions.ConnectionError( + "External connection exception occurred: " + str(exc) + ) # This is used to serialize the args in the run_results and in the logs.