Skip to content

Commit

Permalink
Merge pull request #970 from elementary-data/ele-1194-add-vars-to-dbt…
Browse files Browse the repository at this point in the history
…-runner

Ele 1194 add vars to dbt runner
  • Loading branch information
elongl authored Jun 28, 2023
2 parents 830ebfd + b1222be commit afb8bb7
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 33 deletions.
16 changes: 15 additions & 1 deletion elementary/clients/dbt/base_dbt_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Any, Dict, Optional


class BaseDbtRunner(ABC):
Expand All @@ -8,10 +8,14 @@ def __init__(
project_dir: str,
profiles_dir: Optional[str] = None,
target: Optional[str] = None,
vars: Optional[dict] = None,
secret_vars: Optional[dict] = None,
) -> None:
self.project_dir = project_dir
self.profiles_dir = profiles_dir
self.target = target
self.vars = vars or {}
self.secret_vars = secret_vars or {}

@abstractmethod
def deps(self, *args, **kwargs):
Expand Down Expand Up @@ -48,3 +52,13 @@ def ls(self, *args, **kwargs):
@abstractmethod
def source_freshness(self, *args, **kwargs):
raise NotImplementedError

def _get_all_vars(self, vars: Optional[Dict[str, Any]] = None):
return {
**self.vars,
**self.secret_vars,
**(vars or {}),
}

def _get_secret_masked_vars(self, vars: Dict[str, Any]):
return {k: v if k not in self.secret_vars else "***" for k, v in vars.items()}
36 changes: 24 additions & 12 deletions elementary/clients/dbt/dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
Expand All @@ -29,11 +29,13 @@ def __init__(
profiles_dir: Optional[str] = None,
target: Optional[str] = None,
raise_on_failure: bool = True,
dbt_env_vars: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
vars: Optional[Dict[str, Any]] = None,
secret_vars: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(project_dir, profiles_dir, target)
super().__init__(project_dir, profiles_dir, target, vars, secret_vars)
self.raise_on_failure = raise_on_failure
self.dbt_env_vars = dbt_env_vars
self.env_vars = env_vars

def _run_command(
self,
Expand All @@ -52,11 +54,21 @@ def _run_command(
dbt_command.extend(["--profiles-dir", self.profiles_dir])
if self.target:
dbt_command.extend(["--target", self.target])
if vars:
json_vars = json.dumps(vars)
dbt_command.extend(["--vars", json_vars])
dbt_command_str = " ".join(dbt_command)
log_msg = f"Running {dbt_command_str}"

all_vars = self._get_all_vars(vars)
if all_vars:
log_command = dbt_command.copy()
log_command.extend(
[
"--vars",
json.dumps(self._get_secret_masked_vars(all_vars)),
]
)
dbt_command.extend(["--vars", json.dumps(all_vars)])
else:
log_command = dbt_command

log_msg = f"Running {log_command}"
if not quiet:
logger.info(log_msg)
else:
Expand Down Expand Up @@ -92,7 +104,7 @@ def _run_command(
if is_debug():
logger.debug(f"Output: {output}")
logger.debug(
f"Result bytes size for command '{dbt_command_str}' is {len(result.stdout)}"
f"Result bytes size for command '{log_command}' is {len(result.stdout)}"
)
if result.returncode != 0:
return False, output
Expand Down Expand Up @@ -209,8 +221,8 @@ def test(

def _get_command_env(self):
env = os.environ.copy()
if self.dbt_env_vars is not None:
env.update(self.dbt_env_vars)
if self.env_vars is not None:
env.update(self.env_vars)
return env

def debug(self, quiet: bool = False) -> bool:
Expand Down
45 changes: 30 additions & 15 deletions elementary/clients/dbt/slim_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ def __init__(
profiles_dir: str = DEFAULT_PROFILES_DIR,
target: Optional[str] = None,
vars: Optional[dict] = None,
secret_vars: Optional[dict] = None,
**kwargs,
):
super().__init__(project_dir, profiles_dir, target)
self._load_runner(
project_dir=project_dir, profiles_dir=profiles_dir, target=target, vars=vars
)
super().__init__(project_dir, profiles_dir, target, vars, secret_vars)
self.config = None
self.adapter = None
self.adapter_name = None
self.project_parser = None
self.manifest = None

def _load_runner(
self,
Expand Down Expand Up @@ -166,20 +169,32 @@ def run_operation(
quiet: bool = False,
**kwargs,
) -> list:
if vars:
# vars are being parsed as part of the manifest
self._load_runner(
project_dir=self.args.project_dir,
profiles_dir=self.args.profiles_dir,
target=self.args.target,
vars=vars,
all_vars = self._get_all_vars(vars)
self._load_runner(
project_dir=self.project_dir,
profiles_dir=self.profiles_dir,
target=self.target,
vars=all_vars,
)
log_command = [
"dbt",
"run-operation",
macro_name,
"--args",
json.dumps(macro_args),
]
if all_vars:
log_command.extend(
[
"--vars",
json.dumps(self._get_secret_masked_vars(all_vars)),
]
)

log_message = f"Running dbt run-operation {macro_name} --args {macro_args}{f' --var {vars}' if vars else ''}"
log_msg = f"Running {' '.join(log_command)}"
if not quiet:
logger.info(log_message)
logger.info(log_msg)
else:
logger.debug(log_message)
logger.debug(log_msg)

run_operation_results = []
macro_output = self._execute_macro(macro_name, **macro_args)
Expand Down
4 changes: 2 additions & 2 deletions elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def __init__(
self.env = env

# Additional env vars supplied to dbt invocations
self.dbt_env_vars = dict()
self.dbt_env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting))
self.env_vars = dict()
self.env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting))

config = self._load_configuration()

Expand Down
6 changes: 3 additions & 3 deletions elementary/monitor/data_monitoring/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _init_internal_dbt_runner(self):
dbt_project_utils.PATH,
self.config.profiles_dir,
self.config.profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
return internal_dbt_runner

Expand All @@ -85,7 +85,7 @@ def _init_user_dbt_runner(self):
self.config.project_dir,
self.config.profiles_dir,
self.config.project_profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
else:
user_dbt_runner = None
Expand Down Expand Up @@ -178,7 +178,7 @@ def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]:
dbt_project_utils.PATH,
self.config.profiles_dir,
self.config.profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
try:
warehouse_type, warehouse_unique_id = json.loads(
Expand Down

0 comments on commit afb8bb7

Please sign in to comment.