Skip to content

Commit

Permalink
Handle exceptions during node execution more elegantly. (#9585) (#9778)
Browse files Browse the repository at this point in the history
* Handle exceptions during node execution more elegantly.

* Add changelog entry.

* Fix import

* Add task documentation.

* Change event type for noting thread exceptions.

Co-authored-by: Peter Webb <[email protected]>
  • Loading branch information
gshank and peterallenwebb authored Mar 20, 2024
1 parent 34a97c0 commit a0372fc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240216-145632.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Tighten exception handling to avoid worker thread hangs.
time: 2024-02-16T14:56:32.858967-05:00
custom:
Author: peterallenwebb
Issue: "9583"
42 changes: 42 additions & 0 deletions core/dbt/task/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,43 @@
# Task README

### Task Hierarchy
```
BaseTask
┣ CleanTask
┣ ConfiguredTask
┃ ┣ GraphRunnableTask
┃ ┃ ┣ CloneTask
┃ ┃ ┣ CompileTask
┃ ┃ ┃ ┣ GenerateTask
┃ ┃ ┃ ┣ RunTask
┃ ┃ ┃ ┃ ┣ BuildTask
┃ ┃ ┃ ┃ ┣ FreshnessTask
┃ ┃ ┃ ┃ ┣ SeedTask
┃ ┃ ┃ ┃ ┣ SnapshotTask
┃ ┃ ┃ ┃ ┗ TestTask
┃ ┃ ┃ ┗ ShowTask
┃ ┃ ┗ ListTask
┃ ┣ RetryTask
┃ ┣ RunOperationTask
┃ ┗ ServeTask
┣ DebugTask
┣ DepsTask
┗ InitTask
```

### Runner Hierarchy
```
BaseRunner
┣ CloneRunner
┣ CompileRunner
┃ ┣ GenericSqlRunner
┃ ┃ ┣ SqlCompileRunner
┃ ┃ ┗ SqlExecuteRunner
┃ ┣ ModelRunner
┃ ┃ ┣ SeedRunner
┃ ┃ ┗ SnapshotRunner
┃ ┣ ShowRunner
┃ ┗ TestRunner
┣ FreshnessRunner
┗ SavedQueryRunner
```
38 changes: 33 additions & 5 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
ConcurrencyLine,
EndRunResult,
NothingToDo,
GenericExceptionOnRun,
)
from dbt.exceptions import (
DbtInternalError,
Expand Down Expand Up @@ -204,17 +205,44 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
)
)
status: Dict[str, str] = {}
result = None
thread_exception = None
try:
result = runner.run_with_hooks(self.manifest)
except Exception as e:
thread_exception = e
finally:
finishctx = TimestampNamed("finished_at")
with finishctx, DbtModelState(status):
fire_event(
NodeFinished(
node_info=runner.node.node_info,
run_result=result.to_msg_dict(),
if result is not None:
fire_event(
NodeFinished(
node_info=runner.node.node_info,
run_result=result.to_msg_dict(),
)
)
)
else:
msg = f"Exception on worker thread. {thread_exception}"

fire_event(
GenericExceptionOnRun(
unique_id=runner.node.unique_id,
exc=str(thread_exception),
node_info=runner.node.node_info,
)
)

result = RunResult(
status=RunStatus.Error, # type: ignore
timing=[],
thread_id="",
execution_time=0.0,
adapter_response={},
message=msg,
failures=None,
node=runner.node,
)

# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
runner.node.clear_event_status()
Expand Down

0 comments on commit a0372fc

Please sign in to comment.