Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Updated PR for spin steps #2209

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
171013f
Add static and runtime dag info, API to fetch ancestor tasks
talsperre Oct 21, 2024
a842dc1
Add API to get immediate successors
talsperre Oct 31, 2024
3f9fdfd
Add API for getting closest siblings
talsperre Oct 31, 2024
4d0298d
Update metadata API params
talsperre Nov 1, 2024
a42f31f
Refactor ancestor and successor client code
talsperre Nov 1, 2024
493c3fa
Remove unneccessary prints
talsperre Nov 1, 2024
26a0a6a
Support querying ancestors and successors in local metadata provider
talsperre Jan 4, 2025
5f092e7
Refactor and simplify client code
talsperre Jan 4, 2025
890ff92
Make query logic more descriptive
talsperre Jan 4, 2025
7f91bf8
Add core tests for ancestor task API
talsperre Jan 7, 2025
0cfea97
Add core test for immediate successor API
talsperre Jan 7, 2025
6bd0778
Add endpoint in OSS metadata service
talsperre Jan 7, 2025
b92581b
Add logs to tests
talsperre Jan 10, 2025
6002395
Log for each stack to metadata, update query logic
talsperre Jan 12, 2025
d505d87
Add more comments to code
talsperre Jan 12, 2025
f71e26b
Run black formatting
talsperre Jan 12, 2025
536278d
Set monitor to None in filter tasks API
talsperre Jan 14, 2025
2559999
import urlencode
talsperre Jan 14, 2025
c172ef2
Address comments
talsperre Jan 14, 2025
c825d66
Update logic for siblings, make it work for static splits as well
talsperre Jan 14, 2025
81aca61
Initial commit for spin steps
talsperre Jan 13, 2025
5cb5292
Make it work with subprocess
talsperre Jan 15, 2025
23d7f12
Make it work with subprocess and truncated buffers
talsperre Jan 15, 2025
f4df5db
dummy commit
talsperre Jan 15, 2025
30f170c
Dummy commit
talsperre Jan 25, 2025
2b821d0
Working implementation
talsperre Jan 26, 2025
6f57368
Use polling for spin logging
talsperre Jan 26, 2025
86b872e
Make spin work with runner API
talsperre Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add API for getting closest siblings
talsperre committed Jan 23, 2025
commit 3f9fdfd7d3771ecc397a3fcb985ef17985c17439
42 changes: 31 additions & 11 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
@@ -1241,17 +1241,37 @@ def _successor_task(flow_id, run_id, successor_step):
)
return successor_iters

# def closest_siblings(self) -> Iterator["Task"]:
# """
# Returns an iterator over the closest siblings of this task.
#
# Returns
# -------
# Iterator[Task]
# Iterator over the closest siblings of this task
# """
# flow_id, run_id, step_name, task_id = self.path_components
# print(f"flow_id: {flow_id}, run_id: {run_id}, step_name: {step_name}, task_id: {task_id}")
def closest_siblings(self) -> Dict[str, List[str]]:
"""
Returns a dictionary of closest siblings of this task for each step.
Returns
-------
Dict[str, List[str]]
Dictionary of closest siblings of this task. The keys are the
names of the current step and the values are the corresponding
task ids of the siblings.
"""
flow_id, run_id, step_name, task_id = self.path_components

foreach_stack = self.metadata_dict.get("foreach-stack", [])
foreach_step_names = self.metadata_dict.get("foreach-step-names", [])
if len(foreach_stack) == 0:
raise MetaflowInternalError("Task is not part of any foreach split")
elif step_name != foreach_step_names[-1]:
raise MetaflowInternalError(
f"Step {step_name} does not have any direct siblings since it is not part "
f"of a new foreach split."
)

field_name = "foreach-indices-truncated"
field_value = self.metadata_dict.get("foreach-indices-truncated")
# We find all tasks of the same step that have the same foreach-indices-truncated value
return {
step_name: self._metaflow.metadata.filter_tasks_by_metadata(
flow_id, run_id, step_name, step_name, field_name, field_value
)
}

@property
def metadata(self) -> List[Metadata]: