From 9bf1bb7b3566692d807a15e8b100de988bc71247 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 29 Jan 2025 12:42:55 +0200 Subject: [PATCH 1/2] add check for metadata in kube deco --- metaflow/plugins/kubernetes/kubernetes_decorator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 551feecd1ee..a72bf2a5621 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -600,10 +600,9 @@ def task_finished( # local file system after the user code has finished execution. # This happens via datastore as a communication bridge. - # TODO: There is no guarantee that task_prestep executes before - # task_finished is invoked. That will result in AttributeError: - # 'KubernetesDecorator' object has no attribute 'metadata' error. - if self.metadata.TYPE == "local": + # TODO: There is no guarantee that task_pre_step executes before + # task_finished is invoked. This currently results in us not having access to the metadata, as it is bound in the pre_step + if hasattr(self, "metadata") and self.metadata.TYPE == "local": # Note that the datastore is *always* Amazon S3 (see # runtime_task_created function). sync_local_metadata_to_datastore( From d33c315f81d950d57dd316a9120b6e600677db31 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 29 Jan 2025 14:17:40 +0200 Subject: [PATCH 2/2] make runtime decorator lifecycles execute independently to guarantee initialization on a per-deco level --- .../kubernetes/kubernetes_decorator.py | 4 +- metaflow/task.py | 99 +++++++++++++------ 2 files changed, 68 insertions(+), 35 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index a72bf2a5621..2dc3d44ef2f 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -600,9 +600,7 @@ def task_finished( # local file system after the user code has finished execution. # This happens via datastore as a communication bridge. - # TODO: There is no guarantee that task_pre_step executes before - # task_finished is invoked. This currently results in us not having access to the metadata, as it is bound in the pre_step - if hasattr(self, "metadata") and self.metadata.TYPE == "local": + if self.metadata.TYPE == "local": # Note that the datastore is *always* Amazon S3 (see # runtime_task_created function). sync_local_metadata_to_datastore( diff --git a/metaflow/task.py b/metaflow/task.py index 6b73302652b..9d526c05863 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -1,6 +1,4 @@ from __future__ import print_function -from io import BytesIO -import math import sys import os import time @@ -8,12 +6,8 @@ from types import MethodType, FunctionType -from metaflow.sidecar import Message, MessageTypes -from metaflow.datastore.exceptions import DataException - from .metaflow_config import MAX_ATTEMPTS from .metadata_provider import MetaDatum -from .mflog import TASK_LOG_SOURCE from .datastore import Inputs, TaskDataStoreSet from .exception import ( MetaflowInternalError, @@ -616,35 +610,62 @@ def run_step( "graph_info": self.flow._graph_info, } ) - + task_pre_step_deco_errors = set() for deco in decorators: - deco.task_pre_step( - step_name, - output, - self.metadata, - run_id, - task_id, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, - self.ubf_context, - inputs, + try: + deco.task_pre_step( + step_name, + output, + self.metadata, + run_id, + task_id, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + self.ubf_context, + inputs, + ) + except Exception as ex: + task_pre_step_deco_errors.add((deco.name, str(ex))) + if task_pre_step_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_pre_step* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_pre_step_deco_errors + ] + ) ) + task_decorate_deco_errors = set() for deco in decorators: # decorators can actually decorate the step function, # or they can replace it altogether. This functionality # is used e.g. by catch_decorator which switches to a # fallback code if the user code has failed too many # times. - step_func = deco.task_decorate( - step_func, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, - self.ubf_context, + try: + step_func = deco.task_decorate( + step_func, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + self.ubf_context, + ) + except Exception as ex: + task_decorate_deco_errors.add((deco.name, str(ex))) + if task_decorate_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_decorate* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_decorate_deco_errors + ] + ) ) if join_type: @@ -652,13 +673,27 @@ def run_step( else: self._exec_step_function(step_func) + task_post_step_deco_errors = set() for deco in decorators: - deco.task_post_step( - step_name, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, + try: + deco.task_post_step( + step_name, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + ) + except Exception as ex: + task_post_step_deco_errors.add((deco.name, str(ex))) + if task_post_step_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_post_step* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_post_step_deco_errors + ] + ) ) self.flow._task_ok = True