From a0372fc0780fc5d11a5d52b748d64970c9f169b5 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Wed, 20 Mar 2024 16:06:14 -0400 Subject: [PATCH] Handle exceptions during node execution more elegantly. (#9585) (#9778) * Handle exceptions during node execution more elegantly. * Add changelog entry. * Fix import * Add task documentation. * Change event type for noting thread exceptions. Co-authored-by: Peter Webb --- .../unreleased/Fixes-20240216-145632.yaml | 6 +++ core/dbt/task/README.md | 42 +++++++++++++++++++ core/dbt/task/runnable.py | 38 ++++++++++++++--- 3 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 .changes/unreleased/Fixes-20240216-145632.yaml diff --git a/.changes/unreleased/Fixes-20240216-145632.yaml b/.changes/unreleased/Fixes-20240216-145632.yaml new file mode 100644 index 00000000000..a02027f66a5 --- /dev/null +++ b/.changes/unreleased/Fixes-20240216-145632.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Tighten exception handling to avoid worker thread hangs. +time: 2024-02-16T14:56:32.858967-05:00 +custom: + Author: peterallenwebb + Issue: "9583" diff --git a/core/dbt/task/README.md b/core/dbt/task/README.md index 9de939e4cc4..2b32f5dbfa8 100644 --- a/core/dbt/task/README.md +++ b/core/dbt/task/README.md @@ -1 +1,43 @@ # Task README + +### Task Hierarchy +``` +BaseTask + ┣ CleanTask + ┣ ConfiguredTask + ┃ ┣ GraphRunnableTask + ┃ ┃ ┣ CloneTask + ┃ ┃ ┣ CompileTask + ┃ ┃ ┃ ┣ GenerateTask + ┃ ┃ ┃ ┣ RunTask + ┃ ┃ ┃ ┃ ┣ BuildTask + ┃ ┃ ┃ ┃ ┣ FreshnessTask + ┃ ┃ ┃ ┃ ┣ SeedTask + ┃ ┃ ┃ ┃ ┣ SnapshotTask + ┃ ┃ ┃ ┃ ┗ TestTask + ┃ ┃ ┃ ┗ ShowTask + ┃ ┃ ┗ ListTask + ┃ ┣ RetryTask + ┃ ┣ RunOperationTask + ┃ ┗ ServeTask + ┣ DebugTask + ┣ DepsTask + ┗ InitTask +``` + +### Runner Hierarchy +``` +BaseRunner + ┣ CloneRunner + ┣ CompileRunner + ┃ ┣ GenericSqlRunner + ┃ ┃ ┣ SqlCompileRunner + ┃ ┃ ┗ SqlExecuteRunner + ┃ ┣ ModelRunner + ┃ ┃ ┣ SeedRunner + ┃ ┃ ┗ SnapshotRunner + ┃ ┣ ShowRunner + ┃ ┗ TestRunner + ┣ FreshnessRunner + ┗ SavedQueryRunner +``` diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 242e155a4d7..e9ea21f661d 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -35,6 +35,7 @@ ConcurrencyLine, EndRunResult, NothingToDo, + GenericExceptionOnRun, ) from dbt.exceptions import ( DbtInternalError, @@ -204,17 +205,44 @@ def call_runner(self, runner: BaseRunner) -> RunResult: ) ) status: Dict[str, str] = {} + result = None + thread_exception = None try: result = runner.run_with_hooks(self.manifest) + except Exception as e: + thread_exception = e finally: finishctx = TimestampNamed("finished_at") with finishctx, DbtModelState(status): - fire_event( - NodeFinished( - node_info=runner.node.node_info, - run_result=result.to_msg_dict(), + if result is not None: + fire_event( + NodeFinished( + node_info=runner.node.node_info, + run_result=result.to_msg_dict(), + ) ) - ) + else: + msg = f"Exception on worker thread. {thread_exception}" + + fire_event( + GenericExceptionOnRun( + unique_id=runner.node.unique_id, + exc=str(thread_exception), + node_info=runner.node.node_info, + ) + ) + + result = RunResult( + status=RunStatus.Error, # type: ignore + timing=[], + thread_id="", + execution_time=0.0, + adapter_response={}, + message=msg, + failures=None, + node=runner.node, + ) + # `_event_status` dict is only used for logging. Make sure # it gets deleted when we're done with it runner.node.clear_event_status()