Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.6.latest] Hotfix for 372: Use JSONEncoder in json.dumps #8162

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dbt.events.format import timestamp_to_datetime_string

from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg

import dbt.utils

# A Filter is a function which takes a BaseEvent and returns True if the event
# should be logged, False otherwise.
Expand Down Expand Up @@ -175,7 +175,7 @@ def create_line(self, msg: EventMsg) -> str:
from dbt.events.functions import msg_to_dict

msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
line = self.scrubber(raw_log_line) # type: ignore
return line

Expand Down
3 changes: 2 additions & 1 deletion core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import uuid
from google.protobuf.json_format import MessageToDict

import dbt.utils

LOG_VERSION = 3
metadata_vars: Optional[Dict[str, str]] = None
Expand Down Expand Up @@ -200,7 +201,7 @@ def stop_capture_stdout_logs():
# the message may contain secrets which must be scrubbed at the usage site.
def msg_to_json(msg: EventMsg) -> str:
msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
return raw_log_line


Expand Down
28 changes: 15 additions & 13 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from pathlib import PosixPath, WindowsPath

from contextlib import contextmanager
from dbt.exceptions import ConnectionError, DuplicateAliasError
from dbt.events.functions import fire_event
from dbt.events.types import RetryExternalCall, RecordRetryException
from dbt import flags
from enum import Enum
Expand All @@ -40,6 +38,7 @@
Sequence,
)

import dbt.events.functions
import dbt.exceptions

DECIMALS: Tuple[Type[Any], ...]
Expand Down Expand Up @@ -337,15 +336,18 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, DECIMALS):
return float(obj)
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
if isinstance(obj, jinja2.Undefined):
elif isinstance(obj, jinja2.Undefined):
return ""
if hasattr(obj, "to_dict"):
elif isinstance(obj, Exception):
return repr(obj)
elif hasattr(obj, "to_dict"):
# if we have a to_dict we should try to serialize the result of
# that!
return obj.to_dict(omit_none=True)
return super().default(obj)
else:
return super().default(obj)


class ForgivingJSONEncoder(JSONEncoder):
Expand All @@ -369,7 +371,7 @@ def translate_mapping(self, kwargs: Mapping[str, Any]) -> Dict[str, Any]:
for key, value in kwargs.items():
canonical_key = self.aliases.get(key, key)
if canonical_key in result:
raise DuplicateAliasError(kwargs, self.aliases, canonical_key)
raise dbt.exceptions.DuplicateAliasError(kwargs, self.aliases, canonical_key)
result[canonical_key] = self.translate_value(value)
return result

Expand All @@ -389,9 +391,7 @@ def translate(self, value: Mapping[str, Any]) -> Dict[str, Any]:
return self.translate_mapping(value)
except RuntimeError as exc:
if "maximum recursion depth exceeded" in str(exc):
raise dbt.exceptions.RecursionError(
"Cycle detected in a value passed to translate!"
)
raise RecursionError("Cycle detected in a value passed to translate!")
raise


Expand Down Expand Up @@ -603,12 +603,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=str(exc)))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
dbt.events.functions.fire_event(RecordRetryException(exc=str(exc)))
dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
else:
raise ConnectionError("External connection exception occurred: " + str(exc))
raise dbt.exceptions.ConnectionError(
"External connection exception occurred: " + str(exc)
)


# This is used to serialize the args in the run_results and in the logs.
Expand Down
Loading