diff --git a/metaflow/runtime.py b/metaflow/runtime.py index e35b8ba08ec..640e89fd2e6 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -793,26 +793,26 @@ def __init__( system_msg=True, ) + task_id_exists_already = False + task_completed = False if reentrant: # A re-entrant clone basically allows multiple concurrent processes # to perform the clone at the same time to the same new run id. Let's # assume two processes A and B both simultaneously calling # `resume --reentrant --run-id XX`. - # For each task that is cloned, we want to guarantee that: - # - one and only one of A or B will do the actual cloning - # - the other process (or other processes) will block until the cloning - # is complete. - # This ensures that the rest of the clone algorithm can proceed as normal - # and also guarantees that we only write once to the datastore and - # metadata. + # We want to guarantee that: + # - All incomplete tasks are cloned exactly once. + # To achieve this, we will select a resume leader and let it clone the + # entire execution graph. This ensures that we only write once to the + # datastore and metadata. # - # To accomplish this, we use the cloned task's task-id as the "key" to - # synchronize on. We then try to "register" this new task-id (or rather - # the full pathspec //) with the metadata service - # which will indicate if we actually registered it or if it existed - # already. If we did manage to register it, we are the "elected cloner" + # We use the cloned _parameter task's task-id as the "key" to synchronize + # on. We try to "register" this new task-id (or rather the full pathspec + # //) with the metadata service which will indicate + # if we actually registered it or if it existed already. If we did manage + # to register it (_parameter task), we are the "elected resume leader" # in essence and proceed to clone. If we didn't, we just wait to make - # sure the task is fully done (ie: the clone is finished). + # sure the entire clone execution is fully done (ie: the clone is finished). if task_id is not None: # Sanity check -- this should never happen. We cannot allow # for explicit task-ids because in the reentrant case, we use the @@ -836,8 +836,8 @@ def __init__( # If _get_task_id returns True it means the task already existed, so # we wait for it. - already_existed = self._get_task_id(clone_task_id) - task_completed = False + task_id_exists_already = self._get_task_id(clone_task_id) + # We may not have access to task datastore on first resume attempt, but # on later resume attempt, we should check if the resume task is complete # or not. This is to fix the issue where the resume leader was killed @@ -846,74 +846,44 @@ def __init__( task_completed = self.results["_task_ok"] except DataException as e: pass - self._should_skip_cloning = already_existed and task_completed else: - self._should_skip_cloning = False self._get_task_id(task_id) # Store the mapping from current_pathspec -> origin_pathspec which # will be useful for looking up origin_ds_set in find_origin_task. self.clone_pathspec_mapping[self._path] = origin.pathspec if self.step == "_parameters": - # We don't put _parameters on the queue so we either clone it or wait - # for it. - if self._should_skip_cloning: - self.log( - "Waiting for clone of _parameters step to occur...", - system_msg=True, + # In the _parameters task, we need to resolve who is the resume leader. + self._is_resume_leader = False + resume_leader = None + + if task_id_exists_already: + # If the task id already exists, we need to check if current task is the resume leader in previous attempt. + ds = self._flow_datastore.get_task_datastore( + self.run_id, self.step, self.task_id ) - self._resume_done = False - resume_leader = None - while True: - try: - ds = self._flow_datastore.get_task_datastore( - self.run_id, self.step, self.task_id - ) - if not ds["_task_ok"]: - raise MetaflowInternalError( - "Externally cloned _parameters task did not succeed" - ) - - # Check if we are the resume leader (and only check once). - if (not resume_leader) and ds.has_metadata( - "_resume_leader", add_attempt=False - ): - resume_leader = ds.load_metadata( - ["_resume_leader"], add_attempt=False - )["_resume_leader"] - self._is_resume_leader = ( - resume_leader == resume_identifier - ) - self.log( - "Resume leader is %s." % resume_leader, - system_msg=True, - ) + if not ds["_task_ok"]: + raise MetaflowInternalError( + "Externally cloned _parameters task did not succeed" + ) - # Check if resume is complete. Resume leader will write the done file. - self._resume_done = ds.has_metadata( - "_resume_done", add_attempt=False - ) + # Check if we should be the resume leader (maybe from previous attempt) + if ds.has_metadata("_resume_leader", add_attempt=False): + resume_leader = ds.load_metadata( + ["_resume_leader"], add_attempt=False + )["_resume_leader"] + self._is_resume_leader = resume_leader == resume_identifier + else: + # If the task id does not exist, current task is the resume leader. + resume_leader = resume_identifier + self._is_resume_leader = True - # If we are the resume leader, we should not wait. - # If resume is complete, we no longer need to wait. - if self._is_resume_leader or self._resume_done: - break + self.log( + "Resume leader is %s." % resume_leader, + system_msg=True, + ) - self.log( - "Waiting for resume leader to complete. Sleeping for %ds..." - % RESUME_POLL_SECONDS, - system_msg=True, - ) - time.sleep(RESUME_POLL_SECONDS) - except DataException: - self.log( - "Sleeping for %ds..." % RESUME_POLL_SECONDS, - system_msg=True, - ) - # No need to get fancy with the sleep here. - time.sleep(RESUME_POLL_SECONDS) - self.log("_parameters clone successful", system_msg=True) - else: + if self._is_resume_leader: self.log( "Selected as the reentrant clone leader.", system_msg=True, @@ -926,17 +896,47 @@ def __init__( self._ds.save_metadata( {"_resume_leader": resume_identifier}, add_attempt=False ) - self._is_resume_leader = True self._ds.done() + else: + # Wait for the resume leader to complete + while True: + ds = self._flow_datastore.get_task_datastore( + self.run_id, self.step, self.task_id + ) + if not ds["_task_ok"]: + raise MetaflowInternalError( + "Externally cloned _parameters task did not succeed" + ) + + # Check if resume is complete. Resume leader will write the done file. + self._resume_done = ds.has_metadata( + "_resume_done", add_attempt=False + ) + + if self._resume_done: + break + + self.log( + "Waiting for resume leader to complete. Sleeping for %ds..." + % RESUME_POLL_SECONDS, + system_msg=True, + ) + time.sleep(RESUME_POLL_SECONDS) + self.log( + "_parameters clone completed by resume leader", system_msg=True + ) else: - # For non parameter steps + # Only leader can reach non-parameter steps in resume. + # Store the origin pathspec in clone_origin so this can be run # as a task by the runtime. self.clone_origin = origin.pathspec # Save a call to creating the results_ds since its same as origin. self._results_ds = origin + # If the task is already completed in new run, we don't need to clone it. + self._should_skip_cloning = task_completed if self._should_skip_cloning: self.log( "Skip cloning of previously run task %s" % self.clone_origin,