diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 9b7dcbf697b..62fa5fa6757 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -3,6 +3,7 @@ from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine import dbt.flags as flags +import dbt.utils # TODO this will need to move eventually from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER @@ -89,7 +90,10 @@ def setup_event_logger(log_path, level_override=None): file_passthrough_formatter = logging.Formatter(fmt=FORMAT) file_handler = RotatingFileHandler( - filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb + filename=log_dest, + encoding="utf8", + maxBytes=10 * 1024 * 1024, + backupCount=5, # 10 mb ) file_handler.setFormatter(file_passthrough_formatter) file_handler.setLevel(logging.DEBUG) # always debug regardless of user input @@ -133,7 +137,6 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str: def event_to_serializable_dict( e: T_Event, ) -> Dict[str, Any]: - log_line = dict() code: str try: @@ -204,7 +207,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]: return None # will not be sent to logger # using preformatted ts string instead of formatting it here to be extra careful about timezone values = event_to_serializable_dict(e) - raw_log_line = json.dumps(values, sort_keys=True) + raw_log_line = json.dumps(values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder) return scrub_secrets(raw_log_line, env_secrets()) @@ -241,7 +244,12 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s def send_exc_to_logger( - l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False + l: Logger, + level_tag: str, + log_line: str, + exc_info=True, + stack_info=False, + extra=False, ): if level_tag == "test": # TODO after implmenting #3977 send to new test level diff --git a/core/dbt/exceptions.py b/core/dbt/exceptions.py index d78f5a86c7b..813e829f37e 100644 --- a/core/dbt/exceptions.py +++ b/core/dbt/exceptions.py @@ -2,13 +2,13 @@ import functools from typing import NoReturn, Optional, Mapping, Any -from dbt.events.functions import fire_event, scrub_secrets, env_secrets from dbt.events.types import GeneralWarningMsg, GeneralWarningException from dbt.node_types import NodeType from dbt import flags from dbt.ui import line_wrap_message, warning_tag import dbt.dataclass_schema +import dbt.events.functions def validator_error_message(exc): @@ -53,7 +53,7 @@ class RuntimeException(RuntimeError, Exception): def __init__(self, msg, node=None): self.stack = [] self.node = node - self.msg = scrub_secrets(msg, env_secrets()) + self.msg = dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) def add_node(self, node=None): if node is not None and node is not self.node: @@ -383,7 +383,10 @@ class FailedToConnectException(DatabaseException): class CommandError(RuntimeException): def __init__(self, cwd, cmd, message="Error running command"): - cmd_scrubbed = list(scrub_secrets(cmd_txt, env_secrets()) for cmd_txt in cmd) + cmd_scrubbed = list( + dbt.events.functions.scrub_secrets(cmd_txt, dbt.events.functions.env_secrets()) + for cmd_txt in cmd + ) super().__init__(message) self.cwd = cwd self.cmd = cmd_scrubbed @@ -412,8 +415,12 @@ class CommandResultError(CommandError): def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"): super().__init__(cwd, cmd, message) self.returncode = returncode - self.stdout = scrub_secrets(stdout.decode("utf-8"), env_secrets()) - self.stderr = scrub_secrets(stderr.decode("utf-8"), env_secrets()) + self.stdout = dbt.events.functions.scrub_secrets( + stdout.decode("utf-8"), dbt.events.functions.env_secrets() + ) + self.stderr = dbt.events.functions.scrub_secrets( + stderr.decode("utf-8"), dbt.events.functions.env_secrets() + ) self.args = (cwd, self.cmd, returncode, self.stdout, self.stderr, message) def __str__(self): @@ -454,16 +461,20 @@ def raise_database_error(msg, node=None) -> NoReturn: def raise_dependency_error(msg) -> NoReturn: - raise DependencyException(scrub_secrets(msg, env_secrets())) + raise DependencyException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_git_cloning_error(error: CommandResultError) -> NoReturn: - error.cmd = scrub_secrets(str(error.cmd), env_secrets()) + error.cmd = dbt.events.functions.scrub_secrets( + str(error.cmd), dbt.events.functions.env_secrets() + ) raise error def raise_git_cloning_problem(repo) -> NoReturn: - repo = scrub_secrets(repo, env_secrets()) + repo = dbt.events.functions.scrub_secrets(repo, dbt.events.functions.env_secrets()) msg = """\ Something went wrong while cloning {} Check the debug logs for more information @@ -663,7 +674,12 @@ def get_metric_not_found_msg( ) -> str: reason = "was not found" return _get_target_failure_msg( - model, target_name, target_package, include_path=True, reason=reason, target_kind="metric" + model, + target_name, + target_package, + include_path=True, + reason=reason, + target_kind="metric", ) @@ -732,7 +748,9 @@ def missing_materialization(model, adapter_type): def bad_package_spec(repo, spec, error_message): msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message) - raise InternalException(scrub_secrets(msg, env_secrets())) + raise InternalException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_cache_inconsistent(message): @@ -1054,16 +1072,19 @@ def raise_duplicate_alias( def warn_or_error(msg, node=None, log_fmt=None): if flags.WARN_ERROR: - raise_compiler_error(scrub_secrets(msg, env_secrets()), node) + raise_compiler_error( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()), + node, + ) else: - fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) + dbt.events.functions.fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) def warn_or_raise(exc, log_fmt=None): if flags.WARN_ERROR: raise exc else: - fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt)) + dbt.events.functions.fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt)) def warn(msg, node=None): diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 039f22dc069..7db9dd860ac 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -15,8 +15,6 @@ from pathlib import PosixPath, WindowsPath from contextlib import contextmanager -from dbt.exceptions import ConnectionException -from dbt.events.functions import fire_event from dbt.events.types import RetryExternalCall, RecordRetryException from dbt import flags from enum import Enum @@ -39,6 +37,7 @@ Sequence, ) +import dbt.events.functions import dbt.exceptions DECIMALS: Tuple[Type[Any], ...] @@ -333,15 +332,18 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, DECIMALS): return float(obj) - if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() - if isinstance(obj, jinja2.Undefined): + elif isinstance(obj, jinja2.Undefined): return "" - if hasattr(obj, "to_dict"): + elif isinstance(obj, Exception): + return repr(obj) + elif hasattr(obj, "to_dict"): # if we have a to_dict we should try to serialize the result of # that! return obj.to_dict(omit_none=True) - return super().default(obj) + else: + return super().default(obj) class ForgivingJSONEncoder(JSONEncoder): @@ -619,12 +621,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0): ReadError, ) as exc: if attempt <= max_attempts - 1: - fire_event(RecordRetryException(exc=exc)) - fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) + dbt.events.functions.fire_event(RecordRetryException(exc=exc)) + dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) time.sleep(1) return _connection_exception_retry(fn, max_attempts, attempt + 1) else: - raise ConnectionException("External connection exception occurred: " + str(exc)) + raise dbt.exceptions.ConnectionException( + "External connection exception occurred: " + str(exc) + ) # This is used to serialize the args in the run_results and in the logs.