Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Independent decorator task exec lifecycles #2231

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,6 @@ def task_finished(
# local file system after the user code has finished execution.
# This happens via datastore as a communication bridge.

# TODO: There is no guarantee that task_prestep executes before
# task_finished is invoked. That will result in AttributeError:
# 'KubernetesDecorator' object has no attribute 'metadata' error.
if self.metadata.TYPE == "local":
# Note that the datastore is *always* Amazon S3 (see
# runtime_task_created function).
Expand Down
99 changes: 67 additions & 32 deletions metaflow/task.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
from __future__ import print_function
from io import BytesIO
import math
import sys
import os
import time
import traceback

from types import MethodType, FunctionType

from metaflow.sidecar import Message, MessageTypes
from metaflow.datastore.exceptions import DataException

from .metaflow_config import MAX_ATTEMPTS
from .metadata_provider import MetaDatum
from .mflog import TASK_LOG_SOURCE
from .datastore import Inputs, TaskDataStoreSet
from .exception import (
MetaflowInternalError,
Expand Down Expand Up @@ -616,49 +610,90 @@ def run_step(
"graph_info": self.flow._graph_info,
}
)

task_pre_step_deco_errors = set()
for deco in decorators:
deco.task_pre_step(
step_name,
output,
self.metadata,
run_id,
task_id,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
self.ubf_context,
inputs,
try:
deco.task_pre_step(
step_name,
output,
self.metadata,
run_id,
task_id,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
self.ubf_context,
inputs,
)
except Exception as ex:
task_pre_step_deco_errors.add((deco.name, str(ex)))
if task_pre_step_deco_errors:
raise MetaflowInternalError(
f"Exceptions were encountered during **task_pre_step* with the *{step_name}* step decorator:\n"
+ "\n".join(
[
f"decorator: {name}\nerror: {err}"
for name, err in task_pre_step_deco_errors
]
)
)

task_decorate_deco_errors = set()
for deco in decorators:
# decorators can actually decorate the step function,
# or they can replace it altogether. This functionality
# is used e.g. by catch_decorator which switches to a
# fallback code if the user code has failed too many
# times.
step_func = deco.task_decorate(
step_func,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
self.ubf_context,
try:
step_func = deco.task_decorate(
step_func,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
self.ubf_context,
)
except Exception as ex:
task_decorate_deco_errors.add((deco.name, str(ex)))
if task_decorate_deco_errors:
raise MetaflowInternalError(
f"Exceptions were encountered during **task_decorate* with the *{step_name}* step decorator:\n"
+ "\n".join(
[
f"decorator: {name}\nerror: {err}"
for name, err in task_decorate_deco_errors
]
)
)

if join_type:
self._exec_step_function(step_func, input_obj)
else:
self._exec_step_function(step_func)

task_post_step_deco_errors = set()
for deco in decorators:
deco.task_post_step(
step_name,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
try:
deco.task_post_step(
step_name,
self.flow,
self.flow._graph,
retry_count,
max_user_code_retries,
)
except Exception as ex:
task_post_step_deco_errors.add((deco.name, str(ex)))
if task_post_step_deco_errors:
raise MetaflowInternalError(
f"Exceptions were encountered during **task_post_step* with the *{step_name}* step decorator:\n"
+ "\n".join(
[
f"decorator: {name}\nerror: {err}"
for name, err in task_post_step_deco_errors
]
)
)

self.flow._task_ok = True
Expand Down
Loading