Merge branch 'master' into issue-6620
tjruwase authored Oct 21, 2024
2 parents b18c5df + 6eefc3d commit b58e957
Showing 28 changed files with 506 additions and 166 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cpu-torch-latest.yml
@@ -50,5 +50,5 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.4"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.4"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.5"
HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.5"
2 changes: 1 addition & 1 deletion .github/workflows/nv-nightly.yml
@@ -58,7 +58,7 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
pytest $PYTEST_OPTS --forked -m 'nightly' unit/ --torch_ver="2.4" --cuda_ver="12.1"
pytest $PYTEST_OPTS --forked -m 'nightly' unit/ --torch_ver="2.5" --cuda_ver="12.1"
- name: Open GitHub issue if nightly CI fails
if: ${{ failure() && (github.event_name == 'schedule') }}
4 changes: 2 additions & 2 deletions .github/workflows/nv-torch-latest-v100.yml
@@ -55,5 +55,5 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1"
pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1"
pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1"
pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.5" --cuda_ver="12.1"
10 changes: 8 additions & 2 deletions .github/workflows/xpu-compile.yml
@@ -51,9 +51,15 @@ jobs:
- name: Compile Status
shell: bash
run: |
echo "# torch.compile graph breaks" >> $GITHUB_STEP_SUMMARY
export FI_HMEM=system
ulimit -n 1048575
cd tests/torch_compile
export ZE_AFFINITY_MASK=0,1
deepspeed test_compile.py --deepspeed_config ds_config.json 2>&1 | tee log.txt
cat log.txt | grep "'graph_breaks'" | sed 's/,/ /g' | awk '{print $2}' >> $GITHUB_STEP_SUMMARY
echo "## ZeRO stage 3" >> $GITHUB_STEP_SUMMARY
deepspeed test_compile.py --deepspeed_config ds_config_z3.json 2>&1 | tee log_z3.txt
# For each line starting with 'dynamo_output', extract the second and following fields and append them to GITHUB_STEP_SUMMARY using awk
cat log_z3.txt | awk '/^dynamo_output/ {$1=""; print $0}' >> $GITHUB_STEP_SUMMARY
echo "## ZeRO stage 2" >> $GITHUB_STEP_SUMMARY
deepspeed test_compile.py --deepspeed_config ds_config_z2.json 2>&1 | tee log_z2.txt
cat log_z2.txt | awk '/^dynamo_output/ {$1=""; print $0}' >> $GITHUB_STEP_SUMMARY
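
For context on the summary step above: the awk filter forwards any log line that begins with dynamo_output to the job summary, so the compile test is expected to print its metrics in that form. A minimal sketch of how such lines could be emitted from torch.compile statistics (the dynamo_output prefix and the graph_breaks field name are assumptions about test_compile.py, not something this diff confirms):

    # Illustrative only: emit graph-break counts in the format the workflow greps for.
    import torch._dynamo.utils as dynamo_utils

    def report_dynamo_stats():
        # counters["graph_break"] maps break reasons to occurrence counts
        graph_breaks = sum(dynamo_utils.counters["graph_break"].values())
        print(f"dynamo_output graph_breaks {graph_breaks}")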
1 change: 1 addition & 0 deletions README.md
@@ -142,6 +142,7 @@ DeepSpeed has been integrated with several different popular open-source DL fram
| PyTorch Nightly | [![nv-torch-nightly-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-torch-nightly-v100.yml) |
| Integrations | [![nv-transformers-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-transformers-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-transformers-v100.yml) [![nv-lightning-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-lightning-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-lightning-v100.yml) [![nv-accelerate-v100](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-accelerate-v100.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-accelerate-v100.yml) [![nv-mii](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-mii.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-mii.yml) [![nv-ds-chat](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-ds-chat.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-ds-chat.yml) [![nv-sd](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-sd.yml/badge.svg)](https://github.com/microsoft/DeepSpeed/actions/workflows/nv-sd.yml) |
| Misc | [![Formatting](https://github.com/microsoft/DeepSpeed/actions/workflows/formatting.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/formatting.yml) [![pages-build-deployment](https://github.com/microsoft/DeepSpeed/actions/workflows/pages/pages-build-deployment/badge.svg)](https://github.com/microsoft/DeepSpeed/actions/workflows/pages/pages-build-deployment) [![Documentation Status](https://readthedocs.org/projects/deepspeed/badge/?version=latest)](https://deepspeed.readthedocs.io/en/latest/?badge=latest)[![python](https://github.com/microsoft/DeepSpeed/actions/workflows/python.yml/badge.svg?branch=master)](https://github.com/microsoft/DeepSpeed/actions/workflows/python.yml) |
| Huawei Ascend NPU | [![Huawei Ascend NPU](https://github.com/cosdt/DeepSpeed/actions/workflows/huawei-ascend-npu.yml/badge.svg?branch=master)](https://github.com/cosdt/DeepSpeed/actions/workflows/huawei-ascend-npu.yml) |

# Installation

5 changes: 4 additions & 1 deletion csrc/aio/py_lib/deepspeed_cpu_op.cpp
@@ -38,7 +38,10 @@ void cpu_op_desc_t::finish()
{
if (_use_bounce_buffer) {
if (_read_op) {
if (_buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
if (_buffer.is_cuda()) {
_buffer.copy_(_cpu_buffer.to(torch::Device(torch::kCUDA, _buffer.get_device()),
/*non_blocking=*/true));
}
if (_buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
if (_buffer.is_cpu()) { _buffer.copy_(_cpu_buffer); }
#if defined(__ENABLE_CANN__)
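
The change above routes the bounce-buffer read back to the destination tensor's own CUDA device and makes the copy non-blocking. A minimal PyTorch analogue of that copy pattern (an illustrative sketch, not taken from the DeepSpeed sources):

    import torch

    if torch.cuda.is_available():
        cpu_buffer = torch.empty(1 << 20, pin_memory=True)   # pinned host staging buffer
        gpu_buffer = torch.empty(1 << 20, device="cuda:0")   # destination on a specific device
        # Copy to the destination tensor's own device, without blocking the host thread.
        gpu_buffer.copy_(cpu_buffer.to(gpu_buffer.device, non_blocking=True))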
2 changes: 2 additions & 0 deletions csrc/aio/py_lib/deepspeed_pin_tensor.cpp
@@ -15,6 +15,7 @@ deepspeed_pin_tensor_t::~deepspeed_pin_tensor_t()
{
for (auto iter = _locked_tensors.begin(); iter != _locked_tensors.end(); ++iter) {
munlock(iter->first, iter->second);
std::free((void*)iter->first);
}
_locked_tensors.clear();
}
@@ -43,6 +44,7 @@ bool deepspeed_pin_tensor_t::free(torch::Tensor& locked_tensor)
auto addr = locked_tensor.data_ptr();
if (_locked_tensors.find(addr) != _locked_tensors.end()) {
munlock(addr, _locked_tensors[addr]);
std::free(addr);
_locked_tensors.erase(addr);
return true;
}
3 changes: 1 addition & 2 deletions csrc/gds/py_lib/deepspeed_gds_op.cpp
@@ -58,7 +58,6 @@ void gds_op_desc_t::add_buffer_to_registry(const torch::Tensor& buffer)
const int64_t device = buffer.get_device();
void* reg_ptr = buffer.data_ptr();

// std::cout << "REG PTR " << reg_ptr << std::endl;
// TODO: add checking to make sure pointer isn't already in set
const auto it = base_ptr_registry.find(device);
if (it == base_ptr_registry.end()) {
@@ -94,7 +93,7 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
: io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, intra_op_parallelism, validate)
2 changes: 1 addition & 1 deletion csrc/gds/py_lib/deepspeed_gds_op.h
@@ -22,7 +22,7 @@ struct gds_op_desc_t : io_op_desc_t {
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);

2 changes: 1 addition & 1 deletion csrc/gds/py_lib/deepspeed_py_gds_handle.cpp
@@ -106,7 +106,7 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_gds_handle_t::_create_io_op_desc(
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int64_t file_num_bytes,
const bool validate)
{
if (buffer.is_cuda()) {
2 changes: 1 addition & 1 deletion csrc/gds/py_lib/deepspeed_py_gds_handle.h
@@ -41,7 +41,7 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t {
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int64_t file_num_bytes,
const bool validate);

static int s_cuFile_init;
16 changes: 12 additions & 4 deletions deepspeed/launcher/multinode_runner.py
@@ -141,6 +141,17 @@ def validate_args(self):
def get_cmd(self, environment, active_resources):
total_process_count = sum(self.resource_pool.values())

launcher_args = split(self.args.launcher_args)

# If the btl_tcp_if_include option is provided through launcher_args, we use it. Otherwise, we add the
# `--mca btl_tcp_if_include eth0` option as a default for compatibility.
btl_tcp_opt = ['--mca', 'btl_tcp_if_include', 'eth0']
if len(launcher_args) >= 2:
for i in range(len(launcher_args) - 1):
if launcher_args[i] in ['-mca', '--mca'] and launcher_args[i + 1] == 'btl_tcp_if_include':
btl_tcp_opt = []
break

mpirun_cmd = [
'mpirun',
'-n',
@@ -150,10 +161,7 @@ def get_cmd(self, environment, active_resources):
'--mca',
'btl',
'^openib',
'--mca',
'btl_tcp_if_include',
'eth0',
] + split(self.args.launcher_args)
] + btl_tcp_opt + launcher_args

export_cmd = []
for k, v in self.exports.items():
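
As a self-contained illustration of the fallback introduced above (a sketch mirroring the runner logic, not the runner code itself), the default --mca btl_tcp_if_include eth0 option is injected only when the launcher arguments do not already select an interface:

    from shlex import split

    def default_btl_tcp_opt(launcher_args_str):
        launcher_args = split(launcher_args_str)
        for i in range(len(launcher_args) - 1):
            if launcher_args[i] in ('-mca', '--mca') and launcher_args[i + 1] == 'btl_tcp_if_include':
                return []  # the user already picked an interface; add no default
        return ['--mca', 'btl_tcp_if_include', 'eth0']

    assert default_btl_tcp_opt('--mca btl_tcp_if_include ib0') == []
    assert default_btl_tcp_opt('') == ['--mca', 'btl_tcp_if_include', 'eth0']

In practice this means an interface override can be passed through the runner's --launcher_args option (for example --launcher_args "--mca btl_tcp_if_include ib0") without the runner appending a conflicting eth0 default.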
38 changes: 19 additions & 19 deletions deepspeed/runtime/lr_schedules.py
@@ -247,6 +247,12 @@ def get_lr_from_config(config):
return lr_params[WARMUP_MAX_LR], ''


def update_lr(param_groups, lrs):
for param_group, lr in zip(param_groups, lrs):
param_group['lr'] = lr
return [group['lr'] for group in param_groups]


"""
Only optimizers that are subclass of torch.optim.Optimizer are supported. So check the passed optimizer and wrapped
optimizer to see if requirement is satisfied.
@@ -328,7 +334,7 @@ def __init__(self,
self.interval_fn = self._staircase_interval if lr_range_test_staircase else self._continuous_interval

if last_batch_iteration == -1:
self._update_optimizer(self.min_lr)
self._last_lr = update_lr(self.optimizer.param_groups, self.min_lr)

def _staircase_interval(self):
return math.floor(float(self.last_batch_iteration + 1) / self.step_size)
@@ -349,16 +355,11 @@ def get_last_lr(self):
assert getattr(self, '_last_lr', None) is not None, "need to call step() first"
return self._last_lr

def _update_optimizer(self, group_lrs):
for param_group, lr in zip(self.optimizer.param_groups, group_lrs):
param_group['lr'] = lr

def step(self, batch_iteration=None):
if batch_iteration is None:
batch_iteration = self.last_batch_iteration + 1
self.last_batch_iteration = batch_iteration
self._update_optimizer(self.get_lr())
self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

def state_dict(self):
return {'last_batch_iteration': self.last_batch_iteration}
@@ -615,9 +616,7 @@ def step(self, batch_iteration=None):
batch_iteration = self.last_batch_iteration + 1

self.last_batch_iteration = batch_iteration
for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
param_group['lr'] = lr
self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

if self.cycle_momentum:
momentums = self.get_mom()
@@ -675,11 +674,14 @@ def __init__(self,
self.warmup_type = warmup_type
self.inverse_log_warm_up = 1.0 / math.log(self.warmup_num_steps)
self.last_batch_iteration = last_batch_iteration
# Initialize lr in optimizer
if last_batch_iteration == -1:
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

def get_lr(self):
if self.last_batch_iteration < 0:
logger.warning("Attempting to get learning rate from scheduler before it has started")
return [0.0]
return self.min_lrs
gamma = self._get_gamma()
return [min_lr + (delta_lr * gamma) for min_lr, delta_lr in zip(self.min_lrs, self.delta_lrs)]

@@ -693,9 +695,7 @@ def step(self, last_batch_iteration=None):
if last_batch_iteration is None:
last_batch_iteration = self.last_batch_iteration + 1
self.last_batch_iteration = last_batch_iteration
for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
param_group['lr'] = lr
self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

def state_dict(self):
return {'last_batch_iteration': self.last_batch_iteration}
@@ -819,6 +819,10 @@ def __init__(self,
total_num_steps, warmup_num_steps))
self.org_lrs = [group['lr'] for group in self.optimizer.param_groups]

# Initialize lrs in optimizer groups
if last_batch_iteration == -1:
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

def get_lr_ratio(self):
if self.last_batch_iteration < 0:
logger.warning("Attempting to get learning rate from scheduler before it has started")
@@ -844,11 +848,7 @@ def step(self, last_batch_iteration=None):
if last_batch_iteration is None:
last_batch_iteration = self.last_batch_iteration + 1
self.last_batch_iteration = last_batch_iteration

lrs = self.get_lr()
for param_group, lr in zip(self.optimizer.param_groups, lrs):
param_group['lr'] = lr
self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
self._last_lr = update_lr(self.optimizer.param_groups, self.get_lr())

def get_lr(self):
if self.last_batch_iteration < 0:
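
The net effect of the update_lr helper and the new initialization branches is that these schedulers now write their starting learning rate into the optimizer at construction time, and get_last_lr() works before the first step(). A minimal sketch of the changed behavior (assumes a plain single-group SGD optimizer; values are illustrative):

    import torch
    from deepspeed.runtime.lr_schedules import WarmupLR

    model = torch.nn.Linear(4, 4)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    sched = WarmupLR(optimizer, warmup_min_lr=0.0, warmup_max_lr=0.1, warmup_num_steps=100)

    # With this change the group lr is initialized to warmup_min_lr before any step(),
    # and get_last_lr() no longer asserts that step() has been called.
    print(optimizer.param_groups[0]['lr'], sched.get_last_lr())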
50 changes: 43 additions & 7 deletions deepspeed/runtime/zero/stage3.py
@@ -2299,6 +2299,24 @@ def get_fp32_grad_for_param(self, param) -> Tensor:

return self._fp32_state_allgather(param, fp32_grad)

def set_fp32_grad_for_param(self, value, param):
if not param.requires_grad:
return

if not get_accelerator().resolves_data_dependency():
self.reduce_and_partition_stream.synchronize()

if self.offload_optimizer:
group_idx, dest_offset, num_elements = self.grad_position[self.get_param_id(param)]
fp32_grad = self.fp32_partitioned_groups_flat[group_idx].grad.narrow(0, dest_offset, num_elements)
else:
fp32_grad = self.__param_id_to_grad_partition[param.ds_id]

my_rank = dist.get_rank(group=self.dp_process_group)
value_partition = value.flatten().narrow(0, fp32_grad.numel() * my_rank, fp32_grad.numel())

fp32_grad.data.copy_(value_partition.data)

def _get_fp32_opt_state_partition(self, param, optim_state_key=None):
if not get_accelerator().resolves_data_dependency():
self.reduce_and_partition_stream.synchronize()
@@ -2347,12 +2365,6 @@ def set_full_hp_param(self, value, param, optim_state_key=None):

### Local API START ###

def get_local_fp32_param(self, param, optim_state_key=None) -> Tensor:
if not param.requires_grad:
return None
fp32_opt_state, group_idx = self._get_fp32_opt_state_partition(param, optim_state_key)
return fp32_opt_state

def get_local_fp32_grad_for_param(self, param) -> Tensor:
if not param.requires_grad:
return None
@@ -2367,6 +2379,30 @@ def get_local_fp32_grad_for_param(self, param) -> Tensor:
fp32_grad = self.__param_id_to_grad_partition[param.ds_id].float()
return fp32_grad

def set_local_grad_for_param(self, value, param):
if not param.requires_grad:
return

assert value.numel() == param.ds_tensor.numel(
), f"Number of elements does not match: {value.numel()} != {param.ds_tensor.numel()}"

if not get_accelerator().resolves_data_dependency():
self.reduce_and_partition_stream.synchronize()

if self.offload_optimizer:
group_idx, dest_offset, num_elements = self.grad_position[self.get_param_id(param)]
fp32_grad = self.fp32_partitioned_groups_flat[group_idx].grad.narrow(0, dest_offset, num_elements)
else:
fp32_grad = self.__param_id_to_grad_partition[param.ds_id]

fp32_grad.data.copy_(value.flatten().data)

def get_local_fp32_param(self, param, optim_state_key=None) -> Tensor:
if not param.requires_grad:
return None
fp32_opt_state, group_idx = self._get_fp32_opt_state_partition(param, optim_state_key)
return fp32_opt_state

def set_local_hp_param(self, value, param, optim_state_key=None):
if not param.requires_grad:
return
@@ -2381,7 +2417,7 @@ def set_local_hp_param(self, value, param, optim_state_key=None):

if self._swappable_optimizer_subgroup(group_idx):
self._optimizer_states_and_gradient_swap_out(group_idx)
logger.info(f"[set_local_hp_param][update the params' value successfully]")
# logger.info(f"[set_local_hp_param][update the params' value successfully]")

### Local API END ###

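
A toy illustration of the rank-local slice taken in set_fp32_grad_for_param above: each rank copies only its own contiguous shard of the flattened full gradient. This is a standalone sketch; real ZeRO-3 partitions can be padded, so the sizes will not always divide evenly:

    import torch

    world_size, my_rank = 4, 2
    full_grad = torch.arange(16, dtype=torch.float32)    # stand-in for value.flatten()
    partition_numel = full_grad.numel() // world_size    # stand-in for fp32_grad.numel()
    value_partition = full_grad.narrow(0, partition_numel * my_rank, partition_numel)
    print(value_partition)  # the shard owned by rank 2: elements 8, 9, 10, 11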
6 changes: 3 additions & 3 deletions deepspeed/utils/__init__.py
@@ -12,10 +12,10 @@
# TODO: Move tensor fragment and mixed precision to zero utils
from .tensor_fragment import tensor_fragment, get_full_hp_param, get_hp_fragment_mapping, fragment_address, get_full_hp_grad, map_to_flat_opt_states
from .tensor_fragment import safe_get_full_fp32_param, safe_get_full_grad, safe_get_full_optimizer_state
from .tensor_fragment import set_full_hp_param
from .tensor_fragment import safe_set_full_fp32_param, safe_set_full_optimizer_state
from .tensor_fragment import set_full_hp_param, set_full_hp_grad
from .tensor_fragment import safe_set_full_fp32_param, safe_set_full_optimizer_state, safe_set_full_grad
from .tensor_fragment import safe_get_local_fp32_param, safe_get_local_grad, safe_get_local_optimizer_state
from .tensor_fragment import safe_set_local_fp32_param, safe_set_local_optimizer_state
from .tensor_fragment import safe_set_local_fp32_param, safe_set_local_grad, safe_set_local_optimizer_state
from .z3_leaf_module import set_z3_leaf_modules, unset_z3_leaf_modules, get_z3_leaf_modules, z3_leaf_module, z3_leaf_parameter
from .mixed_precision_linkage import link_hp_params, lazy_init_hp_params_optimizer_state
from deepspeed.runtime.dataloader import RepeatingLoader
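
A hedged usage sketch of the newly exported gradient setters (engine and model setup elided; the argument order shown mirrors the existing safe_set_full_fp32_param helper, so check deepspeed.utils.tensor_fragment for the exact signatures):

    from deepspeed.utils import safe_get_full_grad, safe_set_full_grad

    def clip_param_grad(param, max_abs=1.0):
        # Gather the full fp32 gradient of a (possibly partitioned) parameter,
        # modify it, and write it back into the sharded or offloaded storage.
        full_grad = safe_get_full_grad(param)
        if full_grad is not None:
            safe_set_full_grad(param, full_grad.clamp_(-max_abs, max_abs))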
[Diffs for the remaining changed files are not shown in this view.]
