diff --git a/elementary/clients/dbt/dbt_runner.py b/elementary/clients/dbt/dbt_runner.py index 9c5251dd1..2b9992e2c 100644 --- a/elementary/clients/dbt/dbt_runner.py +++ b/elementary/clients/dbt/dbt_runner.py @@ -44,6 +44,7 @@ def _run_command( log_format: str = "json", vars: Optional[dict] = None, quiet: bool = False, + log_output: bool = True, ) -> Tuple[bool, Optional[str]]: dbt_command = ["dbt"] if capture_output: @@ -83,29 +84,25 @@ def _run_command( except subprocess.CalledProcessError as err: err_msg = None if capture_output: - err_log_msgs = [] - err_json_logs = err.output.splitlines() - for err_log_line in err_json_logs: - try: - log = DbtLog(err_log_line) - if log.level == "error": - err_log_msgs.append(log.msg) - except JSONDecodeError: - logger.debug( - f"Unable to parse dbt log message: {err_log_line}", - exc_info=True, - ) + dbt_logs = self._parse_output(err.output) + if log_output or is_debug(): + for log in dbt_logs: + logger.info(log.msg) + err_log_msgs = [log.msg for log in dbt_logs if log.level == "error"] err_msg = "\n".join(err_log_msgs) raise DbtCommandError(err, command_args, err_msg) output = None if capture_output: output = result.stdout.decode("utf-8") - if is_debug(): - logger.debug(f"Output: {output}") logger.debug( f"Result bytes size for command '{log_command}' is {len(result.stdout)}" ) + if log_output or is_debug(): + dbt_logs = self._parse_output(output) + for log in dbt_logs: + logger.info(log.msg) + if result.returncode != 0: return False, output return True, output @@ -136,6 +133,7 @@ def run_operation( vars: Optional[dict] = None, quiet: bool = False, should_log: bool = True, + log_output: bool = False, ) -> list: macro_to_run = macro_name macro_to_run_args = macro_args if macro_args else dict() @@ -152,6 +150,7 @@ def run_operation( capture_output=capture_output, vars=vars, quiet=quiet, + log_output=log_output, ) if log_errors and not success: logger.error( @@ -259,3 +258,13 @@ def ls(self, select: Optional[str] = None) -> list: def source_freshness(self): self._run_command(command_args=["source", "freshness"]) + + def _parse_output(self, output: str) -> List[DbtLog]: + log_msgs = [] + json_logs = output.splitlines() + for json_log in json_logs: + try: + log_msgs.append(DbtLog(json_log)) + except JSONDecodeError: + logger.debug(f"Unable to parse dbt log message: {json_log}") + return log_msgs diff --git a/elementary/monitor/dbt_project/models/elementary.yml b/elementary/monitor/dbt_project/models/elementary.yml new file mode 100644 index 000000000..068679b42 --- /dev/null +++ b/elementary/monitor/dbt_project/models/elementary.yml @@ -0,0 +1,41 @@ +version: 2 + +sources: + - name: elementary + schema: "{{ target.schema }}" + tables: + - name: dbt_models + columns: + - name: unique_id + tests: + - unique + + - name: dbt_tests + columns: + - name: unique_id + tests: + - unique + + - name: dbt_sources + columns: + - name: unique_id + tests: + - unique + + - name: dbt_snapshots + columns: + - name: unique_id + tests: + - unique + + - name: dbt_metrics + columns: + - name: unique_id + tests: + - unique + + - name: dbt_exposures + columns: + - name: unique_id + tests: + - unique