diff --git a/elementary/clients/dbt/base_dbt_runner.py b/elementary/clients/dbt/base_dbt_runner.py index ec1a88766..181aae60e 100644 --- a/elementary/clients/dbt/base_dbt_runner.py +++ b/elementary/clients/dbt/base_dbt_runner.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import Any, Dict, Optional class BaseDbtRunner(ABC): @@ -8,10 +8,14 @@ def __init__( project_dir: str, profiles_dir: Optional[str] = None, target: Optional[str] = None, + vars: Optional[dict] = None, + secret_vars: Optional[dict] = None, ) -> None: self.project_dir = project_dir self.profiles_dir = profiles_dir self.target = target + self.vars = vars or {} + self.secret_vars = secret_vars or {} @abstractmethod def deps(self, *args, **kwargs): @@ -48,3 +52,13 @@ def ls(self, *args, **kwargs): @abstractmethod def source_freshness(self, *args, **kwargs): raise NotImplementedError + + def _get_all_vars(self, vars: Optional[Dict[str, Any]] = None): + return { + **self.vars, + **self.secret_vars, + **(vars or {}), + } + + def _get_secret_masked_vars(self, vars: Dict[str, Any]): + return {k: v if k not in self.secret_vars else "***" for k, v in vars.items()} diff --git a/elementary/clients/dbt/dbt_runner.py b/elementary/clients/dbt/dbt_runner.py index 4dfef2a12..6caccc2d8 100644 --- a/elementary/clients/dbt/dbt_runner.py +++ b/elementary/clients/dbt/dbt_runner.py @@ -2,7 +2,7 @@ import os import subprocess from json import JSONDecodeError -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError @@ -29,11 +29,13 @@ def __init__( profiles_dir: Optional[str] = None, target: Optional[str] = None, raise_on_failure: bool = True, - dbt_env_vars: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + vars: Optional[Dict[str, Any]] = None, + secret_vars: Optional[Dict[str, Any]] = None, ) -> None: - super().__init__(project_dir, profiles_dir, target) + super().__init__(project_dir, profiles_dir, target, vars, secret_vars) self.raise_on_failure = raise_on_failure - self.dbt_env_vars = dbt_env_vars + self.env_vars = env_vars def _run_command( self, @@ -52,11 +54,21 @@ def _run_command( dbt_command.extend(["--profiles-dir", self.profiles_dir]) if self.target: dbt_command.extend(["--target", self.target]) - if vars: - json_vars = json.dumps(vars) - dbt_command.extend(["--vars", json_vars]) - dbt_command_str = " ".join(dbt_command) - log_msg = f"Running {dbt_command_str}" + + all_vars = self._get_all_vars(vars) + if all_vars: + log_command = dbt_command.copy() + log_command.extend( + [ + "--vars", + json.dumps(self._get_secret_masked_vars(all_vars)), + ] + ) + dbt_command.extend(["--vars", json.dumps(all_vars)]) + else: + log_command = dbt_command + + log_msg = f"Running {log_command}" if not quiet: logger.info(log_msg) else: @@ -92,7 +104,7 @@ def _run_command( if is_debug(): logger.debug(f"Output: {output}") logger.debug( - f"Result bytes size for command '{dbt_command_str}' is {len(result.stdout)}" + f"Result bytes size for command '{log_command}' is {len(result.stdout)}" ) if result.returncode != 0: return False, output @@ -209,8 +221,8 @@ def test( def _get_command_env(self): env = os.environ.copy() - if self.dbt_env_vars is not None: - env.update(self.dbt_env_vars) + if self.env_vars is not None: + env.update(self.env_vars) return env def debug(self, quiet: bool = False) -> bool: diff --git a/elementary/clients/dbt/slim_dbt_runner.py b/elementary/clients/dbt/slim_dbt_runner.py index cdffc599f..74ca97740 100644 --- a/elementary/clients/dbt/slim_dbt_runner.py +++ b/elementary/clients/dbt/slim_dbt_runner.py @@ -78,12 +78,15 @@ def __init__( profiles_dir: str = DEFAULT_PROFILES_DIR, target: Optional[str] = None, vars: Optional[dict] = None, + secret_vars: Optional[dict] = None, **kwargs, ): - super().__init__(project_dir, profiles_dir, target) - self._load_runner( - project_dir=project_dir, profiles_dir=profiles_dir, target=target, vars=vars - ) + super().__init__(project_dir, profiles_dir, target, vars, secret_vars) + self.config = None + self.adapter = None + self.adapter_name = None + self.project_parser = None + self.manifest = None def _load_runner( self, @@ -166,20 +169,32 @@ def run_operation( quiet: bool = False, **kwargs, ) -> list: - if vars: - # vars are being parsed as part of the manifest - self._load_runner( - project_dir=self.args.project_dir, - profiles_dir=self.args.profiles_dir, - target=self.args.target, - vars=vars, + all_vars = self._get_all_vars(vars) + self._load_runner( + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + vars=all_vars, + ) + log_command = [ + "dbt", + "run-operation", + macro_name, + "--args", + json.dumps(macro_args), + ] + if all_vars: + log_command.extend( + [ + "--vars", + json.dumps(self._get_secret_masked_vars(all_vars)), + ] ) - - log_message = f"Running dbt run-operation {macro_name} --args {macro_args}{f' --var {vars}' if vars else ''}" + log_msg = f"Running {' '.join(log_command)}" if not quiet: - logger.info(log_message) + logger.info(log_msg) else: - logger.debug(log_message) + logger.debug(log_msg) run_operation_results = [] macro_output = self._execute_macro(macro_name, **macro_args) diff --git a/elementary/config/config.py b/elementary/config/config.py index 4105e2b2d..c666b0a6c 100644 --- a/elementary/config/config.py +++ b/elementary/config/config.py @@ -67,8 +67,8 @@ def __init__( self.env = env # Additional env vars supplied to dbt invocations - self.dbt_env_vars = dict() - self.dbt_env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting)) + self.env_vars = dict() + self.env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting)) config = self._load_configuration() diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index 3997433dd..734516efd 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -75,7 +75,7 @@ def _init_internal_dbt_runner(self): dbt_project_utils.PATH, self.config.profiles_dir, self.config.profile_target, - dbt_env_vars=self.config.dbt_env_vars, + env_vars=self.config.env_vars, ) return internal_dbt_runner @@ -85,7 +85,7 @@ def _init_user_dbt_runner(self): self.config.project_dir, self.config.profiles_dir, self.config.project_profile_target, - dbt_env_vars=self.config.dbt_env_vars, + env_vars=self.config.env_vars, ) else: user_dbt_runner = None @@ -178,7 +178,7 @@ def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]: dbt_project_utils.PATH, self.config.profiles_dir, self.config.profile_target, - dbt_env_vars=self.config.dbt_env_vars, + env_vars=self.config.env_vars, ) try: warehouse_type, warehouse_unique_id = json.loads(