From c63dd9d2943b5f9559ce65ca1fa40e66d18ae7dd Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 29 Oct 2024 10:12:23 -0500 Subject: [PATCH 1/6] Remove `failed` tracking in `safe_run_hooks` as it's duplicative of `status` --- core/dbt/task/run.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index e0c436b5a61..1a9dc6a9eea 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -658,7 +658,6 @@ def safe_run_hooks( return RunStatus.Success status = RunStatus.Success - failed = False num_hooks = len(ordered_hooks) for idx, hook in enumerate(ordered_hooks, 1): @@ -669,7 +668,7 @@ def safe_run_hooks( timing: List[TimingInfo] = [] failures = 1 - if not failed: + if status == RunStatus.Success: with collect_timing_info("compile", timing.append): sql = self.get_hook_sql( adapter, hook, hook.index, num_hooks, extra_context @@ -701,7 +700,6 @@ def safe_run_hooks( message = f"{hook_name} passed" else: message = f"{hook_name} failed, error:\n {message}" - failed = True else: status = RunStatus.Skipped message = f"{hook_name} skipped" From 43945d68a18d8d764fcefb2b969bdf16cf64035b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 29 Oct 2024 10:26:21 -0500 Subject: [PATCH 2/6] Delay calculation of `failures` in `safe_run_hooks` to simplify code If it isn't apparent, I'm trying to reduce the need for tracking variables so that it is easier to extract the execution code into a separate private function, which in turn makes it easier to see what is happening. 
--- core/dbt/task/run.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1a9dc6a9eea..06f91a3dd39 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -666,7 +666,6 @@ def safe_run_hooks( hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" execution_time = 0.0 timing: List[TimingInfo] = [] - failures = 1 if status == RunStatus.Success: with collect_timing_info("compile", timing.append): @@ -694,7 +693,6 @@ def safe_run_hooks( finished_at = timing[1].completed_at or datetime.utcnow() hook.update_event_status(finished_at=finished_at.isoformat()) execution_time = (finished_at - started_at).total_seconds() - failures = 0 if status == RunStatus.Success else 1 if status == RunStatus.Success: message = f"{hook_name} passed" @@ -714,7 +712,7 @@ def safe_run_hooks( message=message, adapter_response={}, execution_time=execution_time, - failures=failures, + failures=0 if status == RunStatus.Success else 1, node=hook, ) ) From de43f599c0da0868cfac8bd09e428aa1ae23618d Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 30 Oct 2024 10:49:41 -0500 Subject: [PATCH 3/6] Lint `safe_run_hooks` function signature --- core/dbt/task/run.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 06f91a3dd39..12f551a5fb9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -645,7 +645,10 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: return hooks def safe_run_hooks( - self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] + self, + adapter: BaseAdapter, + hook_type: RunHookType, + extra_context: Dict[str, Any], ) -> RunStatus: ordered_hooks = self.get_hooks_by_type(hook_type) From 43d1720986e975472a84e0eee0627b4dd9148dc0 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 30 Oct 2024 10:54:21 -0500 Subject: [PATCH 4/6] Refactor safe_run_hooks so that there 
is only one return statement --- core/dbt/task/run.py | 133 +++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 12f551a5fb9..a62a4ce8748 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -651,88 +651,87 @@ def safe_run_hooks( extra_context: Dict[str, Any], ) -> RunStatus: ordered_hooks = self.get_hooks_by_type(hook_type) + status = RunStatus.Success - if hook_type == RunHookType.End and ordered_hooks: - fire_event(Formatting("")) + if ordered_hooks: + if hook_type == RunHookType.End: + fire_event(Formatting("")) - # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. - adapter.clear_transaction() - if not ordered_hooks: - return RunStatus.Success + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. 
+ adapter.clear_transaction() - status = RunStatus.Success - num_hooks = len(ordered_hooks) - - for idx, hook in enumerate(ordered_hooks, 1): - with log_contextvars(node_info=hook.node_info): - hook.index = idx - hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" - execution_time = 0.0 - timing: List[TimingInfo] = [] - - if status == RunStatus.Success: - with collect_timing_info("compile", timing.append): - sql = self.get_hook_sql( - adapter, hook, hook.index, num_hooks, extra_context + num_hooks = len(ordered_hooks) + + for idx, hook in enumerate(ordered_hooks, 1): + with log_contextvars(node_info=hook.node_info): + hook.index = idx + hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" + execution_time = 0.0 + timing: List[TimingInfo] = [] + + if status == RunStatus.Success: + with collect_timing_info("compile", timing.append): + sql = self.get_hook_sql( + adapter, hook, hook.index, num_hooks, extra_context + ) + + started_at = timing[0].started_at or datetime.utcnow() + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - started_at = timing[0].started_at or datetime.utcnow() - hook.update_event_status( - started_at=started_at.isoformat(), node_status=RunningStatus.Started + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) + + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) + + finished_at = timing[1].completed_at or datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" + else: + status = RunStatus.Skipped + message = f"{hook_name} skipped" + + hook.update_event_status(node_status=status) + + self.node_results.append( + RunResult( + 
status=status, + thread_id="main", + timing=timing, + message=message, + adapter_response={}, + execution_time=execution_time, + failures=0 if status == RunStatus.Success else 1, + node=hook, + ) ) fire_event( - LogHookStartLine( + LogHookEndLine( statement=hook_name, + status=status, index=hook.index, total=num_hooks, + execution_time=execution_time, node_info=hook.node_info, ) ) - with collect_timing_info("execute", timing.append): - status, message = get_execution_status(sql, adapter) - - finished_at = timing[1].completed_at or datetime.utcnow() - hook.update_event_status(finished_at=finished_at.isoformat()) - execution_time = (finished_at - started_at).total_seconds() - - if status == RunStatus.Success: - message = f"{hook_name} passed" - else: - message = f"{hook_name} failed, error:\n {message}" - else: - status = RunStatus.Skipped - message = f"{hook_name} skipped" - - hook.update_event_status(node_status=status) - - self.node_results.append( - RunResult( - status=status, - thread_id="main", - timing=timing, - message=message, - adapter_response={}, - execution_time=execution_time, - failures=0 if status == RunStatus.Success else 1, - node=hook, - ) - ) - - fire_event( - LogHookEndLine( - statement=hook_name, - status=status, - index=hook.index, - total=num_hooks, - execution_time=execution_time, - node_info=hook.node_info, - ) - ) - - if hook_type == RunHookType.Start and ordered_hooks: - fire_event(Formatting("")) + if hook_type == RunHookType.Start: + fire_event(Formatting("")) return status From 9731eaaeb0a47e68c3903e7ca34fed7fd5775641 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 30 Oct 2024 11:03:01 -0500 Subject: [PATCH 5/6] Split out actual execution of hook into private function to simplify `safe_run_hooks` --- core/dbt/task/run.py | 73 ++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index a62a4ce8748..1ea6b5d59f5 100644 --- 
a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -644,6 +644,46 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: hooks.sort(key=self._hook_keyfunc) return hooks + def _safe_run_hook( + self, + adapter: BaseAdapter, + hook: HookNode, + hook_name: str, + timing: List[TimingInfo], + num_hooks: int, + extra_context: Dict[str, Any], + ) -> Tuple[RunStatus, str, float]: + with collect_timing_info("compile", timing.append): + sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + + started_at = timing[0].started_at or datetime.utcnow() + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started + ) + + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, + node_info=hook.node_info, + ) + ) + + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) + + finished_at = timing[1].completed_at or datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() + + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" + + return (status, message, execution_time) + def safe_run_hooks( self, adapter: BaseAdapter, @@ -669,37 +709,12 @@ def safe_run_hooks( execution_time = 0.0 timing: List[TimingInfo] = [] + # Only run this hook if the previous hook succeeded + # otherwise, skip it if status == RunStatus.Success: - with collect_timing_info("compile", timing.append): - sql = self.get_hook_sql( - adapter, hook, hook.index, num_hooks, extra_context - ) - - started_at = timing[0].started_at or datetime.utcnow() - hook.update_event_status( - started_at=started_at.isoformat(), node_status=RunningStatus.Started + (status, message, execution_time) = self._safe_run_hook( + adapter, hook, hook_name, timing, num_hooks, extra_context ) - - fire_event( - 
LogHookStartLine( - statement=hook_name, - index=hook.index, - total=num_hooks, - node_info=hook.node_info, - ) - ) - - with collect_timing_info("execute", timing.append): - status, message = get_execution_status(sql, adapter) - - finished_at = timing[1].completed_at or datetime.utcnow() - hook.update_event_status(finished_at=finished_at.isoformat()) - execution_time = (finished_at - started_at).total_seconds() - - if status == RunStatus.Success: - message = f"{hook_name} passed" - else: - message = f"{hook_name} failed, error:\n {message}" else: status = RunStatus.Skipped message = f"{hook_name} skipped" From 6358aff386b95c1036af0d9d17604ec75fc507dc Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 30 Oct 2024 14:23:21 -0500 Subject: [PATCH 6/6] Fix indentation of `_safe_run_hook` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In 9731eaa, when splitting out the hook execution logic, I screwed up the indentation of most of the logic 🙈. This fixes that. --- core/dbt/task/run.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1ea6b5d59f5..f0446728aee 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -656,31 +656,31 @@ def _safe_run_hook( with collect_timing_info("compile", timing.append): sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) - started_at = timing[0].started_at or datetime.utcnow() - hook.update_event_status( - started_at=started_at.isoformat(), node_status=RunningStatus.Started - ) + started_at = timing[0].started_at or datetime.utcnow() + hook.update_event_status( + started_at=started_at.isoformat(), node_status=RunningStatus.Started + ) - fire_event( - LogHookStartLine( - statement=hook_name, - index=hook.index, - total=num_hooks, - node_info=hook.node_info, - ) + fire_event( + LogHookStartLine( + statement=hook_name, + index=hook.index, + total=num_hooks, 
+ node_info=hook.node_info, ) + ) - with collect_timing_info("execute", timing.append): - status, message = get_execution_status(sql, adapter) + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) - finished_at = timing[1].completed_at or datetime.utcnow() - hook.update_event_status(finished_at=finished_at.isoformat()) - execution_time = (finished_at - started_at).total_seconds() + finished_at = timing[1].completed_at or datetime.utcnow() + hook.update_event_status(finished_at=finished_at.isoformat()) + execution_time = (finished_at - started_at).total_seconds() - if status == RunStatus.Success: - message = f"{hook_name} passed" - else: - message = f"{hook_name} failed, error:\n {message}" + if status == RunStatus.Success: + message = f"{hook_name} passed" + else: + message = f"{hook_name} failed, error:\n {message}" return (status, message, execution_time)