Skip to content

Commit

Permalink
refactor code to first select the leader and then decide whether to clone.
Browse files Browse the repository at this point in the history
  • Loading branch information
darinyu committed Feb 26, 2024
1 parent 83f158c commit 58b1a39
Showing 1 changed file with 74 additions and 74 deletions.
148 changes: 74 additions & 74 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,26 +793,26 @@ def __init__(
system_msg=True,
)

task_id_exists_already = False
task_completed = False
if reentrant:
# A re-entrant clone basically allows multiple concurrent processes
# to perform the clone at the same time to the same new run id. Let's
# assume two processes A and B both simultaneously calling
# `resume --reentrant --run-id XX`.
# For each task that is cloned, we want to guarantee that:
# - one and only one of A or B will do the actual cloning
# - the other process (or other processes) will block until the cloning
# is complete.
# This ensures that the rest of the clone algorithm can proceed as normal
# and also guarantees that we only write once to the datastore and
# metadata.
# We want to guarantee that:
# - All incomplete tasks are cloned exactly once.
# To achieve this, we will select a resume leader and let it clone the
# entire execution graph. This ensures that we only write once to the
# datastore and metadata.
#
# To accomplish this, we use the cloned task's task-id as the "key" to
# synchronize on. We then try to "register" this new task-id (or rather
# the full pathspec <run>/<step>/<taskid>) with the metadata service
# which will indicate if we actually registered it or if it existed
# already. If we did manage to register it, we are the "elected cloner"
# We use the cloned _parameter task's task-id as the "key" to synchronize
# on. We try to "register" this new task-id (or rather the full pathspec
# <run>/<step>/<taskid>) with the metadata service which will indicate
# if we actually registered it or if it existed already. If we did manage
# to register it (_parameter task), we are the "elected resume leader"
# in essence and proceed to clone. If we didn't, we just wait to make
# sure the task is fully done (ie: the clone is finished).
            # sure the entire clone execution is fully done (i.e., the clone is finished).
if task_id is not None:
# Sanity check -- this should never happen. We cannot allow
# for explicit task-ids because in the reentrant case, we use the
Expand All @@ -836,8 +836,8 @@ def __init__(

# If _get_task_id returns True it means the task already existed, so
# we wait for it.
already_existed = self._get_task_id(clone_task_id)
task_completed = False
task_id_exists_already = self._get_task_id(clone_task_id)

# We may not have access to task datastore on first resume attempt, but
# on later resume attempt, we should check if the resume task is complete
# or not. This is to fix the issue where the resume leader was killed
Expand All @@ -846,74 +846,44 @@ def __init__(
task_completed = self.results["_task_ok"]
except DataException as e:
pass
self._should_skip_cloning = already_existed and task_completed
else:
self._should_skip_cloning = False
self._get_task_id(task_id)

# Store the mapping from current_pathspec -> origin_pathspec which
# will be useful for looking up origin_ds_set in find_origin_task.
self.clone_pathspec_mapping[self._path] = origin.pathspec
if self.step == "_parameters":
# We don't put _parameters on the queue so we either clone it or wait
# for it.
if self._should_skip_cloning:
self.log(
"Waiting for clone of _parameters step to occur...",
system_msg=True,
# In the _parameters task, we need to resolve who is the resume leader.
self._is_resume_leader = False
resume_leader = None

if task_id_exists_already:
                    # If the task id already exists, we need to check whether the current
                    # task was the resume leader in a previous attempt.
ds = self._flow_datastore.get_task_datastore(
self.run_id, self.step, self.task_id
)
self._resume_done = False
resume_leader = None
while True:
try:
ds = self._flow_datastore.get_task_datastore(
self.run_id, self.step, self.task_id
)
if not ds["_task_ok"]:
raise MetaflowInternalError(
"Externally cloned _parameters task did not succeed"
)

# Check if we are the resume leader (and only check once).
if (not resume_leader) and ds.has_metadata(
"_resume_leader", add_attempt=False
):
resume_leader = ds.load_metadata(
["_resume_leader"], add_attempt=False
)["_resume_leader"]
self._is_resume_leader = (
resume_leader == resume_identifier
)
self.log(
"Resume leader is %s." % resume_leader,
system_msg=True,
)
if not ds["_task_ok"]:
raise MetaflowInternalError(
"Externally cloned _parameters task did not succeed"
)

# Check if resume is complete. Resume leader will write the done file.
self._resume_done = ds.has_metadata(
"_resume_done", add_attempt=False
)
                            # Check if we should be the resume leader (maybe from a previous attempt)
if ds.has_metadata("_resume_leader", add_attempt=False):
resume_leader = ds.load_metadata(
["_resume_leader"], add_attempt=False
)["_resume_leader"]
self._is_resume_leader = resume_leader == resume_identifier
else:
# If the task id does not exist, current task is the resume leader.
resume_leader = resume_identifier
self._is_resume_leader = True

# If we are the resume leader, we should not wait.
# If resume is complete, we no longer need to wait.
if self._is_resume_leader or self._resume_done:
break
self.log(
"Resume leader is %s." % resume_leader,
system_msg=True,
)

self.log(
"Waiting for resume leader to complete. Sleeping for %ds..."
% RESUME_POLL_SECONDS,
system_msg=True,
)
time.sleep(RESUME_POLL_SECONDS)
except DataException:
self.log(
"Sleeping for %ds..." % RESUME_POLL_SECONDS,
system_msg=True,
)
# No need to get fancy with the sleep here.
time.sleep(RESUME_POLL_SECONDS)
self.log("_parameters clone successful", system_msg=True)
else:
if self._is_resume_leader:
self.log(
"Selected as the reentrant clone leader.",
system_msg=True,
Expand All @@ -926,17 +896,47 @@ def __init__(
self._ds.save_metadata(
{"_resume_leader": resume_identifier}, add_attempt=False
)
self._is_resume_leader = True

self._ds.done()
else:
# Wait for the resume leader to complete
while True:
ds = self._flow_datastore.get_task_datastore(
self.run_id, self.step, self.task_id
)
if not ds["_task_ok"]:
raise MetaflowInternalError(
"Externally cloned _parameters task did not succeed"
)

# Check if resume is complete. Resume leader will write the done file.
self._resume_done = ds.has_metadata(
"_resume_done", add_attempt=False
)

if self._resume_done:
break

self.log(
"Waiting for resume leader to complete. Sleeping for %ds..."
% RESUME_POLL_SECONDS,
system_msg=True,
)
time.sleep(RESUME_POLL_SECONDS)
self.log(
"_parameters clone completed by resume leader", system_msg=True
)
else:
                # For non-parameter steps.
                # Only the resume leader can reach non-parameter steps in resume.

# Store the origin pathspec in clone_origin so this can be run
# as a task by the runtime.
self.clone_origin = origin.pathspec
# Save a call to creating the results_ds since its same as origin.
self._results_ds = origin

# If the task is already completed in new run, we don't need to clone it.
self._should_skip_cloning = task_completed
if self._should_skip_cloning:
self.log(
"Skip cloning of previously run task %s" % self.clone_origin,
Expand Down

0 comments on commit 58b1a39

Please sign in to comment.