Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix/reload_runner_parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
wangchy27 committed Jan 17, 2025
2 parents 3bc1127 + e1ae62c commit 99ff7c5
Show file tree
Hide file tree
Showing 30 changed files with 584 additions and 126 deletions.
6 changes: 3 additions & 3 deletions docs/cards.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ PATH_TO_CUSTOM_HTML = 'myhtml.html'
class CustomCard(MetaflowCard):
type = "custom_card"

def __init__(self, options={"no_header": True}, graph=None,components=[]):
def __init__(self, options={"no_header": True}, graph=None, components=[], flow=None, **kwargs):
super().__init__()
self._no_header = True
self._graph = graph
Expand Down Expand Up @@ -177,7 +177,7 @@ class CustomCard(MetaflowCard):

HTML = "<html><head></head><body>{data}<body></html>"

def __init__(self, options={"no_header": True}, graph=None,components=[]):
def __init__(self, options={"no_header": True}, graph=None, components=[], flow=None, **kwargs):
super().__init__()
self._no_header = True
self._graph = graph
Expand Down Expand Up @@ -276,7 +276,7 @@ class YCard(MetaflowCard):

ALLOW_USER_COMPONENTS = True

def __init__(self, options={}, components=[], graph=None):
def __init__(self, options={}, components=[], graph=None, flow=None, **kwargs):
self._components = components

def render(self, task):
Expand Down
1 change: 1 addition & 0 deletions metaflow/cards.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Markdown,
VegaChart,
ProgressBar,
PythonCode,
)
from metaflow.plugins.cards.card_modules.basic import (
DefaultCard,
Expand Down
5 changes: 5 additions & 0 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ def start(
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
# take precedence over any other `foo` decospec

# Note that top-level decospecs are used primarily with non run/resume
# options as well as with the airflow/argo/sfn integrations which pass
# all the decospecs (the ones from top-level but also the ones from the
# run/resume level) through the tl decospecs.
ctx.obj.tl_decospecs = list(decospecs or [])

# initialize current and parameter context for deploy-time parameters
Expand Down
2 changes: 2 additions & 0 deletions metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def before_run(obj, tags, decospecs):
+ list(obj.environment.decospecs() or [])
)
if all_decospecs:
# These decospecs are the ones from run/resume PLUS the ones from the
# environment (for example the @conda)
decorators._attach_decorators(obj.flow, all_decospecs)
decorators._init(obj.flow)
# Regenerate graph if we attached more decorators
Expand Down
13 changes: 0 additions & 13 deletions metaflow/cli_components/step_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@
default=None,
help="Run id of the origin flow, if this task is part of a flow being resumed.",
)
@click.option(
"--with",
"decospecs",
multiple=True,
help="Add a decorator to this task. You can specify this "
"option multiple times to attach multiple decorators "
"to this task.",
)
@click.option(
"--ubf-context",
default="none",
Expand Down Expand Up @@ -112,7 +104,6 @@ def step(
max_user_code_retries=None,
clone_only=None,
clone_run_id=None,
decospecs=None,
ubf_context="none",
num_parallel=None,
):
Expand All @@ -136,10 +127,6 @@ def step(
raise CommandException("Function *%s* is not a step." % step_name)
echo("Executing a step, *%s*" % step_name, fg="magenta", bold=False)

if decospecs:
decorators._attach_decorators_to_step(func, decospecs)
decorators._init(ctx.obj.flow)

step_kwargs = ctx.params
# Remove argument `step_name` from `step_kwargs`.
step_kwargs.pop("step_name", None)
Expand Down
4 changes: 2 additions & 2 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ def init(self):
return

# Note that by design, later values override previous ones.
self.attributes = unpack_delayed_evaluator(self.attributes)
self._user_defined_attributes.update(self.attributes.keys())
self.attributes, new_user_attributes = unpack_delayed_evaluator(self.attributes)
self._user_defined_attributes.update(new_user_attributes)
self.attributes = resolve_delayed_evaluator(self.attributes)

self._ran_init = True
Expand Down
4 changes: 3 additions & 1 deletion metaflow/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ def init(self, ignore_errors=False):
)

# Resolve any value from configurations
self.kwargs = unpack_delayed_evaluator(self.kwargs, ignore_errors=ignore_errors)
self.kwargs, _ = unpack_delayed_evaluator(
self.kwargs, ignore_errors=ignore_errors
)
# Do it one item at a time so errors are ignored at that level (as opposed to
# at the entire kwargs level)
self.kwargs = {
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
("argo-workflows", ".argo.argo_workflows_cli.cli"),
("card", ".cards.card_cli.cli"),
("tag", ".tag_cli.cli"),
("spot-metadata", ".kubernetes.spot_metadata_cli.cli"),
("logs", ".logs_cli.cli"),
]

Expand Down Expand Up @@ -104,6 +105,10 @@
"save_logs_periodically",
"..mflog.save_logs_periodically.SaveLogsPeriodicallySidecar",
),
(
"spot_termination_monitor",
".kubernetes.spot_monitor_sidecar.SpotTerminationMonitorSidecar",
),
("heartbeat", "metaflow.metadata_provider.heartbeat.MetadataHeartBeat"),
]

