Skip to content

Commit

Permalink
Added vars to DbtRunner.
Browse files Browse the repository at this point in the history
  • Loading branch information
elongl committed Jun 28, 2023
1 parent 830ebfd commit 931860f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 29 deletions.
40 changes: 30 additions & 10 deletions elementary/clients/dbt/dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
Expand All @@ -29,11 +29,15 @@ def __init__(
profiles_dir: Optional[str] = None,
target: Optional[str] = None,
raise_on_failure: bool = True,
dbt_env_vars: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
vars: Optional[Dict[str, Any]] = None,
secret_vars: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(project_dir, profiles_dir, target)
self.raise_on_failure = raise_on_failure
self.dbt_env_vars = dbt_env_vars
self.env_vars = env_vars
self.vars = vars
self.secret_vars = secret_vars

def _run_command(
self,
Expand All @@ -52,11 +56,27 @@ def _run_command(
dbt_command.extend(["--profiles-dir", self.profiles_dir])
if self.target:
dbt_command.extend(["--target", self.target])
if vars:
json_vars = json.dumps(vars)
agg_vars = {
**(self.vars or {}),
**(self.secret_vars or {}),
**(vars or {}),
}
log_command = " ".join(
[
*dbt_command,
"--vars",
json.dumps(
{
k: v if k not in self.secret_vars else "***"
for k, v in agg_vars.items()
}
),
]
)
if agg_vars:
json_vars = json.dumps(agg_vars)
dbt_command.extend(["--vars", json_vars])
dbt_command_str = " ".join(dbt_command)
log_msg = f"Running {dbt_command_str}"
log_msg = f"Running {log_command}"
if not quiet:
logger.info(log_msg)
else:
Expand Down Expand Up @@ -92,7 +112,7 @@ def _run_command(
if is_debug():
logger.debug(f"Output: {output}")
logger.debug(
f"Result bytes size for command '{dbt_command_str}' is {len(result.stdout)}"
f"Result bytes size for command '{log_command}' is {len(result.stdout)}"
)
if result.returncode != 0:
return False, output
Expand Down Expand Up @@ -209,8 +229,8 @@ def test(

def _get_command_env(self):
env = os.environ.copy()
if self.dbt_env_vars is not None:
env.update(self.dbt_env_vars)
if self.env_vars is not None:
env.update(self.env_vars)
return env

def debug(self, quiet: bool = False) -> bool:
Expand Down
54 changes: 40 additions & 14 deletions elementary/clients/dbt/slim_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,17 @@ def __init__(
profiles_dir: str = DEFAULT_PROFILES_DIR,
target: Optional[str] = None,
vars: Optional[dict] = None,
secret_vars: Optional[dict] = None,
**kwargs,
):
super().__init__(project_dir, profiles_dir, target)
self._load_runner(
project_dir=project_dir, profiles_dir=profiles_dir, target=target, vars=vars
)
self.vars = vars
self.secret_vars = secret_vars
self.config = None
self.adapter = None
self.adapter_name = None
self.project_parser = None
self.manifest = None

def _load_runner(
self,
Expand Down Expand Up @@ -166,20 +171,41 @@ def run_operation(
quiet: bool = False,
**kwargs,
) -> list:
if vars:
# vars are being parsed as part of the manifest
self._load_runner(
project_dir=self.args.project_dir,
profiles_dir=self.args.profiles_dir,
target=self.args.target,
vars=vars,
agg_vars = {
**(self.vars or {}),
**(self.secret_vars or {}),
**(vars or {}),
}
self._load_runner(
project_dir=self.project_dir,
profiles_dir=self.profiles_dir,
target=self.target,
vars=agg_vars,
)
log_command = [
"dbt",
"run-operation",
macro_name,
"--args",
json.dumps(macro_args),
]
if agg_vars:
log_command.extend(
[
"--vars",
json.dumps(
{
k: v if k not in self.secret_vars else "***"
for k, v in agg_vars.items()
}
),
]
)

log_message = f"Running dbt run-operation {macro_name} --args {macro_args}{f' --var {vars}' if vars else ''}"
log_msg = f"Running {' '.join(log_command)}"
if not quiet:
logger.info(log_message)
logger.info(log_msg)
else:
logger.debug(log_message)
logger.debug(log_msg)

run_operation_results = []
macro_output = self._execute_macro(macro_name, **macro_args)
Expand Down
4 changes: 2 additions & 2 deletions elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def __init__(
self.env = env

# Additional env vars supplied to dbt invocations
self.dbt_env_vars = dict()
self.dbt_env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting))
self.env_vars = dict()
self.env_vars.update(self._parse_dbt_quoting_to_env_vars(dbt_quoting))

config = self._load_configuration()

Expand Down
6 changes: 3 additions & 3 deletions elementary/monitor/data_monitoring/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _init_internal_dbt_runner(self):
dbt_project_utils.PATH,
self.config.profiles_dir,
self.config.profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
return internal_dbt_runner

Expand All @@ -85,7 +85,7 @@ def _init_user_dbt_runner(self):
self.config.project_dir,
self.config.profiles_dir,
self.config.project_profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
else:
user_dbt_runner = None
Expand Down Expand Up @@ -178,7 +178,7 @@ def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]:
dbt_project_utils.PATH,
self.config.profiles_dir,
self.config.profile_target,
dbt_env_vars=self.config.dbt_env_vars,
env_vars=self.config.env_vars,
)
try:
warehouse_type, warehouse_unique_id = json.loads(
Expand Down

0 comments on commit 931860f

Please sign in to comment.