diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index e0c436b5a61..f0446728aee 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -644,96 +644,109 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: hooks.sort(key=self._hook_keyfunc) return hooks + def _safe_run_hook( + self, + adapter: BaseAdapter, + hook: HookNode, + hook_name: str, + timing: List[TimingInfo], + num_hooks: int, + extra_context: Dict[str, Any], + ) -> Tuple[RunStatus, str, float]: + with collect_timing_info("compile", timing.append): + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + + started_at = timing[0].started_at or datetime.utcnow() + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started + ) + + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) + + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) + + finished_at = timing[1].completed_at or datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" + + return (status, message, execution_time) + def safe_run_hooks( - self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] + self, + adapter: BaseAdapter, + hook_type: RunHookType, + extra_context: Dict[str, Any], ) -> RunStatus: ordered_hooks = self.get_hooks_by_type(hook_type) + status = RunStatus.Success - if hook_type == RunHookType.End and ordered_hooks: - fire_event(Formatting("")) + if ordered_hooks: + if hook_type == RunHookType.End: + fire_event(Formatting("")) - # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. - adapter.clear_transaction() - if not ordered_hooks: - return RunStatus.Success + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. + adapter.clear_transaction() - status = RunStatus.Success - failed = False - num_hooks = len(ordered_hooks) - - for idx, hook in enumerate(ordered_hooks, 1): - with log_contextvars(node_info=hook.node_info): - hook.index = idx - hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" - execution_time = 0.0 - timing: List[TimingInfo] = [] - failures = 1 - - if not failed: - with collect_timing_info("compile", timing.append): - sql = self.get_hook_sql( - adapter, hook, hook.index, num_hooks, extra_context - ) + num_hooks = len(ordered_hooks) - started_at = timing[0].started_at or datetime.utcnow() - hook.update_event_status( - started_at=started_at.isoformat(), node_status=RunningStatus.Started + for idx, hook in enumerate(ordered_hooks, 1): + with log_contextvars(node_info=hook.node_info): + hook.index = idx + hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" + execution_time = 0.0 + timing: List[TimingInfo] = [] + + # Only run this hook if the previous hook succeeded + # otherwise, skip it + if status == RunStatus.Success: + (status, message, execution_time) = self._safe_run_hook( + adapter, hook, hook_name, timing, num_hooks, extra_context + ) + else: + status = RunStatus.Skipped + message = f"{hook_name} skipped" + + hook.update_event_status(node_status=status) + + self.node_results.append( + RunResult( + status=status, + thread_id="main", + timing=timing, + message=message, + adapter_response={}, + execution_time=execution_time, + failures=0 if status == RunStatus.Success else 1, + node=hook, + ) ) fire_event( - LogHookStartLine( + LogHookEndLine( statement=hook_name, + status=status, index=hook.index, total=num_hooks, + execution_time=execution_time, node_info=hook.node_info, ) ) - with collect_timing_info("execute", timing.append): - status, message = get_execution_status(sql, adapter) - - finished_at = timing[1].completed_at or datetime.utcnow() - hook.update_event_status(finished_at=finished_at.isoformat()) - execution_time = (finished_at - started_at).total_seconds() - failures = 0 if status == RunStatus.Success else 1 - - if status == RunStatus.Success: - message = f"{hook_name} passed" - else: - message = f"{hook_name} failed, error:\n {message}" - failed = True - else: - status = RunStatus.Skipped - message = f"{hook_name} skipped" - - hook.update_event_status(node_status=status) - - self.node_results.append( - RunResult( - status=status, - thread_id="main", - timing=timing, - message=message, - adapter_response={}, - execution_time=execution_time, - failures=failures, - node=hook, - ) - ) - - fire_event( - LogHookEndLine( - statement=hook_name, - status=status, - index=hook.index, - total=num_hooks, - execution_time=execution_time, - node_info=hook.node_info, - ) - ) - - if hook_type == RunHookType.Start and ordered_hooks: - fire_event(Formatting("")) + if hook_type == RunHookType.Start: + fire_event(Formatting("")) return status