Skip to content

Commit

Permalink
[Backport 1.6.latest] Hotfix for 372: Use JSONEncoder in json.dumps (#…
Browse files Browse the repository at this point in the history
…8162)

Co-authored-by: Kshitij Aranke <[email protected]>
  • Loading branch information
github-actions[bot] and aranke authored Jul 21, 2023
1 parent f9c8b7c commit fd05fb7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
4 changes: 2 additions & 2 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dbt.events.format import timestamp_to_datetime_string

from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg

import dbt.utils

# A Filter is a function which takes a BaseEvent and returns True if the event
# should be logged, False otherwise.
Expand Down Expand Up @@ -175,7 +175,7 @@ def create_line(self, msg: EventMsg) -> str:
from dbt.events.functions import msg_to_dict

msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
line = self.scrubber(raw_log_line) # type: ignore
return line

Expand Down
3 changes: 2 additions & 1 deletion core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import uuid
from google.protobuf.json_format import MessageToDict

import dbt.utils

LOG_VERSION = 3
metadata_vars: Optional[Dict[str, str]] = None
Expand Down Expand Up @@ -200,7 +201,7 @@ def stop_capture_stdout_logs():
# the message may contain secrets which must be scrubbed at the usage site.
def msg_to_json(msg: EventMsg) -> str:
msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
raw_log_line = json.dumps(msg_dict, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder)
return raw_log_line


Expand Down
28 changes: 15 additions & 13 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from pathlib import PosixPath, WindowsPath

from contextlib import contextmanager
from dbt.exceptions import ConnectionError, DuplicateAliasError
from dbt.events.functions import fire_event
from dbt.events.types import RetryExternalCall, RecordRetryException
from dbt import flags
from enum import Enum
Expand All @@ -40,6 +38,7 @@
Sequence,
)

import dbt.events.functions
import dbt.exceptions

DECIMALS: Tuple[Type[Any], ...]
Expand Down Expand Up @@ -337,15 +336,18 @@ class JSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, DECIMALS):
return float(obj)
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
if isinstance(obj, jinja2.Undefined):
elif isinstance(obj, jinja2.Undefined):
return ""
if hasattr(obj, "to_dict"):
elif isinstance(obj, Exception):
return repr(obj)
elif hasattr(obj, "to_dict"):
# if we have a to_dict we should try to serialize the result of
# that!
return obj.to_dict(omit_none=True)
return super().default(obj)
else:
return super().default(obj)


class ForgivingJSONEncoder(JSONEncoder):
Expand All @@ -369,7 +371,7 @@ def translate_mapping(self, kwargs: Mapping[str, Any]) -> Dict[str, Any]:
for key, value in kwargs.items():
canonical_key = self.aliases.get(key, key)
if canonical_key in result:
raise DuplicateAliasError(kwargs, self.aliases, canonical_key)
raise dbt.exceptions.DuplicateAliasError(kwargs, self.aliases, canonical_key)
result[canonical_key] = self.translate_value(value)
return result

Expand All @@ -389,9 +391,7 @@ def translate(self, value: Mapping[str, Any]) -> Dict[str, Any]:
return self.translate_mapping(value)
except RuntimeError as exc:
if "maximum recursion depth exceeded" in str(exc):
raise dbt.exceptions.RecursionError(
"Cycle detected in a value passed to translate!"
)
raise RecursionError("Cycle detected in a value passed to translate!")
raise


Expand Down Expand Up @@ -603,12 +603,14 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=str(exc)))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
dbt.events.functions.fire_event(RecordRetryException(exc=str(exc)))
dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
else:
raise ConnectionError("External connection exception occurred: " + str(exc))
raise dbt.exceptions.ConnectionError(
"External connection exception occurred: " + str(exc)
)


# This is used to serialize the args in the run_results and in the logs.
Expand Down

0 comments on commit fd05fb7

Please sign in to comment.