Skip to content

Commit

Permalink
allow resume leader to clone the entire graph
Browse files Browse the repository at this point in the history
  • Loading branch information
darinyu committed Feb 26, 2024
1 parent ae350b4 commit 83f158c
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions metaflow/runtime.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -786,6 +786,13 @@ def __init__(
if origin and origin["_task_ok"]:
# At this point, we know we are going to clone
self._is_cloned = True

if resume_identifier:
self.log(
"Resume identifier is %s." % resume_identifier,
system_msg=True,
)

if reentrant:
# A re-entrant clone basically allows multiple concurrent processes
# to perform the clone at the same time to the same new run id. Let's
Expand Down Expand Up @@ -850,24 +857,7 @@ def __init__(
if self.step == "_parameters":
# We don't put _parameters on the queue so we either clone it or wait
# for it.
if not self._should_skip_cloning:
self.log(
"Selected as the reentrant clone leader.",
system_msg=True,
)
# Clone in place without relying on run_queue.
self.new_attempt()
self._ds.clone(origin)

# Set the resume leader to be the task that invoked the resume (i.e. the first task to clone the _parameters task).
if resume_identifier:
self._ds.save_metadata(
{"_resume_leader": resume_identifier}, add_attempt=False
)
self._is_resume_leader = True

self._ds.done()
else:
if self._should_skip_cloning:
self.log(
"Waiting for clone of _parameters step to occur...",
system_msg=True,
Expand All @@ -894,6 +884,10 @@ def __init__(
self._is_resume_leader = (
resume_leader == resume_identifier
)
self.log(
"Resume leader is %s." % resume_leader,
system_msg=True,
)

# Check if resume is complete. Resume leader will write the done file.
self._resume_done = ds.has_metadata(
Expand All @@ -919,6 +913,22 @@ def __init__(
# No need to get fancy with the sleep here.
time.sleep(RESUME_POLL_SECONDS)
self.log("_parameters clone successful", system_msg=True)
else:
self.log(
"Selected as the reentrant clone leader.",
system_msg=True,
)
# Clone in place without relying on run_queue.
self.new_attempt()
self._ds.clone(origin)
# Set the resume leader to be the task that invoked the resume (i.e. the first task to clone the _parameters task).
if resume_identifier:
self._ds.save_metadata(
{"_resume_leader": resume_identifier}, add_attempt=False
)
self._is_resume_leader = True

self._ds.done()
else:
# For non parameter steps
# Store the origin pathspec in clone_origin so this can be run
Expand Down Expand Up @@ -1292,7 +1302,6 @@ def __str__(self):

class Worker(object):
def __init__(self, task, max_logs_size):

self.task = task
self._proc = self._launch()

Expand Down

0 comments on commit 83f158c

Please sign in to comment.