Skip to content

Commit

Permalink
Fix trigger on finish bugs, and refactor code (Netflix#2218)
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre authored Jan 23, 2025
1 parent e2b93da commit 9f028e7
Showing 1 changed file with 79 additions and 142 deletions.
221 changes: 79 additions & 142 deletions metaflow/plugins/events_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,111 +398,23 @@ def flow_init(
)
elif self.attributes["flow"]:
# flow supports the format @trigger_on_finish(flow='FooFlow')
if is_stringish(self.attributes["flow"]):
self.triggers.append(
{
"fq_name": self.attributes["flow"],
}
)
elif isinstance(self.attributes["flow"], dict):
if "name" not in self.attributes["flow"]:
raise MetaflowException(
"The *flow* attribute for *@trigger_on_finish* is missing the "
"*name* key."
)
flow_name = self.attributes["flow"]["name"]

if not is_stringish(flow_name) or "." in flow_name:
raise MetaflowException(
"The *name* attribute of the *flow* is not a valid string"
)
result = {"fq_name": flow_name}
if "project" in self.attributes["flow"]:
if is_stringish(self.attributes["flow"]["project"]):
result["project"] = self.attributes["flow"]["project"]
else:
raise MetaflowException(
"The *project* attribute of the *flow* is not a string"
)
if "project_branch" in self.attributes["flow"]:
if is_stringish(self.attributes["flow"]["project_branch"]):
result["branch"] = self.attributes["flow"]["project_branch"]
else:
raise MetaflowException(
"The *project_branch* attribute of the *flow* is not a string"
)
self.triggers.append(result)
elif callable(self.attributes["flow"]) and not isinstance(
flow = self.attributes["flow"]
if callable(flow) and not isinstance(
self.attributes["flow"], DeployTimeField
):
trig = DeployTimeField(
"fq_name", [str, dict], None, self.attributes["flow"], False
)
trig = DeployTimeField("fq_name", [str, dict], None, flow, False)
self.triggers.append(trig)
else:
raise MetaflowException(
"Incorrect type for *flow* attribute in *@trigger_on_finish* "
" decorator. Supported type is string or Dict[str, str] - \n"
"@trigger_on_finish(flow='FooFlow') or "
"@trigger_on_finish(flow={'name':'FooFlow', 'project_branch': 'branch'})"
)
self.triggers.extend(self._parse_static_triggers([flow]))
elif self.attributes["flows"]:
# flows attribute supports the following formats -
# 1. flows=['FooFlow', 'BarFlow']
if isinstance(self.attributes["flows"], list):
for flow in self.attributes["flows"]:
if is_stringish(flow):
self.triggers.append(
{
"fq_name": flow,
}
)
elif isinstance(flow, dict):
if "name" not in flow:
raise MetaflowException(
"One or more flows in the *flows* attribute for "
"*@trigger_on_finish* is missing the "
"*name* key."
)
flow_name = flow["name"]

if not is_stringish(flow_name) or "." in flow_name:
raise MetaflowException(
"The *name* attribute '%s' is not a valid string"
% str(flow_name)
)
result = {"fq_name": flow_name}
if "project" in flow:
if is_stringish(flow["project"]):
result["project"] = flow["project"]
else:
raise MetaflowException(
"The *project* attribute of the *flow* '%s' is not "
"a string" % flow_name
)
if "project_branch" in flow:
if is_stringish(flow["project_branch"]):
result["branch"] = flow["project_branch"]
else:
raise MetaflowException(
"The *project_branch* attribute of the *flow* %s "
"is not a string" % flow_name
)
self.triggers.append(result)
else:
raise MetaflowException(
"One or more flows in *flows* attribute in "
"*@trigger_on_finish* decorator have an incorrect type. "
"Supported type is string or Dict[str, str]- \n"
"@trigger_on_finish(flows=['FooFlow', 'BarFlow']"
)
elif callable(self.attributes["flows"]) and not isinstance(
self.attributes["flows"], DeployTimeField
):
trig = DeployTimeField(
"flows", list, None, self.attributes["flows"], False
)
flows = self.attributes["flows"]
if callable(flows) and not isinstance(flows, DeployTimeField):
trig = DeployTimeField("flows", list, None, flows, False)
self.triggers.append(trig)
elif isinstance(flows, list):
self.triggers.extend(self._parse_static_triggers(flows))
else:
raise MetaflowException(
"Incorrect type for *flows* attribute in *@trigger_on_finish* "
Expand All @@ -519,26 +431,7 @@ def flow_init(
for trigger in self.triggers:
if isinstance(trigger, DeployTimeField):
continue
if trigger["fq_name"].count(".") == 0:
# fully qualified name is just the flow name
trigger["flow"] = trigger["fq_name"]
elif trigger["fq_name"].count(".") >= 2:
# fully qualified name is of the format - project.branch.flow_name
trigger["project"], tail = trigger["fq_name"].split(".", maxsplit=1)
trigger["branch"], trigger["flow"] = tail.rsplit(".", maxsplit=1)
else:
raise MetaflowException(
"Incorrect format for *flow* in *@trigger_on_finish* "
"decorator. Specify either just the *flow_name* or a fully "
"qualified name like *project_name.branch_name.flow_name*."
)
# TODO: Also sanity check project and branch names
if not re.match(r"^[A-Za-z0-9_]+$", trigger["flow"]):
raise MetaflowException(
"Invalid flow name *%s* in *@trigger_on_finish* "
"decorator. Only alphanumeric characters and "
"underscores(_) are allowed." % trigger["flow"]
)
self._parse_fq_name(trigger)

self.options = self.attributes["options"]

Expand Down Expand Up @@ -593,9 +486,67 @@ def flow_init(
run_objs.append(run_obj)
current._update_env({"trigger": Trigger.from_runs(run_objs)})

@staticmethod
def _parse_static_triggers(flows):
    """
    Normalize statically declared upstream flows into trigger dictionaries.

    Parameters
    ----------
    flows : list
        Each item is either a string, which is stored verbatim as the
        trigger's `fq_name`, or a dict with a mandatory `name` key and
        optional `project` and `project_branch` keys.

    Returns
    -------
    list of dict
        One dict per input item, in input order. Every dict has an `fq_name`
        key; dict inputs may additionally yield `project` and `branch` keys.

    Raises
    ------
    MetaflowException
        If an item is neither a string nor a dict, if a dict has no `name`
        key, if `name` is not a string or contains a dot, or if `project` /
        `project_branch` is present but not a string.
    """
    # The caller passes either the *flows* list or a one-element list wrapping
    # the *flow* attribute; the list length is the only hint available here
    # for choosing between the plural and singular error wording.
    plural = len(flows) > 1
    results = []
    for flow in flows:
        if is_stringish(flow):
            results.append({"fq_name": flow})
            continue

        if not isinstance(flow, dict):
            if plural:
                raise MetaflowException(
                    "One or more flows in the *flows* attribute for "
                    "*@trigger_on_finish* decorator have an incorrect type. "
                    "Supported type is string or Dict[str, str]- \n"
                    "@trigger_on_finish(flows=['FooFlow', 'BarFlow'])"
                )
            raise MetaflowException(
                "Incorrect type for *flow* attribute in *@trigger_on_finish* "
                " decorator. Supported type is string or Dict[str, str] - \n"
                "@trigger_on_finish(flow='FooFlow') or "
                "@trigger_on_finish(flow={'name':'FooFlow', 'project_branch': 'branch'})"
            )

        if "name" not in flow:
            if plural:
                raise MetaflowException(
                    "One or more flows in the *flows* attribute for "
                    "*@trigger_on_finish* is missing the "
                    "*name* key."
                )
            raise MetaflowException(
                "The *flow* attribute for *@trigger_on_finish* is missing the "
                "*name* key."
            )

        flow_name = flow["name"]
        # In the dict form, a name containing a dot is rejected.
        if not is_stringish(flow_name) or "." in flow_name:
            raise MetaflowException(
                f"The *name* attribute of the *flow* {flow_name} is not a valid string"
            )

        result = {"fq_name": flow_name}
        # Optional keys; note that `project_branch` is stored as `branch`.
        for source_key, target_key in (
            ("project", "project"),
            ("project_branch", "branch"),
        ):
            if source_key not in flow:
                continue
            if not is_stringish(flow[source_key]):
                raise MetaflowException(
                    f"The *{source_key}* attribute of the *flow* {flow_name} is not a string"
                )
            result[target_key] = flow[source_key]
        results.append(result)
    return results

def _parse_fq_name(self, trigger):
if isinstance(trigger, DeployTimeField):
trigger["fq_name"] = deploy_time_eval(trigger["fq_name"])
if trigger["fq_name"].count(".") == 0:
# fully qualified name is just the flow name
trigger["flow"] = trigger["fq_name"]
Expand All @@ -615,32 +566,18 @@ def _parse_fq_name(self, trigger):
"decorator. Only alphanumeric characters and "
"underscores(_) are allowed." % trigger["flow"]
)
return trigger

def format_deploytime_value(self):
    """
    Evaluate a deploy-time trigger, if any, and normalize `self.triggers`.

    When `self.triggers` holds exactly one `DeployTimeField`, it is evaluated
    with `deploy_time_eval`; a list result replaces the trigger list as-is and
    any other result is wrapped in a one-element list. The resulting items
    are then validated and converted by `_parse_static_triggers`, and each
    converted trigger is passed to `_parse_fq_name`. `self.triggers` is
    rebound to the converted list.

    Raises
    ------
    MetaflowException
        Raised by `_parse_static_triggers` for a malformed trigger;
        `_parse_fq_name` is presumably another source, though its full body
        is not visible in this view.
    """
    if len(self.triggers) == 1 and isinstance(self.triggers[0], DeployTimeField):
        deploy_value = deploy_time_eval(self.triggers[0])
        if isinstance(deploy_value, list):
            self.triggers = deploy_value
        else:
            # A single string/dict result is handled like a one-item list.
            self.triggers = [deploy_value]
    triggers = self._parse_static_triggers(self.triggers)
    for trigger in triggers:
        # Called for its effect on the trigger dict; the return value is unused.
        self._parse_fq_name(trigger)
    self.triggers = triggers

def get_top_level_options(self):
    """Return the entries of `self._option_values` as a list of (key, value) pairs."""
    return [*self._option_values.items()]

0 comments on commit 9f028e7

Please sign in to comment.