Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tidy-First] Refactor safe run hooks #10944

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 86 additions & 73 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,96 +644,109 @@
hooks.sort(key=self._hook_keyfunc)
return hooks

def _safe_run_hook(
self,
adapter: BaseAdapter,
hook: HookNode,
hook_name: str,
timing: List[TimingInfo],
num_hooks: int,
extra_context: Dict[str, Any],
) -> Tuple[RunStatus, str, float]:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)

Check warning on line 657 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L656-L657

Added lines #L656 - L657 were not covered by tests

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(

Check warning on line 660 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L659-L660

Added lines #L659 - L660 were not covered by tests
started_at=started_at.isoformat(), node_status=RunningStatus.Started
)

fire_event(

Check warning on line 664 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L664

Added line #L664 was not covered by tests
LogHookStartLine(
statement=hook_name,
index=hook.index,
total=num_hooks,
node_info=hook.node_info,
)
)

with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

Check warning on line 674 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L673-L674

Added lines #L673 - L674 were not covered by tests

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()

Check warning on line 678 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L676-L678

Added lines #L676 - L678 were not covered by tests

if status == RunStatus.Success:
message = f"{hook_name} passed"

Check warning on line 681 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L680-L681

Added lines #L680 - L681 were not covered by tests
else:
message = f"{hook_name} failed, error:\n {message}"

Check warning on line 683 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L683

Added line #L683 was not covered by tests

return (status, message, execution_time)

Check warning on line 685 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L685

Added line #L685 was not covered by tests

def safe_run_hooks(
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
self,
adapter: BaseAdapter,
hook_type: RunHookType,
extra_context: Dict[str, Any],
) -> RunStatus:
ordered_hooks = self.get_hooks_by_type(hook_type)
status = RunStatus.Success

Check warning on line 694 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L694

Added line #L694 was not covered by tests

if hook_type == RunHookType.End and ordered_hooks:
fire_event(Formatting(""))
if ordered_hooks:
if hook_type == RunHookType.End:
fire_event(Formatting(""))

Check warning on line 698 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L696-L698

Added lines #L696 - L698 were not covered by tests

# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
adapter.clear_transaction()
if not ordered_hooks:
return RunStatus.Success
# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
adapter.clear_transaction()

Check warning on line 701 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L701

Added line #L701 was not covered by tests

status = RunStatus.Success
failed = False
num_hooks = len(ordered_hooks)

for idx, hook in enumerate(ordered_hooks, 1):
with log_contextvars(node_info=hook.node_info):
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
)
num_hooks = len(ordered_hooks)

Check warning on line 703 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L703

Added line #L703 was not covered by tests

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
for idx, hook in enumerate(ordered_hooks, 1):
with log_contextvars(node_info=hook.node_info):
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing: List[TimingInfo] = []

Check warning on line 710 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L705-L710

Added lines #L705 - L710 were not covered by tests

# Only run this hook if the previous hook succeeded
# otherwise, skip it
if status == RunStatus.Success:
(status, message, execution_time) = self._safe_run_hook(

Check warning on line 715 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L714-L715

Added lines #L714 - L715 were not covered by tests
adapter, hook, hook_name, timing, num_hooks, extra_context
)
else:
status = RunStatus.Skipped
message = f"{hook_name} skipped"

Check warning on line 720 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L719-L720

Added lines #L719 - L720 were not covered by tests

hook.update_event_status(node_status=status)

Check warning on line 722 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L722

Added line #L722 was not covered by tests

self.node_results.append(

Check warning on line 724 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L724

Added line #L724 was not covered by tests
RunResult(
status=status,
thread_id="main",
timing=timing,
message=message,
adapter_response={},
execution_time=execution_time,
failures=0 if status == RunStatus.Success else 1,
node=hook,
)
)

fire_event(
LogHookStartLine(
LogHookEndLine(
statement=hook_name,
status=status,
index=hook.index,
total=num_hooks,
execution_time=execution_time,
node_info=hook.node_info,
)
)

with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
message = f"{hook_name} passed"
else:
message = f"{hook_name} failed, error:\n {message}"
failed = True
else:
status = RunStatus.Skipped
message = f"{hook_name} skipped"

hook.update_event_status(node_status=status)

self.node_results.append(
RunResult(
status=status,
thread_id="main",
timing=timing,
message=message,
adapter_response={},
execution_time=execution_time,
failures=failures,
node=hook,
)
)

fire_event(
LogHookEndLine(
statement=hook_name,
status=status,
index=hook.index,
total=num_hooks,
execution_time=execution_time,
node_info=hook.node_info,
)
)

if hook_type == RunHookType.Start and ordered_hooks:
fire_event(Formatting(""))
if hook_type == RunHookType.Start:
fire_event(Formatting(""))

Check warning on line 749 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L748-L749

Added lines #L748 - L749 were not covered by tests

return status

Expand Down
Loading