Skip to content

Commit

Permalink
[Backport 1.2.latest] Hotfix for 372: Use JSONEncoder in json.dumps (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke authored Jul 20, 2023
1 parent b6b50b2 commit 7f0f239
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 26 deletions.
16 changes: 12 additions & 4 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache
from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
import dbt.flags as flags
import dbt.utils

# TODO this will need to move eventually
from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER
Expand Down Expand Up @@ -89,7 +90,10 @@ def setup_event_logger(log_path, level_override=None):
file_passthrough_formatter = logging.Formatter(fmt=FORMAT)

file_handler = RotatingFileHandler(
filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb
filename=log_dest,
encoding="utf8",
maxBytes=10 * 1024 * 1024,
backupCount=5, # 10 mb
)
file_handler.setFormatter(file_passthrough_formatter)
file_handler.setLevel(logging.DEBUG) # always debug regardless of user input
Expand Down Expand Up @@ -133,7 +137,6 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str:
def event_to_serializable_dict(
e: T_Event,
) -> Dict[str, Any]:

log_line = dict()
code: str
try:
Expand Down Expand Up @@ -204,7 +207,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]:
return None # will not be sent to logger
# using preformatted ts string instead of formatting it here to be extra careful about timezone
values = event_to_serializable_dict(e)
raw_log_line = json.dumps(values, sort_keys=True)
raw_log_line = json.dumps(values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
return scrub_secrets(raw_log_line, env_secrets())


Expand Down Expand Up @@ -241,7 +244,12 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s


def send_exc_to_logger(
l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False
l: Logger,
level_tag: str,
log_line: str,
exc_info=True,
stack_info=False,
extra=False,
):
if level_tag == "test":
# TODO after implmenting #3977 send to new test level
Expand Down
47 changes: 34 additions & 13 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import functools
from typing import NoReturn, Optional, Mapping, Any

from dbt.events.functions import fire_event, scrub_secrets, env_secrets
from dbt.events.types import GeneralWarningMsg, GeneralWarningException
from dbt.node_types import NodeType
from dbt import flags
from dbt.ui import line_wrap_message, warning_tag

import dbt.dataclass_schema
import dbt.events.functions


def validator_error_message(exc):
Expand Down Expand Up @@ -53,7 +53,7 @@ class RuntimeException(RuntimeError, Exception):
def __init__(self, msg, node=None):
self.stack = []
self.node = node
self.msg = scrub_secrets(msg, env_secrets())
self.msg = dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())

def add_node(self, node=None):
if node is not None and node is not self.node:
Expand Down Expand Up @@ -383,7 +383,10 @@ class FailedToConnectException(DatabaseException):

class CommandError(RuntimeException):
def __init__(self, cwd, cmd, message="Error running command"):
cmd_scrubbed = list(scrub_secrets(cmd_txt, env_secrets()) for cmd_txt in cmd)
cmd_scrubbed = list(
dbt.events.functions.scrub_secrets(cmd_txt, dbt.events.functions.env_secrets())
for cmd_txt in cmd
)
super().__init__(message)
self.cwd = cwd
self.cmd = cmd_scrubbed
Expand Down Expand Up @@ -412,8 +415,12 @@ class CommandResultError(CommandError):
def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"):
super().__init__(cwd, cmd, message)
self.returncode = returncode
self.stdout = scrub_secrets(stdout.decode("utf-8"), env_secrets())
self.stderr = scrub_secrets(stderr.decode("utf-8"), env_secrets())
self.stdout = dbt.events.functions.scrub_secrets(
stdout.decode("utf-8"), dbt.events.functions.env_secrets()
)
self.stderr = dbt.events.functions.scrub_secrets(
stderr.decode("utf-8"), dbt.events.functions.env_secrets()
)
self.args = (cwd, self.cmd, returncode, self.stdout, self.stderr, message)

def __str__(self):
Expand Down Expand Up @@ -454,16 +461,20 @@ def raise_database_error(msg, node=None) -> NoReturn:


def raise_dependency_error(msg) -> NoReturn:
raise DependencyException(scrub_secrets(msg, env_secrets()))
raise DependencyException(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())
)


def raise_git_cloning_error(error: CommandResultError) -> NoReturn:
error.cmd = scrub_secrets(str(error.cmd), env_secrets())
error.cmd = dbt.events.functions.scrub_secrets(
str(error.cmd), dbt.events.functions.env_secrets()
)
raise error


def raise_git_cloning_problem(repo) -> NoReturn:
repo = scrub_secrets(repo, env_secrets())
repo = dbt.events.functions.scrub_secrets(repo, dbt.events.functions.env_secrets())
msg = """\
Something went wrong while cloning {}
Check the debug logs for more information
Expand Down Expand Up @@ -663,7 +674,12 @@ def get_metric_not_found_msg(
) -> str:
reason = "was not found"
return _get_target_failure_msg(
model, target_name, target_package, include_path=True, reason=reason, target_kind="metric"
model,
target_name,
target_package,
include_path=True,
reason=reason,
target_kind="metric",
)


Expand Down Expand Up @@ -732,7 +748,9 @@ def missing_materialization(model, adapter_type):

def bad_package_spec(repo, spec, error_message):
msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message)
raise InternalException(scrub_secrets(msg, env_secrets()))
raise InternalException(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())
)


def raise_cache_inconsistent(message):
Expand Down Expand Up @@ -1054,16 +1072,19 @@ def raise_duplicate_alias(

def warn_or_error(msg, node=None, log_fmt=None):
if flags.WARN_ERROR:
raise_compiler_error(scrub_secrets(msg, env_secrets()), node)
raise_compiler_error(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()),
node,
)
else:
fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt))
dbt.events.functions.fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt))


def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))
dbt.events.functions.fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))


def warn(msg, node=None):
Expand Down
22 changes: 13 additions & 9 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from pathlib import PosixPath, WindowsPath

from contextlib import contextmanager
from dbt.exceptions import ConnectionException
from dbt.events.functions import fire_event
from dbt.events.types import RetryExternalCall, RecordRetryException
from dbt import flags
from enum import Enum
Expand All @@ -39,6 +37,7 @@
Sequence,
)

import dbt.events.functions
import dbt.exceptions

DECIMALS: Tuple[Type[Any], ...]
Expand Down Expand Up @@ -333,15 +332,18 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, DECIMALS):
return float(obj)
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
if isinstance(obj, jinja2.Undefined):
elif isinstance(obj, jinja2.Undefined):
return ""
if hasattr(obj, "to_dict"):
elif isinstance(obj, Exception):
return repr(obj)
elif hasattr(obj, "to_dict"):
# if we have a to_dict we should try to serialize the result of
# that!
return obj.to_dict(omit_none=True)
return super().default(obj)
else:
return super().default(obj)


class ForgivingJSONEncoder(JSONEncoder):
Expand Down Expand Up @@ -619,12 +621,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=exc))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
dbt.events.functions.fire_event(RecordRetryException(exc=exc))
dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
else:
raise ConnectionException("External connection exception occurred: " + str(exc))
raise dbt.exceptions.ConnectionException(
"External connection exception occurred: " + str(exc)
)


# This is used to serialize the args in the run_results and in the logs.
Expand Down

0 comments on commit 7f0f239

Please sign in to comment.