Skip to content

Commit

Permalink
Merge pull request #972 from elementary-data/ele-1073-run-data-tests-…
Browse files Browse the repository at this point in the history
…on-the-synced-schemas

Added tests to the Elementary models.
  • Loading branch information
elongl committed Jul 2, 2023
2 parents d8d0d86 + a75ef6d commit 4135a45
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 14 deletions.
37 changes: 23 additions & 14 deletions elementary/clients/dbt/dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def _run_command(
log_format: str = "json",
vars: Optional[dict] = None,
quiet: bool = False,
log_output: bool = True,
) -> Tuple[bool, Optional[str]]:
dbt_command = ["dbt"]
if capture_output:
Expand Down Expand Up @@ -83,29 +84,25 @@ def _run_command(
except subprocess.CalledProcessError as err:
err_msg = None
if capture_output:
err_log_msgs = []
err_json_logs = err.output.splitlines()
for err_log_line in err_json_logs:
try:
log = DbtLog(err_log_line)
if log.level == "error":
err_log_msgs.append(log.msg)
except JSONDecodeError:
logger.debug(
f"Unable to parse dbt log message: {err_log_line}",
exc_info=True,
)
dbt_logs = self._parse_output(err.output)
if log_output or is_debug():
for log in dbt_logs:
logger.info(log.msg)
err_log_msgs = [log.msg for log in dbt_logs if log.level == "error"]
err_msg = "\n".join(err_log_msgs)
raise DbtCommandError(err, command_args, err_msg)

output = None
if capture_output:
output = result.stdout.decode("utf-8")
if is_debug():
logger.debug(f"Output: {output}")
logger.debug(
f"Result bytes size for command '{log_command}' is {len(result.stdout)}"
)
if log_output or is_debug():
dbt_logs = self._parse_output(output)
for log in dbt_logs:
logger.info(log.msg)

if result.returncode != 0:
return False, output
return True, output
Expand Down Expand Up @@ -136,6 +133,7 @@ def run_operation(
vars: Optional[dict] = None,
quiet: bool = False,
should_log: bool = True,
log_output: bool = False,
) -> list:
macro_to_run = macro_name
macro_to_run_args = macro_args if macro_args else dict()
Expand All @@ -152,6 +150,7 @@ def run_operation(
capture_output=capture_output,
vars=vars,
quiet=quiet,
log_output=log_output,
)
if log_errors and not success:
logger.error(
Expand Down Expand Up @@ -259,3 +258,13 @@ def ls(self, select: Optional[str] = None) -> list:

def source_freshness(self):
self._run_command(command_args=["source", "freshness"])

def _parse_output(self, output: str) -> List[DbtLog]:
log_msgs = []
json_logs = output.splitlines()
for json_log in json_logs:
try:
log_msgs.append(DbtLog(json_log))
except JSONDecodeError:
logger.debug(f"Unable to parse dbt log message: {json_log}")
return log_msgs
41 changes: 41 additions & 0 deletions elementary/monitor/dbt_project/models/elementary.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
version: 2

sources:
- name: elementary
schema: "{{ target.schema }}"
tables:
- name: dbt_models
columns:
- name: unique_id
tests:
- unique

- name: dbt_tests
columns:
- name: unique_id
tests:
- unique

- name: dbt_sources
columns:
- name: unique_id
tests:
- unique

- name: dbt_snapshots
columns:
- name: unique_id
tests:
- unique

- name: dbt_metrics
columns:
- name: unique_id
tests:
- unique

- name: dbt_exposures
columns:
- name: unique_id
tests:
- unique

0 comments on commit 4135a45

Please sign in to comment.