Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.2.latest] Hotfix for 372: Use JSONEncoder in json.dumps #8170

Merged
merged 3 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache
from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
import dbt.flags as flags
import dbt.utils

# TODO this will need to move eventually
from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER
Expand Down Expand Up @@ -89,7 +90,10 @@ def setup_event_logger(log_path, level_override=None):
file_passthrough_formatter = logging.Formatter(fmt=FORMAT)

file_handler = RotatingFileHandler(
filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb
filename=log_dest,
encoding="utf8",
maxBytes=10 * 1024 * 1024,
backupCount=5, # 10 mb
)
file_handler.setFormatter(file_passthrough_formatter)
file_handler.setLevel(logging.DEBUG) # always debug regardless of user input
Expand Down Expand Up @@ -133,7 +137,6 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str:
def event_to_serializable_dict(
e: T_Event,
) -> Dict[str, Any]:

log_line = dict()
code: str
try:
Expand Down Expand Up @@ -204,7 +207,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]:
return None # will not be sent to logger
# using preformatted ts string instead of formatting it here to be extra careful about timezone
values = event_to_serializable_dict(e)
raw_log_line = json.dumps(values, sort_keys=True)
raw_log_line = json.dumps(values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
return scrub_secrets(raw_log_line, env_secrets())


Expand Down Expand Up @@ -241,7 +244,12 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s


def send_exc_to_logger(
l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False
l: Logger,
level_tag: str,
log_line: str,
exc_info=True,
stack_info=False,
extra=False,
):
if level_tag == "test":
# TODO after implmenting #3977 send to new test level
Expand Down
47 changes: 34 additions & 13 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import functools
from typing import NoReturn, Optional, Mapping, Any

from dbt.events.functions import fire_event, scrub_secrets, env_secrets
from dbt.events.types import GeneralWarningMsg, GeneralWarningException
from dbt.node_types import NodeType
from dbt import flags
from dbt.ui import line_wrap_message, warning_tag

import dbt.dataclass_schema
import dbt.events.functions


def validator_error_message(exc):
Expand Down Expand Up @@ -53,7 +53,7 @@ class RuntimeException(RuntimeError, Exception):
def __init__(self, msg, node=None):
self.stack = []
self.node = node
self.msg = scrub_secrets(msg, env_secrets())
self.msg = dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())

def add_node(self, node=None):
if node is not None and node is not self.node:
Expand Down Expand Up @@ -383,7 +383,10 @@ class FailedToConnectException(DatabaseException):

class CommandError(RuntimeException):
def __init__(self, cwd, cmd, message="Error running command"):
cmd_scrubbed = list(scrub_secrets(cmd_txt, env_secrets()) for cmd_txt in cmd)
cmd_scrubbed = list(
dbt.events.functions.scrub_secrets(cmd_txt, dbt.events.functions.env_secrets())
for cmd_txt in cmd
)
super().__init__(message)
self.cwd = cwd
self.cmd = cmd_scrubbed
Expand Down Expand Up @@ -412,8 +415,12 @@ class CommandResultError(CommandError):
def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"):
super().__init__(cwd, cmd, message)
self.returncode = returncode
self.stdout = scrub_secrets(stdout.decode("utf-8"), env_secrets())
self.stderr = scrub_secrets(stderr.decode("utf-8"), env_secrets())
self.stdout = dbt.events.functions.scrub_secrets(
stdout.decode("utf-8"), dbt.events.functions.env_secrets()
)
self.stderr = dbt.events.functions.scrub_secrets(
stderr.decode("utf-8"), dbt.events.functions.env_secrets()
)
self.args = (cwd, self.cmd, returncode, self.stdout, self.stderr, message)

def __str__(self):
Expand Down Expand Up @@ -454,16 +461,20 @@ def raise_database_error(msg, node=None) -> NoReturn:


def raise_dependency_error(msg) -> NoReturn:
raise DependencyException(scrub_secrets(msg, env_secrets()))
raise DependencyException(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())
)


def raise_git_cloning_error(error: CommandResultError) -> NoReturn:
error.cmd = scrub_secrets(str(error.cmd), env_secrets())
error.cmd = dbt.events.functions.scrub_secrets(
str(error.cmd), dbt.events.functions.env_secrets()
)
raise error


def raise_git_cloning_problem(repo) -> NoReturn:
repo = scrub_secrets(repo, env_secrets())
repo = dbt.events.functions.scrub_secrets(repo, dbt.events.functions.env_secrets())
msg = """\
Something went wrong while cloning {}
Check the debug logs for more information
Expand Down Expand Up @@ -663,7 +674,12 @@ def get_metric_not_found_msg(
) -> str:
reason = "was not found"
return _get_target_failure_msg(
model, target_name, target_package, include_path=True, reason=reason, target_kind="metric"
model,
target_name,
target_package,
include_path=True,
reason=reason,
target_kind="metric",
)


Expand Down Expand Up @@ -732,7 +748,9 @@ def missing_materialization(model, adapter_type):

def bad_package_spec(repo, spec, error_message):
msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message)
raise InternalException(scrub_secrets(msg, env_secrets()))
raise InternalException(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets())
)


def raise_cache_inconsistent(message):
Expand Down Expand Up @@ -1054,16 +1072,19 @@ def raise_duplicate_alias(

def warn_or_error(msg, node=None, log_fmt=None):
if flags.WARN_ERROR:
raise_compiler_error(scrub_secrets(msg, env_secrets()), node)
raise_compiler_error(
dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()),
node,
)
else:
fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt))
dbt.events.functions.fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt))


def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))
dbt.events.functions.fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))


def warn(msg, node=None):
Expand Down
22 changes: 13 additions & 9 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from pathlib import PosixPath, WindowsPath

from contextlib import contextmanager
from dbt.exceptions import ConnectionException
from dbt.events.functions import fire_event
from dbt.events.types import RetryExternalCall, RecordRetryException
from dbt import flags
from enum import Enum
Expand All @@ -39,6 +37,7 @@
Sequence,
)

import dbt.events.functions
import dbt.exceptions

DECIMALS: Tuple[Type[Any], ...]
Expand Down Expand Up @@ -333,15 +332,18 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, DECIMALS):
return float(obj)
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
if isinstance(obj, jinja2.Undefined):
elif isinstance(obj, jinja2.Undefined):
return ""
if hasattr(obj, "to_dict"):
elif isinstance(obj, Exception):
return repr(obj)
elif hasattr(obj, "to_dict"):
# if we have a to_dict we should try to serialize the result of
# that!
return obj.to_dict(omit_none=True)
return super().default(obj)
else:
return super().default(obj)


class ForgivingJSONEncoder(JSONEncoder):
Expand Down Expand Up @@ -619,12 +621,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=exc))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
dbt.events.functions.fire_event(RecordRetryException(exc=exc))
dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
else:
raise ConnectionException("External connection exception occurred: " + str(exc))
raise dbt.exceptions.ConnectionException(
"External connection exception occurred: " + str(exc)
)


# This is used to serialize the args in the run_results and in the logs.
Expand Down
Loading