Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Updated PR for spin steps #2209

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
171013f
Add static and runtime dag info, API to fetch ancestor tasks
talsperre Oct 21, 2024
a842dc1
Add API to get immediate successors
talsperre Oct 31, 2024
3f9fdfd
Add API for getting closest siblings
talsperre Oct 31, 2024
4d0298d
Update metadata API params
talsperre Nov 1, 2024
a42f31f
Refactor ancestor and successor client code
talsperre Nov 1, 2024
493c3fa
Remove unneccessary prints
talsperre Nov 1, 2024
26a0a6a
Support querying ancestors and successors in local metadata provider
talsperre Jan 4, 2025
5f092e7
Refactor and simplify client code
talsperre Jan 4, 2025
890ff92
Make query logic more descriptive
talsperre Jan 4, 2025
7f91bf8
Add core tests for ancestor task API
talsperre Jan 7, 2025
0cfea97
Add core test for immediate successor API
talsperre Jan 7, 2025
6bd0778
Add endpoint in OSS metadata service
talsperre Jan 7, 2025
b92581b
Add logs to tests
talsperre Jan 10, 2025
6002395
Log for each stack to metadata, update query logic
talsperre Jan 12, 2025
d505d87
Add more comments to code
talsperre Jan 12, 2025
f71e26b
Run black formatting
talsperre Jan 12, 2025
536278d
Set monitor to None in filter tasks API
talsperre Jan 14, 2025
2559999
import urlencode
talsperre Jan 14, 2025
c172ef2
Address comments
talsperre Jan 14, 2025
c825d66
Update logic for siblings, make it work for static splits as well
talsperre Jan 14, 2025
81aca61
Initial commit for spin steps
talsperre Jan 13, 2025
5cb5292
Make it work with subprocess
talsperre Jan 15, 2025
23d7f12
Make it work with subprocess and truncated buffers
talsperre Jan 15, 2025
f4df5db
dummy commit
talsperre Jan 15, 2025
30f170c
Dummy commit
talsperre Jan 25, 2025
2b821d0
Working implementation
talsperre Jan 26, 2025
6f57368
Use polling for spin logging
talsperre Jan 26, 2025
86b872e
Make spin work with runner API
talsperre Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make query logic more descriptive
talsperre committed Jan 23, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 890ff920d43c3cd9c9c27c8b0aba299963b54e71
56 changes: 47 additions & 9 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
@@ -1135,49 +1135,87 @@ def _get_task_for_queried_step(self, flow_id, run_id, query_step):
return task
raise MetaflowNotFound(f"No task found for the queried step {query_step}")

def _get_filter_query_value(
self, flow_id, run_id, cur_foreach_stack_len, query_steps, query_type
def _get_metadata_query_vals(
self,
flow_id: str,
run_id: str,
cur_foreach_stack_len: int,
steps: List[str],
query_type: str,
):
"""
For a given query type, returns the field name and value to be used for filtering tasks
based on the task's metadata.
"""
if len(query_steps) > 1:
# This is a static join, so there is no change in foreach stack length
Returns the field name and field value to be used for querying metadata of successor or ancestor tasks.

Parameters
----------
flow_id : str
Flow ID of the task
run_id : str
Run ID of the task
cur_foreach_stack_len : int
Length of the foreach stack of the current task
steps : List[str]
List of step names whose tasks will be returned. For static joins and static splits, the
ancestors and successors can span multiple steps.
query_type : str
Type of query. Can be 'ancestor' or 'successor'.
"""
# For each task, we also log additional metadata fields such as foreach-indices and foreach-indices-truncated
# which help us in querying ancestor and successor tasks.
# `foreach-indices`: contains the indices of the foreach stack at the time of task execution.
# `foreach-indices-truncated`: contains the same indices as `foreach-indices`, but with the
# last index removed.
# For example, a task that's nested 3 levels deep in a foreach stack may have the following values:
# foreach-indices = [0, 1, 2]
# foreach-indices-truncated = [0, 1]

if len(steps) > 1:
# This is a static join or a static split. There will be no change in foreach stack length
query_foreach_stack_len = cur_foreach_stack_len
else:
# For linear steps, or foreach splits and joins, ancestor and successor tasks will all belong to
# the same step.
query_task = self._get_task_for_queried_step(
flow_id, run_id, query_steps[0]
flow_id, run_id, steps[0]
)
query_foreach_stack_len = len(
query_task.metadata_dict.get("foreach-stack", [])
)

if query_foreach_stack_len == cur_foreach_stack_len:
# The successor or ancestor tasks belong to the same foreach stack level
field_name = "foreach-indices"
field_value = self.metadata_dict.get(field_name)
elif query_type == "ancestor":
if query_foreach_stack_len > cur_foreach_stack_len:
# This is a foreach join
# Current Task: foreach-indices = [0, 1], foreach-indices-truncated = [0]
# Ancestor Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1]
# We will compare the foreach-indices-truncated value of ancestor task with the
# foreach-indices value of current task
field_name = "foreach-indices-truncated"
field_value = self.metadata_dict.get("foreach-indices")
else:
# This is a foreach split
# Current Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1]
# Ancestor Task: foreach-indices = [0, 1], foreach-indices-truncated = [0]
# We will compare the foreach-indices value of ancestor task with the
# foreach-indices-truncated value of current task
field_name = "foreach-indices"
field_value = self.metadata_dict.get("foreach-indices-truncated")
else:
if query_foreach_stack_len > cur_foreach_stack_len:
# This is a foreach split
# Current Task: foreach-indices = [0, 1], foreach-indices-truncated = [0]
# Successor Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1]
# We will compare the foreach-indices value of current task with the
# foreach-indices-truncated value of successor tasks
field_name = "foreach-indices-truncated"
field_value = self.metadata_dict.get("foreach-indices")
else:
# This is a foreach join
# Current Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1]
# Successor Task: foreach-indices = [0, 1], foreach-indices-truncated = [0]
# We will compare the foreach-indices-truncated value of current task with the
# foreach-indices value of successor tasks
field_name = "foreach-indices"
@@ -1195,7 +1233,7 @@ def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]:
if not steps:
return {}

field_name, field_value = self._get_filter_query_value(
field_name, field_value = self._get_metadata_query_vals(
flow_id,
run_id,
len(self.metadata_dict.get("foreach-stack", [])),