Fix: Use return instead of break to correctly exit loop on path exist… #1312

Open · wants to merge 6 commits into base: master
Changes from 1 commit
2 changes: 1 addition & 1 deletion dlrover/go/operator/Makefile
@@ -40,7 +40,7 @@ help: ## Display this help.

.PHONY: manifests
manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
-	$(CONTROLLER_GEN) rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
+	$(CONTROLLER_GEN) rbac:roleName=manager-role crd:generateEmbeddedObjectMeta=true webhook paths="./..." output:crd:artifacts:config=config/crd/bases

.PHONY: generate
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
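The added crd:generateEmbeddedObjectMeta=true tells controller-gen to keep the ObjectMeta schema (name, labels, annotations) of embedded objects such as pod templates in the generated CRDs instead of pruning it. Below is a hypothetical verification sketch, not part of this PR: it assumes PyYAML and an illustrative CRD path, and simply checks whether some nested object schema still exposes a metadata property next to a spec.

# Hypothetical check, not part of this PR: walk the generated CRD schema and
# report whether any embedded object still exposes a `metadata` property.
import yaml  # assumes PyYAML is installed


def has_embedded_object_meta(crd_path: str) -> bool:
    """Return True if some nested object schema keeps `metadata` and `spec`."""
    with open(crd_path) as f:
        crd = yaml.safe_load(f)

    def walk(schema) -> bool:
        if not isinstance(schema, dict):
            return False
        props = schema.get("properties", {})
        if "metadata" in props and "spec" in props:
            return True
        children = list(props.values()) + [schema.get("items", {})]
        return any(walk(child) for child in children)

    versions = crd.get("spec", {}).get("versions", [])
    return any(
        walk(v.get("schema", {}).get("openAPIV3Schema", {})) for v in versions
    )


# Example usage (path is illustrative):
# print(has_embedded_object_meta("config/crd/bases/example_crd.yaml"))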
7 changes: 4 additions & 3 deletions dlrover/python/elastic_agent/torch/ckpt_saver.py
@@ -625,7 +625,7 @@ def _save_shard(
                f"The step {step} in event is no equal "
                f"to step {config.step} in memory."
            )
-            return
+            return False

        logger.info(
            f"Saves the checkpoint shard {local_shard_id} "
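Returning False instead of a bare return (which yields None) gives the caller an explicit per-shard failure signal when the requested step does not match the step held in memory. A minimal sketch of that contract with simplified, illustrative names; the real method takes more arguments and writes to shared storage.

def save_shard(step: int, memory_step: int) -> bool:
    """Illustrative stand-in for _save_shard: report success as a boolean."""
    if memory_step != step:
        # Step mismatch: nothing is written for this shard.
        return False
    # ... write the shard to storage ...
    return True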
@@ -659,7 +659,7 @@ def _dist_make_dir(self, path, timeout=30):
        else:
            for _ in range(timeout):
                if self.storage.exists(path):
-                    break
+                    return
                time.sleep(1)
            logger.warning(
                f"Worker {self._node_rank} can't find path {path} "
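This hunk is the fix named in the PR title. With break, the loop stops once the path appears, but execution still falls through to the warning below, so a misleading "can't find path" message is logged even on success; return exits _dist_make_dir as soon as the path is visible. A minimal sketch of the corrected flow, using an illustrative path_exists callable in place of the real storage backend:

import time


def wait_for_path(path_exists, timeout: int = 30) -> None:
    """Poll until the path is visible or the timeout expires."""
    for _ in range(timeout):
        if path_exists():
            # `break` here would still reach the warning below;
            # `return` leaves the function immediately on success.
            return
        time.sleep(1)
    print(f"can't find path within {timeout}s")  # reached only on real timeout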
@@ -914,7 +914,7 @@ def save_step_checkpoint(self, step: int):
                    f"Fail to save checkpoint shared {i} for step {step}"
                )

-        if success_count == self.local_shard_num:
+        if success_count == len(futures):
            write_success = True
            self._latest_step = step
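Comparing success_count against len(futures) ties the "all shards written" check to the number of save tasks actually submitted, rather than to self.local_shard_num, so the two cannot silently diverge. A rough sketch of the pattern with a thread pool and simplified names:

from concurrent.futures import ThreadPoolExecutor


def save_all_shards(shard_ids, save_shard) -> bool:
    """Illustrative: a step counts as written only if every future succeeds."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(save_shard, i) for i in shard_ids]
        success_count = sum(1 for f in futures if f.result())
    # Compare against what was actually submitted, not a separately kept count.
    return success_count == len(futures)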

@@ -923,6 +923,7 @@ def save_step_checkpoint(self, step: int):
                f"Rank {self._node_rank} save checkpoint failed for "
                f"step {step}"
            )
+            self._writing_storage = False
            return

        # commit checkpoint
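The added self._writing_storage = False clears the "write in progress" flag on the failure path of save_step_checkpoint; without it, the early return skips whatever reset happens later in the method and the saver can appear busy after a failed step. A condensed, illustrative sketch of that control flow (the real method submits shard futures and commits the checkpoint):

class Saver:
    """Condensed stand-in for the checkpoint saver's step bookkeeping."""

    def __init__(self):
        self._writing_storage = False
        self._latest_step = 0

    def save_step_checkpoint(self, step: int, shard_results) -> None:
        self._writing_storage = True
        if not all(shard_results):
            self._writing_storage = False  # reset on the failure path too
            return
        self._latest_step = step
        # ... commit checkpoint ...
        self._writing_storage = False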
@@ -25,6 +25,9 @@

try:
    from megatron.core import mpu, tensor_parallel
+    from megatron.core.num_microbatches_calculator import (
+        update_num_microbatches,
+    )
    from megatron.core.optimizer.optimizer import ChainedOptimizer
    from megatron.training import get_args
    from megatron.training.checkpointing import (
@@ -37,13 +40,12 @@
        get_rng_state,
        read_metadata,
        set_checkpoint_version,
-        update_num_microbatches,
    )
    from megatron.training.utils import print_rank_0, unwrap_model
except ImportError:
    # Keep back compatibility with Megatron-LM.
    try:
-        from megatron import get_args
+        from megatron import get_args, update_num_microbatches
        from megatron.checkpointing import (
            check_checkpoint_args,
            find_checkpoint_rank_0,
@@ -54,7 +56,6 @@
            get_rng_state,
            read_metadata,
            set_checkpoint_version,
-            update_num_microbatches,
        )
        from megatron.optimizer.optimizer import ChainedOptimizer
        from megatron.utils import print_rank_0, unwrap_model
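These hunks move update_num_microbatches to megatron.core.num_microbatches_calculator for newer Megatron-LM releases, where it is no longer imported via megatron.training.checkpointing (which is why the diff drops it from that import list), while the legacy branch pulls it from the top-level megatron package. A minimal sketch of the same version-compatibility pattern in isolation (module paths follow the diff above; megatron must be installed for either branch to succeed):

try:
    # Newer Megatron-LM layout.
    from megatron.core.num_microbatches_calculator import (
        update_num_microbatches,
    )
except ImportError:
    # Older Megatron-LM: fall back to the top-level export.
    from megatron import update_num_microbatches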