Expand Down
21 changes: 15 additions & 6 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,16 @@ def _process_triggers(self):
for event in trigger_on_finish_deco.triggers:
# Actual filters are deduced here since we don't have access to
# the current object in the @trigger_on_finish decorator.
project_name = event.get("project") or current.get("project_name")
branch_name = event.get("branch") or current.get("branch_name")
# validate that we have complete project info for an event name
if project_name or branch_name:
if not (project_name and branch_name):
# if one of the two is missing, we would end up listening to an event that will never be broadcast.
raise ArgoWorkflowsException(
"Incomplete project info. Please specify both 'project' and 'project_branch' or use the @project decorator"
)

triggers.append(
{
# Make sure this remains consistent with the event name format
Expand All @@ -632,18 +642,16 @@ def _process_triggers(self):
% ".".join(
v
for v in [
event.get("project") or current.get("project_name"),
event.get("branch") or current.get("branch_name"),
project_name,
branch_name,
event["flow"],
]
if v
),
"filters": {
"auto-generated-by-metaflow": True,
"project_name": event.get("project")
or current.get("project_name"),
"branch_name": event.get("branch")
or current.get("branch_name"),
"project_name": project_name,
"branch_name": branch_name,
# TODO: Add a time filters to guard against cached events
},
"type": "run",
Expand Down Expand Up @@ -1705,6 +1713,7 @@ def _container_templates(self):
},
**{
# Some optional values for bookkeeping
"METAFLOW_FLOW_FILENAME": os.path.basename(sys.argv[0]),
"METAFLOW_FLOW_NAME": self.flow.name,
"METAFLOW_STEP_NAME": node.name,
"METAFLOW_RUN_ID": run_id,
Expand Down
9 changes: 7 additions & 2 deletions metaflow/plugins/cards/card_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,10 +691,15 @@ def create(
try:
if options is not None:
mf_card = filtered_card(
options=options, components=component_arr, graph=graph_dict
options=options,
components=component_arr,
graph=graph_dict,
flow=ctx.obj.flow,
)
else:
mf_card = filtered_card(components=component_arr, graph=graph_dict)
mf_card = filtered_card(
components=component_arr, graph=graph_dict, flow=ctx.obj.flow
)
except TypeError as e:
if render_error_card:
mf_card = None
Expand Down
73 changes: 53 additions & 20 deletions metaflow/plugins/cards/card_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class CardDecorator(StepDecorator):

card_creator = None

_config_values = None

_config_file_name = None

task_finished_decos = 0

def __init__(self, *args, **kwargs):
super(CardDecorator, self).__init__(*args, **kwargs)
self._task_datastore = None
Expand Down Expand Up @@ -106,6 +112,25 @@ def _set_card_counts_per_step(cls, step_name, total_count):
def _increment_step_counter(cls):
cls.step_counter += 1

@classmethod
def _increment_completed_counter(cls):
    """Record that one more card decorator on this step has finished.

    The count is kept at class level so all card decorators attached to
    the same step share it (see ``_cleanup``, which compares it against
    the step's total decorator count).
    """
    cls.task_finished_decos = cls.task_finished_decos + 1

@classmethod
def _set_config_values(cls, config_values):
    """Stash the flow's config (name, key-name) pairs at class level.

    Stored on the class (not the instance) so that every card decorator
    on the step sees the same config list when deciding whether a shared
    config file needs to be written.
    """
    cls._config_values = config_values

@classmethod
def _set_config_file_name(cls, flow):
    """Write the flow's config values to a temp file, at most once.

    Every card decorator on the step shares the single class-level file
    path; only the first decorator to get here creates the file. The
    file is deliberately not auto-deleted (``delete=False``) because the
    card creation subprocess reads it later.
    """
    if not cls._config_values or cls._config_file_name:
        # Nothing to persist, or another decorator already created the file.
        return
    tmp_file = tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False)
    with tmp_file as handle:
        json.dump(dump_config_values(flow), handle)
    cls._config_file_name = tmp_file.name

def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
Expand All @@ -116,11 +141,13 @@ def step_init(

# We check for configuration options. We do this here before they are
# converted to properties.
self._config_values = [
(config.name, ConfigInput.make_key_name(config.name))
for _, config in flow._get_parameters()
if config.IS_CONFIG_PARAMETER
]
self._set_config_values(
[
(config.name, ConfigInput.make_key_name(config.name))
for _, config in flow._get_parameters()
if config.IS_CONFIG_PARAMETER
]
)

self.card_options = self.attributes["options"]

Expand Down Expand Up @@ -159,15 +186,11 @@ def task_pre_step(

# If we have configs, we need to dump them to a file so we can re-use them
# when calling the card creation subprocess.
if self._config_values:
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False
) as config_file:
config_value = dump_config_values(flow)
json.dump(config_value, config_file)
self._config_file_name = config_file.name
else:
self._config_file_name = None
# Since a step can contain multiple card decorators, and all the card creation processes
# will reference the same config file (because of how the CardCreator is created (only single class instance)),
# we need to ensure that a single config file is being referenced for all card create commands.
# This config file will be removed when the last card decorator has finished creating its card.
self._set_config_file_name(flow)

card_type = self.attributes["type"]
card_class = get_card_class(card_type)
Expand Down Expand Up @@ -246,12 +269,7 @@ def task_finished(
self.card_creator.create(mode="render", final=True, **create_options)
self.card_creator.create(mode="refresh", final=True, **create_options)

# Unlink the config file if it exists
if self._config_file_name:
try:
os.unlink(self._config_file_name)
except Exception as e:
pass
self._cleanup(step_name)

@staticmethod
def _options(mapping):
Expand Down Expand Up @@ -286,3 +304,18 @@ def _create_top_level_args(self, flow):
top_level_options["local-config-file"] = self._config_file_name

return list(self._options(top_level_options))

def task_exception(
    self, exception, step_name, flow, graph, retry_count, max_user_code_retries
):
    """Failure-path hook: still count this decorator as finished.

    Mirrors the end of ``task_finished`` so that the shared config file
    is removed once the last card decorator on the step completes, even
    when the user step code raised.
    """
    self._cleanup(step_name)

def _cleanup(self, step_name):
    """Remove the shared config file once all card decorators are done.

    Multiple ``@card`` decorators on a single step reference the same
    class-level config file (see ``_set_config_file_name``), so it can
    only be unlinked after the last decorator finishes — whether via
    ``task_finished`` or ``task_exception``.

    Parameters
    ----------
    step_name : str
        Name of the step, used to look up the step's total decorator
        count in ``total_decos_on_step``.
    """
    self._increment_completed_counter()
    if self.task_finished_decos == self.total_decos_on_step[step_name]:
        # Last decorator to finish: best-effort removal of the temp
        # config file, if one was ever created.
        if self._config_file_name:
            try:
                os.unlink(self._config_file_name)
            except OSError:
                # File may already be gone (or not removable); removal is
                # best-effort, so swallow only filesystem errors rather
                # than a bare `except Exception as e` with an unused `e`.
                pass
Loading

0 comments on commit 99ff7c5

Please sign in to comment.