diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 54d5fc3834f..b45cda5b05a 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -1,5 +1,6 @@ from colorama import Style import dbt.events.functions as this # don't worry I hate it too. +import dbt.utils from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache from dbt.events.types import T_Event, MainReportVersion, EmptyLine, EventBufferFull import dbt.flags as flags @@ -194,7 +195,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]: return None # will not be sent to logger # using preformatted ts string instead of formatting it here to be extra careful about timezone values = event_to_serializable_dict(e) - raw_log_line = json.dumps(values, sort_keys=True) + raw_log_line = json.dumps(values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder) return scrub_secrets(raw_log_line, env_secrets()) diff --git a/core/dbt/exceptions.py b/core/dbt/exceptions.py index 0f4be19937c..46773d4c365 100644 --- a/core/dbt/exceptions.py +++ b/core/dbt/exceptions.py @@ -2,13 +2,13 @@ import functools from typing import NoReturn, Optional, Mapping, Any -from dbt.events.functions import fire_event, scrub_secrets, env_secrets from dbt.events.types import GeneralWarningMsg, GeneralWarningException from dbt.node_types import NodeType from dbt import flags from dbt.ui import line_wrap_message, warning_tag import dbt.dataclass_schema +import dbt.events.functions def validator_error_message(exc): @@ -53,7 +53,7 @@ class RuntimeException(RuntimeError, Exception): def __init__(self, msg, node=None): self.stack = [] self.node = node - self.msg = scrub_secrets(msg, env_secrets()) + self.msg = dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) def add_node(self, node=None): if node is not None and node is not self.node: @@ -383,7 +383,10 @@ class FailedToConnectException(DatabaseException): class CommandError(RuntimeException): def __init__(self, cwd, cmd, message="Error running command"): - cmd_scrubbed = list(scrub_secrets(cmd_txt, env_secrets()) for cmd_txt in cmd) + cmd_scrubbed = list( + dbt.events.functions.scrub_secrets(cmd_txt, dbt.events.functions.env_secrets()) + for cmd_txt in cmd + ) super().__init__(message) self.cwd = cwd self.cmd = cmd_scrubbed @@ -412,8 +415,12 @@ class CommandResultError(CommandError): def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"): super().__init__(cwd, cmd, message) self.returncode = returncode - self.stdout = scrub_secrets(stdout.decode("utf-8"), env_secrets()) - self.stderr = scrub_secrets(stderr.decode("utf-8"), env_secrets()) + self.stdout = dbt.events.functions.scrub_secrets( + stdout.decode("utf-8"), dbt.events.functions.env_secrets() + ) + self.stderr = dbt.events.functions.scrub_secrets( + stderr.decode("utf-8"), dbt.events.functions.env_secrets() + ) self.args = (cwd, self.cmd, returncode, self.stdout, self.stderr, message) def __str__(self): @@ -454,16 +461,20 @@ def raise_database_error(msg, node=None) -> NoReturn: def raise_dependency_error(msg) -> NoReturn: - raise DependencyException(scrub_secrets(msg, env_secrets())) + raise DependencyException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_git_cloning_error(error: CommandResultError) -> NoReturn: - error.cmd = scrub_secrets(str(error.cmd), env_secrets()) + error.cmd = dbt.events.functions.scrub_secrets( + str(error.cmd), dbt.events.functions.env_secrets() + ) raise error def raise_git_cloning_problem(repo) -> NoReturn: - repo = scrub_secrets(repo, env_secrets()) + repo = dbt.events.functions.scrub_secrets(repo, dbt.events.functions.env_secrets()) msg = """\ Something went wrong while cloning {} Check the debug logs for more information @@ -755,7 +766,9 @@ def missing_materialization(model, adapter_type): def bad_package_spec(repo, spec, error_message): msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message) - raise InternalException(scrub_secrets(msg, env_secrets())) + raise InternalException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_cache_inconsistent(message): @@ -1089,16 +1102,18 @@ def raise_duplicate_alias( def warn_or_error(msg, node=None, log_fmt=None): if flags.WARN_ERROR: - raise_compiler_error(scrub_secrets(msg, env_secrets()), node) + raise_compiler_error( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()), node + ) else: - fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) + dbt.events.functions.fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) def warn_or_raise(exc, log_fmt=None): if flags.WARN_ERROR: raise exc else: - fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt)) + dbt.events.functions.fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt)) def warn(msg, node=None): diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 2c143c10412..46fe8a02a71 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -15,8 +15,6 @@ from pathlib import PosixPath, WindowsPath from contextlib import contextmanager -from dbt.exceptions import ConnectionException -from dbt.events.functions import fire_event from dbt.events.types import RetryExternalCall, RecordRetryException from dbt import flags from enum import Enum @@ -39,6 +37,7 @@ Sequence, ) +import dbt.events.functions import dbt.exceptions DECIMALS: Tuple[Type[Any], ...] @@ -333,15 +332,18 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, DECIMALS): return float(obj) - if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() - if isinstance(obj, jinja2.Undefined): + elif isinstance(obj, jinja2.Undefined): return "" - if hasattr(obj, "to_dict"): + elif isinstance(obj, Exception): + return repr(obj) + elif hasattr(obj, "to_dict"): # if we have a to_dict we should try to serialize the result of # that! return obj.to_dict(omit_none=True) - return super().default(obj) + else: + return super().default(obj) class ForgivingJSONEncoder(JSONEncoder): @@ -619,12 +621,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0): ReadError, ) as exc: if attempt <= max_attempts - 1: - fire_event(RecordRetryException(exc=str(exc))) - fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) + dbt.events.functions.fire_event(RecordRetryException(exc=str(exc))) + dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) time.sleep(1) return _connection_exception_retry(fn, max_attempts, attempt + 1) else: - raise ConnectionException("External connection exception occurred: " + str(exc)) + raise dbt.exceptions.ConnectionException( + "External connection exception occurred: " + str(exc) + ) # This is used to serialize the args in the run_results and in the logs.