
fix: Use default torch timeout for nccl watchdog unless overridden #521


Merged
37 changes: 31 additions & 6 deletions src/instructlab/training/main_ds.py
@@ -5,6 +5,7 @@
from pathlib import Path
import argparse
import datetime
+import functools
import logging
import math
import os
@@ -544,6 +545,28 @@ def train(
)


+# This function makes an effort to stick to a default value from torch library,
+# whatever it may be. That's why we don't just set to the current (as of the
+# time of writing) default: to cover the unlikely event torch decides to tweak
+# the default.
+def _get_collective_timeout() -> datetime.timedelta | None:
+    timeout_var = os.getenv("INSTRUCTLAB_NCCL_TIMEOUT_MS")

Review thread on this line:

Member: It might be worthwhile to also update the README.md to let people know that this env can be used to set the timeout.

Contributor Author: It's already documented in README:

    ## Environment variables

    Below is a list of custom environment variables users can set in the training library.

    1. `INSTRUCTLAB_NCCL_TIMEOUT_MS`, this environment variable controls the NCCL timeout in milliseconds. Consider increasing if seeing FSDP related NCCL errors.

Member: Ah I see, I must have missed that. Thank you!

+    if timeout_var is None:
+        return None
+
+    try:
+        timeout = int(timeout_var)
+    except ValueError:
+        timeout = -1
+
+    if timeout <= 0:
+        raise ValueError(
+            f"Invalid value for INSTRUCTLAB_NCCL_TIMEOUT_MS: {timeout_var}. Must be a positive integer."
+        )
+
+    return datetime.timedelta(milliseconds=timeout)


def main(args):
if args.distributed_training_framework == "deepspeed" and not FusedAdam:
raise ImportError(
@@ -571,15 +594,17 @@ def main(args):
model_conf = AutoConfig.from_pretrained(args.model_name_or_path)
args.model_type = model_conf.model_type

-    # solution discovered from torchtune https://github.com/pytorch/torchtune/issues/2093
-    # gets converted to a timedelta of 1:40:00 if the default is kept
-    nccl_timeout = int(os.getenv("INSTRUCTLAB_NCCL_TIMEOUT_MS", "6000000"))
    #### distributed init #####
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    args.local_rank = int(os.environ["LOCAL_RANK"])
-    torch.distributed.init_process_group(
-        "nccl", timeout=datetime.timedelta(milliseconds=nccl_timeout)
-    )

+    timeout = _get_collective_timeout()
+    init = functools.partial(torch.distributed.init_process_group, "nccl")
+    if timeout is not None:
+        init(timeout=timeout)
+    else:
+        init()

args.global_rank = torch.distributed.get_rank()
tensor = torch.ByteTensor([False]).cuda()
torch.distributed.all_reduce(tensor)
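
For context, a minimal sketch of how the new helper behaves, assuming the `instructlab.training` package from this change is importable (the import path mirrors the unit tests below; the 600000 ms value is just an example):

```python
# Illustrative only: exercising _get_collective_timeout() from this PR.
import datetime
import os

from instructlab.training import main_ds

# With the variable set, the helper returns a timedelta built from milliseconds:
# 600000 ms -> 0:10:00.
os.environ["INSTRUCTLAB_NCCL_TIMEOUT_MS"] = "600000"
assert main_ds._get_collective_timeout() == datetime.timedelta(milliseconds=600000)

# With the variable unset, the helper returns None, so init_process_group()
# is called without a timeout argument and torch's own default applies.
del os.environ["INSTRUCTLAB_NCCL_TIMEOUT_MS"]
assert main_ds._get_collective_timeout() is None
```

The point of the `None` branch, per the comment on the helper, is to avoid re-hardcoding whatever torch's current default happens to be.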
39 changes: 39 additions & 0 deletions tests/unit/test_main_ds.py
@@ -0,0 +1,39 @@
# Standard
from unittest import mock
import datetime

# Third Party
import pytest

# First Party
from instructlab.training import main_ds


def test__get_collective_timeout():
# Test with default timeout
assert main_ds._get_collective_timeout() is None

# Test with custom timeout
timeout = 1234
with mock.patch.dict(
main_ds.os.environ, {"INSTRUCTLAB_NCCL_TIMEOUT_MS": str(timeout)}
):
assert main_ds._get_collective_timeout() == datetime.timedelta(
milliseconds=timeout
)

# Test with invalid timeout (negative)
invalid_timeout = "-100"
with mock.patch.dict(
main_ds.os.environ, {"INSTRUCTLAB_NCCL_TIMEOUT_MS": invalid_timeout}
):
with pytest.raises(ValueError):
main_ds._get_collective_timeout()

# Test with invalid timeout (string)
invalid_timeout = "invalid"
with mock.patch.dict(
main_ds.os.environ, {"INSTRUCTLAB_NCCL_TIMEOUT_MS": invalid_timeout}
):
with pytest.raises(ValueError):
main_ds._get_collective_timeout()
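
To exercise the new tests locally, a sketch along these lines should work (the path assumes a repository checkout with pytest installed):

```python
# Hypothetical invocation; adjust the path if your layout differs.
import pytest

raise SystemExit(pytest.main(["tests/unit/test_main_ds.py", "-v"]))
```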