-
Notifications
You must be signed in to change notification settings - Fork 787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add static and runtime dag info, API to fetch ancestor and successor tasks #2124
base: master
Are you sure you want to change the base?
Conversation
48c771d
to
ec43f14
Compare
metaflow/metadata/metadata.py
Outdated
@classmethod | ||
def _filter_tasks_by_metadata( | ||
cls, flow_id, run_id, query_step, field_name, field_value | ||
): | ||
raise NotImplementedError() | ||
|
||
@classmethod | ||
def filter_tasks_by_metadata( | ||
cls, flow_id, run_id, query_step, field_name, field_value | ||
): | ||
# TODO: Do we need to do anything wrt to task attempt? | ||
task_ids = cls._filter_tasks_by_metadata( | ||
flow_id, run_id, query_step, field_name, field_value | ||
) | ||
return task_ids | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a need for the private method, or could this simply be contained in the public-facing one? right now its not doing anything before calling the private one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, did you have an implementation of this for service.py
yet?
metaflow/metadata/metadata.py
Outdated
def filter_tasks_by_metadata( | ||
cls, flow_id, run_id, query_step, field_name, field_value | ||
): | ||
# TODO: Do we need to do anything wrt to task attempt? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably not, as the ancestors for task attempts should be identical, right? What about the immediate_siblings
though, will they include or exclude attempts of the same task?
ffbf68a
to
c6fb9ac
Compare
d66d32b
to
7644058
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments. I think it's pretty close though. I haven't looked at hte metadata service changes. We may also want to raise a better error message if the service is not new enough?
metaflow/client/core.py
Outdated
run_id: str, | ||
cur_foreach_stack_len: int, | ||
steps: List[str], | ||
query_type: str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would just use a boolean. Something like is_ancestor
. These are internal functions anyways and slightly more efficient to use bools :)
@@ -1123,6 +1124,202 @@ def _iter_filter(self, x): | |||
# exclude private data artifacts | |||
return x.id[0] != "_" | |||
|
|||
def _get_task_for_queried_step(self, flow_id, run_id, query_step): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this function simply be replaced by Step(flow_id/run_id/step_name).task
? It doesn't seem to bring much more to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wasn't aware that this returns the first available task.
if query_foreach_stack_len == cur_foreach_stack_len: | ||
# The successor or ancestor tasks belong to the same foreach stack level | ||
field_name = "foreach-indices" | ||
field_value = self.metadata_dict.get(field_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't currently cache metadata_dict so either we could fix that or cache it here to avoid making multiple calls to the metadata service and then sorts. It would need to be cached across _get_related_tasks and this function.
metaflow/client/core.py
Outdated
# Current Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1] | ||
# Ancestor Task: foreach-indices = [0, 1], foreach-indices-truncated = [0] | ||
# We will compare the foreach-indices value of ancestor task with the | ||
# foreach-indices value of current task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: foreach-indices-truncated value of the current task
metaflow/client/core.py
Outdated
return field_name, field_value | ||
|
||
def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]: | ||
start_time = time.time() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used -- can strip or use.
@@ -248,8 +248,7 @@ | |||
# Default container registry | |||
DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY") | |||
# Controls whether to include foreach stack information in metadata. | |||
# TODO(Darin, 05/01/24): Remove this flag once we are confident with this feature. | |||
INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", False) | |||
INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably change this at some point and remove it to not make it optional anymore.
# Filter tasks based on metadata | ||
for task in tasks: | ||
task_id = task.get("task_id") | ||
if not task_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when does this happen? Also, task_id of zero is valid iirc.
# and the artifact files are saved as: <attempt>_artifact__<artifact_name>.json | ||
# We loop over all the JSON files in the directory and find the latest one | ||
# that matches the field prefix. | ||
json_files = glob.glob(os.path.join(path, "*.json")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should be able to do a more efficient globbing so we don't have to filter by field_prefix later on. SOmething like f"{field_prefix}*.json"
.
type="foreach-indices-truncated", | ||
tags=metadata_tags, | ||
), | ||
MetaDatum( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is only used in the siblings thing. If that's the case, we may be able to get rid of this when we refactor the siblings thing (if we do that). I am also a little confused as to why this is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will refactor siblings function mostly to return siblings irrespective of whether it is in a for each or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool -- I think we can now get rid of this metadatum then right?
metaflow/task.py
Outdated
tags=metadata_tags, | ||
), | ||
MetaDatum( | ||
field="previous_steps", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consistency here between previous_steps and foreach-indices for example.
} | ||
url = ServiceMetadataProvider._obj_path(flow_id, run_id, query_step) | ||
url = f"{url}/tasks?{urlencode(query_params)}" | ||
return cls._request(cls._monitor, url, "GET") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getting an error with this that cls
does not have _monitor
. All other calls to _request
pass in None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will pass in None
simply.
"query_step": query_step, | ||
} | ||
url = ServiceMetadataProvider._obj_path(flow_id, run_id, query_step) | ||
url = f"{url}/tasks?{urlencode(query_params)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing import for urlencode. f-strings are probably fine by 2025, as we've gotten rid of the older tests that break with them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya, I was wondering about fstrings but did check and our official minimum version is 3.6 which supports it -- and yes, let's move at least a tad into the future :). I'm going to start using them too and the code will slowly migrate to it (and become infinitesimally faster :) )
17a4489
to
7cdfb41
Compare
m.name: m.value | ||
for m in sorted(self.metadata, key=lambda m: m.created_at) | ||
} | ||
return self._metadata_dict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this slightly changes the syntax since now if there is new metadata, the user won't get it. Should check if this impacts other operations. Or scope the caching to just the functions that need it.
|
||
def _get_related_tasks(self, is_ancestor: bool) -> Dict[str, List[str]]: | ||
flow_id, run_id, _, _ = self.path_components | ||
steps = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
data type problem here which leads to the queries not working correctly:
the steps ends up being of type str
on OSS metadata-service, so using these you end up iterating over characters instead of step names, e.g.:
/flows/SplitFlow/runs/63/steps/{/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=%7B
/flows/SplitFlow/runs/63/steps/e/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=e
/flows/SplitFlow/runs/63/steps/n/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=n
/flows/SplitFlow/runs/63/steps/d/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=d
/flows/SplitFlow/runs/63/steps/}/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=%7D
Add runtime DAG info so that we can query the ancestor and successor tasks for a given task easily.
Usage
To get ancestors, progenies, and siblings, use the following API: