From cfa089fe6101773572d93c4ddbb94ecbae99ad83 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 20 Jul 2023 10:48:39 -0500 Subject: [PATCH 1/3] [Backport 1.2.latest] Hotfix for 372: Use JSONEncoder in json.dumps --- core/dbt/events/functions.py | 18 ++++-- core/dbt/exceptions.py | 120 ++++++++++++++++++++++++++--------- core/dbt/utils.py | 48 ++++++++++---- 3 files changed, 138 insertions(+), 48 deletions(-) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 9b7dcbf697b..a9ad18e6660 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -3,6 +3,7 @@ from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine import dbt.flags as flags +import dbt.utils # TODO this will need to move eventually from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER @@ -89,7 +90,10 @@ def setup_event_logger(log_path, level_override=None): file_passthrough_formatter = logging.Formatter(fmt=FORMAT) file_handler = RotatingFileHandler( - filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb + filename=log_dest, + encoding="utf8", + maxBytes=10 * 1024 * 1024, + backupCount=5, # 10 mb ) file_handler.setFormatter(file_passthrough_formatter) file_handler.setLevel(logging.DEBUG) # always debug regardless of user input @@ -133,7 +137,6 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str: def event_to_serializable_dict( e: T_Event, ) -> Dict[str, Any]: - log_line = dict() code: str try: @@ -204,7 +207,9 @@ def create_json_log_line(e: T_Event) -> Optional[str]: return None # will not be sent to logger # using preformatted ts string instead of formatting it here to be extra careful about timezone values = event_to_serializable_dict(e) - raw_log_line = json.dumps(values, sort_keys=True) + raw_log_line = json.dumps( + values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder + ) return scrub_secrets(raw_log_line, env_secrets()) @@ -241,7 +246,12 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s def send_exc_to_logger( - l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False + l: Logger, + level_tag: str, + log_line: str, + exc_info=True, + stack_info=False, + extra=False, ): if level_tag == "test": # TODO after implmenting #3977 send to new test level diff --git a/core/dbt/exceptions.py b/core/dbt/exceptions.py index d78f5a86c7b..a8786f68ba9 100644 --- a/core/dbt/exceptions.py +++ b/core/dbt/exceptions.py @@ -2,13 +2,13 @@ import functools from typing import NoReturn, Optional, Mapping, Any -from dbt.events.functions import fire_event, scrub_secrets, env_secrets from dbt.events.types import GeneralWarningMsg, GeneralWarningException from dbt.node_types import NodeType from dbt import flags from dbt.ui import line_wrap_message, warning_tag import dbt.dataclass_schema +import dbt.events.functions def validator_error_message(exc): @@ -53,7 +53,9 @@ class RuntimeException(RuntimeError, Exception): def __init__(self, msg, node=None): self.stack = [] self.node = node - self.msg = scrub_secrets(msg, env_secrets()) + self.msg = dbt.events.functions.scrub_secrets( + msg, dbt.events.functions.env_secrets() + ) def add_node(self, node=None): if node is not None and node is not self.node: @@ -77,7 +79,9 @@ def node_to_string(self, node): # out the path we know at least. This indicates an error during # block parsing. return "{}".format(node.path.original_file_path) - return "{} {} ({})".format(node.resource_type, node.name, node.original_file_path) + return "{} {} ({})".format( + node.resource_type, node.name, node.original_file_path + ) def process_stack(self): lines = [] @@ -172,7 +176,10 @@ def data(self): class RPCCompiling(RuntimeException): CODE = 10010 - MESSAGE = 'RPC server is compiling the project, call the "status" method for' " compile status" + MESSAGE = ( + 'RPC server is compiling the project, call the "status" method for' + " compile status" + ) def __init__(self, msg=None, node=None): if msg is None: @@ -183,7 +190,8 @@ def __init__(self, msg=None, node=None): class RPCLoadException(RuntimeException): CODE = 10011 MESSAGE = ( - 'RPC server failed to compile project, call the "status" method for' " compile status" + 'RPC server failed to compile project, call the "status" method for' + " compile status" ) def __init__(self, cause): @@ -383,7 +391,12 @@ class FailedToConnectException(DatabaseException): class CommandError(RuntimeException): def __init__(self, cwd, cmd, message="Error running command"): - cmd_scrubbed = list(scrub_secrets(cmd_txt, env_secrets()) for cmd_txt in cmd) + cmd_scrubbed = list( + dbt.events.functions.scrub_secrets( + cmd_txt, dbt.events.functions.env_secrets() + ) + for cmd_txt in cmd + ) super().__init__(message) self.cwd = cwd self.cmd = cmd_scrubbed @@ -409,11 +422,17 @@ def __str__(self): class CommandResultError(CommandError): - def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"): + def __init__( + self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode" + ): super().__init__(cwd, cmd, message) self.returncode = returncode - self.stdout = scrub_secrets(stdout.decode("utf-8"), env_secrets()) - self.stderr = scrub_secrets(stderr.decode("utf-8"), env_secrets()) + self.stdout = dbt.events.functions.scrub_secrets( + stdout.decode("utf-8"), dbt.events.functions.env_secrets() + ) + self.stderr = dbt.events.functions.scrub_secrets( + stderr.decode("utf-8"), dbt.events.functions.env_secrets() + ) self.args = (cwd, self.cmd, returncode, self.stdout, self.stderr, message) def __str__(self): @@ -454,16 +473,20 @@ def raise_database_error(msg, node=None) -> NoReturn: def raise_dependency_error(msg) -> NoReturn: - raise DependencyException(scrub_secrets(msg, env_secrets())) + raise DependencyException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_git_cloning_error(error: CommandResultError) -> NoReturn: - error.cmd = scrub_secrets(str(error.cmd), env_secrets()) + error.cmd = dbt.events.functions.scrub_secrets( + str(error.cmd), dbt.events.functions.env_secrets() + ) raise error def raise_git_cloning_problem(repo) -> NoReturn: - repo = scrub_secrets(repo, env_secrets()) + repo = dbt.events.functions.scrub_secrets(repo, dbt.events.functions.env_secrets()) msg = """\ Something went wrong while cloning {} Check the debug logs for more information @@ -517,7 +540,9 @@ def invalid_bool_error(got_value, macro_name) -> NoReturn: def ref_invalid_args(model, args) -> NoReturn: - raise_compiler_error("ref() takes at most two arguments ({} given)".format(len(args)), model) + raise_compiler_error( + "ref() takes at most two arguments ({} given)".format(len(args)), model + ) def metric_invalid_args(model, args) -> NoReturn: @@ -553,7 +578,9 @@ def ref_bad_context(model, args) -> NoReturn: def doc_invalid_args(model, args) -> NoReturn: - raise_compiler_error("doc() takes at most two arguments ({} given)".format(len(args)), model) + raise_compiler_error( + "doc() takes at most two arguments ({} given)".format(len(args)), model + ) def doc_target_not_found( @@ -652,7 +679,9 @@ def get_source_not_found_or_disabled_msg( def source_target_not_found( model, target_name: str, target_table_name: str, disabled: Optional[bool] = None ) -> NoReturn: - msg = get_source_not_found_or_disabled_msg(model, target_name, target_table_name, disabled) + msg = get_source_not_found_or_disabled_msg( + model, target_name, target_table_name, disabled + ) raise_compiler_error(msg, model) @@ -663,11 +692,18 @@ def get_metric_not_found_msg( ) -> str: reason = "was not found" return _get_target_failure_msg( - model, target_name, target_package, include_path=True, reason=reason, target_kind="metric" + model, + target_name, + target_package, + include_path=True, + reason=reason, + target_kind="metric", ) -def metric_target_not_found(metric, target_name: str, target_package: Optional[str]) -> NoReturn: +def metric_target_not_found( + metric, target_name: str, target_package: Optional[str] +) -> NoReturn: msg = get_metric_not_found_msg(metric, target_name, target_package) raise_compiler_error(msg, metric) @@ -709,7 +745,9 @@ def materialization_not_available(model, adapter_type): materialization = model.get_materialization() raise_compiler_error( - "Materialization '{}' is not available for {}!".format(materialization, adapter_type), + "Materialization '{}' is not available for {}!".format( + materialization, adapter_type + ), model, ) @@ -731,8 +769,12 @@ def missing_materialization(model, adapter_type): def bad_package_spec(repo, spec, error_message): - msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message) - raise InternalException(scrub_secrets(msg, env_secrets())) + msg = "Error checking out spec='{}' for repo {}\n{}".format( + spec, repo, error_message + ) + raise InternalException( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) + ) def raise_cache_inconsistent(message): @@ -767,13 +809,17 @@ def relation_wrong_type(relation, expected_type, model=None): "but it currently exists as a {current_type}. Either " "drop {relation} manually, or run dbt with " "`--full-refresh` and dbt will drop it for you." - ).format(relation=relation, current_type=relation.type, expected_type=expected_type), + ).format( + relation=relation, current_type=relation.type, expected_type=expected_type + ), model, ) def package_not_found(package_name): - raise_dependency_error("Package {} was not found in the package index".format(package_name)) + raise_dependency_error( + "Package {} was not found in the package index".format(package_name) + ) def package_version_not_found(package_name, version_range, available_versions): @@ -782,7 +828,9 @@ def package_version_not_found(package_name, version_range, available_versions): " Requested range: {}\n" " Available versions: {}" ) - raise_dependency_error(base_msg.format(package_name, version_range, available_versions)) + raise_dependency_error( + base_msg.format(package_name, version_range, available_versions) + ) def invalid_materialization_argument(name, argument): @@ -834,7 +882,9 @@ def approximate_relation_match(target, relation): "When searching for a relation, dbt found an approximate match. " "Instead of guessing \nwhich relation to use, dbt will move on. " "Please delete {relation}, or rename it to be less ambiguous." - "\nSearched for: {target}\nFound: {relation}".format(target=target, relation=relation) + "\nSearched for: {target}\nFound: {relation}".format( + target=target, relation=relation + ) ) @@ -960,7 +1010,8 @@ def raise_patch_targets_not_found(patches): def _fix_dupe_msg(path_1: str, path_2: str, name: str, type_name: str) -> str: if path_1 == path_2: return ( - f"remove one of the {type_name} entries for {name} in this file:\n" f" - {path_1!s}\n" + f"remove one of the {type_name} entries for {name} in this file:\n" + f" - {path_1!s}\n" ) else: return ( @@ -1047,23 +1098,32 @@ def raise_duplicate_alias( kwargs: Mapping[str, Any], aliases: Mapping[str, str], canonical_key: str ) -> NoReturn: # dupe found: go through the dict so we can have a nice-ish error - key_names = ", ".join("{}".format(k) for k in kwargs if aliases.get(k) == canonical_key) + key_names = ", ".join( + "{}".format(k) for k in kwargs if aliases.get(k) == canonical_key + ) - raise AliasException(f'Got duplicate keys: ({key_names}) all map to "{canonical_key}"') + raise AliasException( + f'Got duplicate keys: ({key_names}) all map to "{canonical_key}"' + ) def warn_or_error(msg, node=None, log_fmt=None): if flags.WARN_ERROR: - raise_compiler_error(scrub_secrets(msg, env_secrets()), node) + raise_compiler_error( + dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()), + node, + ) else: - fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) + dbt.events.functions.fire_event(GeneralWarningMsg(msg=msg, log_fmt=log_fmt)) def warn_or_raise(exc, log_fmt=None): if flags.WARN_ERROR: raise exc else: - fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt)) + dbt.events.functions.fire_event( + GeneralWarningException(exc=exc, log_fmt=log_fmt) + ) def warn(msg, node=None): diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 039f22dc069..9618fc12e25 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -15,8 +15,6 @@ from pathlib import PosixPath, WindowsPath from contextlib import contextmanager -from dbt.exceptions import ConnectionException -from dbt.events.functions import fire_event from dbt.events.types import RetryExternalCall, RecordRetryException from dbt import flags from enum import Enum @@ -39,6 +37,7 @@ Sequence, ) +import dbt.events.functions import dbt.exceptions DECIMALS: Tuple[Type[Any], ...] @@ -102,7 +101,9 @@ def get_dbt_docs_name(name): return f"{DOCS_PREFIX}{name}" -def get_materialization_macro_name(materialization_name, adapter_type=None, with_prefix=True): +def get_materialization_macro_name( + materialization_name, adapter_type=None, with_prefix=True +): if adapter_type is None: adapter_type = "default" name = f"materialization_{materialization_name}_{adapter_type}" @@ -189,22 +190,31 @@ def _deep_map_render( ret: Any if isinstance(value, list): - ret = [_deep_map_render(func, v, (keypath + (idx,))) for idx, v in enumerate(value)] + ret = [ + _deep_map_render(func, v, (keypath + (idx,))) for idx, v in enumerate(value) + ] elif isinstance(value, dict): - ret = {k: _deep_map_render(func, v, (keypath + (str(k),))) for k, v in value.items()} + ret = { + k: _deep_map_render(func, v, (keypath + (str(k),))) + for k, v in value.items() + } elif isinstance(value, atomic_types): ret = func(value, keypath) else: container_types: Tuple[Type[Any], ...] = (list, dict) ok_types = container_types + atomic_types raise dbt.exceptions.DbtConfigError( - "in _deep_map_render, expected one of {!r}, got {!r}".format(ok_types, type(value)) + "in _deep_map_render, expected one of {!r}, got {!r}".format( + ok_types, type(value) + ) ) return ret -def deep_map_render(func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any) -> Any: +def deep_map_render( + func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any +) -> Any: """This function renders a nested dictionary derived from a yaml file. It is used to render dbt_project.yml, profiles.yml, and schema files. @@ -365,7 +375,9 @@ def translate_mapping(self, kwargs: Mapping[str, Any]) -> Dict[str, Any]: for key, value in kwargs.items(): canonical_key = self.aliases.get(key, key) if canonical_key in result: - dbt.exceptions.raise_duplicate_alias(kwargs, self.aliases, canonical_key) + dbt.exceptions.raise_duplicate_alias( + kwargs, self.aliases, canonical_key + ) result[canonical_key] = self.translate_value(value) return result @@ -491,11 +503,13 @@ def submit(*args, **kwargs): self, fn, *args = args elif not args: raise TypeError( - "descriptor 'submit' of 'SingleThreadedExecutor' object needs " "an argument" + "descriptor 'submit' of 'SingleThreadedExecutor' object needs " + "an argument" ) else: raise TypeError( - "submit expected at least 1 positional argument, " "got %d" % (len(args) - 1) + "submit expected at least 1 positional argument, " + "got %d" % (len(args) - 1) ) fut = concurrent.futures.Future() try: @@ -619,12 +633,16 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0): ReadError, ) as exc: if attempt <= max_attempts - 1: - fire_event(RecordRetryException(exc=exc)) - fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) + dbt.events.functions.fire_event(RecordRetryException(exc=exc)) + dbt.events.functions.fire_event( + RetryExternalCall(attempt=attempt, max=max_attempts) + ) time.sleep(1) return _connection_exception_retry(fn, max_attempts, attempt + 1) else: - raise ConnectionException("External connection exception occurred: " + str(exc)) + raise dbt.exceptions.ConnectionException( + "External connection exception occurred: " + str(exc) + ) # This is used to serialize the args in the run_results and in the logs. @@ -662,7 +680,9 @@ def args_to_dict(args): if key == "vars" and var_args[key] == "{}": continue # this was required for a test case - if isinstance(var_args[key], PosixPath) or isinstance(var_args[key], WindowsPath): + if isinstance(var_args[key], PosixPath) or isinstance( + var_args[key], WindowsPath + ): var_args[key] = str(var_args[key]) dict_args[key] = var_args[key] return dict_args From c55b2b61e4f3220c75b90402ee3e2d122e21653b Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 20 Jul 2023 10:55:31 -0500 Subject: [PATCH 2/3] line length 99 --- core/dbt/events/functions.py | 4 +- core/dbt/exceptions.py | 79 +++++++++--------------------------- core/dbt/utils.py | 39 +++++------------- 3 files changed, 31 insertions(+), 91 deletions(-) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index a9ad18e6660..62fa5fa6757 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -207,9 +207,7 @@ def create_json_log_line(e: T_Event) -> Optional[str]: return None # will not be sent to logger # using preformatted ts string instead of formatting it here to be extra careful about timezone values = event_to_serializable_dict(e) - raw_log_line = json.dumps( - values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder - ) + raw_log_line = json.dumps(values, sort_keys=True, cls=dbt.utils.ForgivingJSONEncoder) return scrub_secrets(raw_log_line, env_secrets()) diff --git a/core/dbt/exceptions.py b/core/dbt/exceptions.py index a8786f68ba9..813e829f37e 100644 --- a/core/dbt/exceptions.py +++ b/core/dbt/exceptions.py @@ -53,9 +53,7 @@ class RuntimeException(RuntimeError, Exception): def __init__(self, msg, node=None): self.stack = [] self.node = node - self.msg = dbt.events.functions.scrub_secrets( - msg, dbt.events.functions.env_secrets() - ) + self.msg = dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) def add_node(self, node=None): if node is not None and node is not self.node: @@ -79,9 +77,7 @@ def node_to_string(self, node): # out the path we know at least. This indicates an error during # block parsing. return "{}".format(node.path.original_file_path) - return "{} {} ({})".format( - node.resource_type, node.name, node.original_file_path - ) + return "{} {} ({})".format(node.resource_type, node.name, node.original_file_path) def process_stack(self): lines = [] @@ -176,10 +172,7 @@ def data(self): class RPCCompiling(RuntimeException): CODE = 10010 - MESSAGE = ( - 'RPC server is compiling the project, call the "status" method for' - " compile status" - ) + MESSAGE = 'RPC server is compiling the project, call the "status" method for' " compile status" def __init__(self, msg=None, node=None): if msg is None: @@ -190,8 +183,7 @@ def __init__(self, msg=None, node=None): class RPCLoadException(RuntimeException): CODE = 10011 MESSAGE = ( - 'RPC server failed to compile project, call the "status" method for' - " compile status" + 'RPC server failed to compile project, call the "status" method for' " compile status" ) def __init__(self, cause): @@ -392,9 +384,7 @@ class FailedToConnectException(DatabaseException): class CommandError(RuntimeException): def __init__(self, cwd, cmd, message="Error running command"): cmd_scrubbed = list( - dbt.events.functions.scrub_secrets( - cmd_txt, dbt.events.functions.env_secrets() - ) + dbt.events.functions.scrub_secrets(cmd_txt, dbt.events.functions.env_secrets()) for cmd_txt in cmd ) super().__init__(message) @@ -422,9 +412,7 @@ def __str__(self): class CommandResultError(CommandError): - def __init__( - self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode" - ): + def __init__(self, cwd, cmd, returncode, stdout, stderr, message="Got a non-zero returncode"): super().__init__(cwd, cmd, message) self.returncode = returncode self.stdout = dbt.events.functions.scrub_secrets( @@ -540,9 +528,7 @@ def invalid_bool_error(got_value, macro_name) -> NoReturn: def ref_invalid_args(model, args) -> NoReturn: - raise_compiler_error( - "ref() takes at most two arguments ({} given)".format(len(args)), model - ) + raise_compiler_error("ref() takes at most two arguments ({} given)".format(len(args)), model) def metric_invalid_args(model, args) -> NoReturn: @@ -578,9 +564,7 @@ def ref_bad_context(model, args) -> NoReturn: def doc_invalid_args(model, args) -> NoReturn: - raise_compiler_error( - "doc() takes at most two arguments ({} given)".format(len(args)), model - ) + raise_compiler_error("doc() takes at most two arguments ({} given)".format(len(args)), model) def doc_target_not_found( @@ -679,9 +663,7 @@ def get_source_not_found_or_disabled_msg( def source_target_not_found( model, target_name: str, target_table_name: str, disabled: Optional[bool] = None ) -> NoReturn: - msg = get_source_not_found_or_disabled_msg( - model, target_name, target_table_name, disabled - ) + msg = get_source_not_found_or_disabled_msg(model, target_name, target_table_name, disabled) raise_compiler_error(msg, model) @@ -701,9 +683,7 @@ def get_metric_not_found_msg( ) -def metric_target_not_found( - metric, target_name: str, target_package: Optional[str] -) -> NoReturn: +def metric_target_not_found(metric, target_name: str, target_package: Optional[str]) -> NoReturn: msg = get_metric_not_found_msg(metric, target_name, target_package) raise_compiler_error(msg, metric) @@ -745,9 +725,7 @@ def materialization_not_available(model, adapter_type): materialization = model.get_materialization() raise_compiler_error( - "Materialization '{}' is not available for {}!".format( - materialization, adapter_type - ), + "Materialization '{}' is not available for {}!".format(materialization, adapter_type), model, ) @@ -769,9 +747,7 @@ def missing_materialization(model, adapter_type): def bad_package_spec(repo, spec, error_message): - msg = "Error checking out spec='{}' for repo {}\n{}".format( - spec, repo, error_message - ) + msg = "Error checking out spec='{}' for repo {}\n{}".format(spec, repo, error_message) raise InternalException( dbt.events.functions.scrub_secrets(msg, dbt.events.functions.env_secrets()) ) @@ -809,17 +785,13 @@ def relation_wrong_type(relation, expected_type, model=None): "but it currently exists as a {current_type}. Either " "drop {relation} manually, or run dbt with " "`--full-refresh` and dbt will drop it for you." - ).format( - relation=relation, current_type=relation.type, expected_type=expected_type - ), + ).format(relation=relation, current_type=relation.type, expected_type=expected_type), model, ) def package_not_found(package_name): - raise_dependency_error( - "Package {} was not found in the package index".format(package_name) - ) + raise_dependency_error("Package {} was not found in the package index".format(package_name)) def package_version_not_found(package_name, version_range, available_versions): @@ -828,9 +800,7 @@ def package_version_not_found(package_name, version_range, available_versions): " Requested range: {}\n" " Available versions: {}" ) - raise_dependency_error( - base_msg.format(package_name, version_range, available_versions) - ) + raise_dependency_error(base_msg.format(package_name, version_range, available_versions)) def invalid_materialization_argument(name, argument): @@ -882,9 +852,7 @@ def approximate_relation_match(target, relation): "When searching for a relation, dbt found an approximate match. " "Instead of guessing \nwhich relation to use, dbt will move on. " "Please delete {relation}, or rename it to be less ambiguous." - "\nSearched for: {target}\nFound: {relation}".format( - target=target, relation=relation - ) + "\nSearched for: {target}\nFound: {relation}".format(target=target, relation=relation) ) @@ -1010,8 +978,7 @@ def raise_patch_targets_not_found(patches): def _fix_dupe_msg(path_1: str, path_2: str, name: str, type_name: str) -> str: if path_1 == path_2: return ( - f"remove one of the {type_name} entries for {name} in this file:\n" - f" - {path_1!s}\n" + f"remove one of the {type_name} entries for {name} in this file:\n" f" - {path_1!s}\n" ) else: return ( @@ -1098,13 +1065,9 @@ def raise_duplicate_alias( kwargs: Mapping[str, Any], aliases: Mapping[str, str], canonical_key: str ) -> NoReturn: # dupe found: go through the dict so we can have a nice-ish error - key_names = ", ".join( - "{}".format(k) for k in kwargs if aliases.get(k) == canonical_key - ) + key_names = ", ".join("{}".format(k) for k in kwargs if aliases.get(k) == canonical_key) - raise AliasException( - f'Got duplicate keys: ({key_names}) all map to "{canonical_key}"' - ) + raise AliasException(f'Got duplicate keys: ({key_names}) all map to "{canonical_key}"') def warn_or_error(msg, node=None, log_fmt=None): @@ -1121,9 +1084,7 @@ def warn_or_raise(exc, log_fmt=None): if flags.WARN_ERROR: raise exc else: - dbt.events.functions.fire_event( - GeneralWarningException(exc=exc, log_fmt=log_fmt) - ) + dbt.events.functions.fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt)) def warn(msg, node=None): diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 9618fc12e25..d6b16f4c30e 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -101,9 +101,7 @@ def get_dbt_docs_name(name): return f"{DOCS_PREFIX}{name}" -def get_materialization_macro_name( - materialization_name, adapter_type=None, with_prefix=True -): +def get_materialization_macro_name(materialization_name, adapter_type=None, with_prefix=True): if adapter_type is None: adapter_type = "default" name = f"materialization_{materialization_name}_{adapter_type}" @@ -190,31 +188,22 @@ def _deep_map_render( ret: Any if isinstance(value, list): - ret = [ - _deep_map_render(func, v, (keypath + (idx,))) for idx, v in enumerate(value) - ] + ret = [_deep_map_render(func, v, (keypath + (idx,))) for idx, v in enumerate(value)] elif isinstance(value, dict): - ret = { - k: _deep_map_render(func, v, (keypath + (str(k),))) - for k, v in value.items() - } + ret = {k: _deep_map_render(func, v, (keypath + (str(k),))) for k, v in value.items()} elif isinstance(value, atomic_types): ret = func(value, keypath) else: container_types: Tuple[Type[Any], ...] = (list, dict) ok_types = container_types + atomic_types raise dbt.exceptions.DbtConfigError( - "in _deep_map_render, expected one of {!r}, got {!r}".format( - ok_types, type(value) - ) + "in _deep_map_render, expected one of {!r}, got {!r}".format(ok_types, type(value)) ) return ret -def deep_map_render( - func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any -) -> Any: +def deep_map_render(func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any) -> Any: """This function renders a nested dictionary derived from a yaml file. It is used to render dbt_project.yml, profiles.yml, and schema files. @@ -375,9 +364,7 @@ def translate_mapping(self, kwargs: Mapping[str, Any]) -> Dict[str, Any]: for key, value in kwargs.items(): canonical_key = self.aliases.get(key, key) if canonical_key in result: - dbt.exceptions.raise_duplicate_alias( - kwargs, self.aliases, canonical_key - ) + dbt.exceptions.raise_duplicate_alias(kwargs, self.aliases, canonical_key) result[canonical_key] = self.translate_value(value) return result @@ -503,13 +490,11 @@ def submit(*args, **kwargs): self, fn, *args = args elif not args: raise TypeError( - "descriptor 'submit' of 'SingleThreadedExecutor' object needs " - "an argument" + "descriptor 'submit' of 'SingleThreadedExecutor' object needs " "an argument" ) else: raise TypeError( - "submit expected at least 1 positional argument, " - "got %d" % (len(args) - 1) + "submit expected at least 1 positional argument, " "got %d" % (len(args) - 1) ) fut = concurrent.futures.Future() try: @@ -634,9 +619,7 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0): ) as exc: if attempt <= max_attempts - 1: dbt.events.functions.fire_event(RecordRetryException(exc=exc)) - dbt.events.functions.fire_event( - RetryExternalCall(attempt=attempt, max=max_attempts) - ) + dbt.events.functions.fire_event(RetryExternalCall(attempt=attempt, max=max_attempts)) time.sleep(1) return _connection_exception_retry(fn, max_attempts, attempt + 1) else: @@ -680,9 +663,7 @@ def args_to_dict(args): if key == "vars" and var_args[key] == "{}": continue # this was required for a test case - if isinstance(var_args[key], PosixPath) or isinstance( - var_args[key], WindowsPath - ): + if isinstance(var_args[key], PosixPath) or isinstance(var_args[key], WindowsPath): var_args[key] = str(var_args[key]) dict_args[key] = var_args[key] return dict_args From 2a8178f86838a720fee5d9e5f15c1c56f823f142 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 20 Jul 2023 12:30:38 -0500 Subject: [PATCH 3/3] update jsonencoder --- core/dbt/utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/dbt/utils.py b/core/dbt/utils.py index d6b16f4c30e..7db9dd860ac 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -332,15 +332,18 @@ class JSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, DECIMALS): return float(obj) - if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + elif isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() - if isinstance(obj, jinja2.Undefined): + elif isinstance(obj, jinja2.Undefined): return "" - if hasattr(obj, "to_dict"): + elif isinstance(obj, Exception): + return repr(obj) + elif hasattr(obj, "to_dict"): # if we have a to_dict we should try to serialize the result of # that! return obj.to_dict(omit_none=True) - return super().default(obj) + else: + return super().default(obj) class ForgivingJSONEncoder(JSONEncoder):