Revert "DDP + C10D sparse all_reduce changes (pytorch#103916)"
This reverts commit fed5fba.

Reverted pytorch#103916 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally ([comment](pytorch#103916 (comment)))
pytorchmergebot committed Jun 26, 2023
1 parent a69f427 commit 436d035
Showing 17 changed files with 19 additions and 460 deletions.
153 changes: 0 additions & 153 deletions test/cpp/c10d/ProcessGroupNCCLTest.cpp
@@ -165,29 +165,6 @@ class NCCLTest : public NCCLTestBase {
}
}

at::Tensor to_sparse_row_indices_format(at::Tensor& tensor) {
// Get the indices of all non-zero elements in the dense tensor
// Get the unique row indices of the non-zero elements
auto [row_indices, _] =
at::_unique(tensor.nonzero().select(/*dim=*/1, /*index=*/0));
at::Tensor sparse_values = tensor.index_select(
/*dim=*/0, row_indices); // get the values at the non-zero indices
return at::sparse_coo_tensor(
row_indices.unsqueeze(0), sparse_values, tensor.sizes())
.to(tensor.device());
}

// Launches value initialization for every sparse tensor
void valueInitializationForSparse() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1);
// Convert the dense tensor to a sparse tensor in COO row format
tensors_[i] = to_sparse_row_indices_format(tensors_[i]);
}
}

const int numDevices_;
int worldSize_;
std::vector<at::Tensor> tensors_;
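
For readers skimming the removed helper above, here is a minimal Python sketch of the same dense-to-sparse conversion it performed, assuming a 2-D dense input: keep every row that holds a non-zero element and store those rows behind a single row-index dimension in COO format. The names below are illustrative only and not part of this diff.

import torch

def to_sparse_row_indices_format(dense: torch.Tensor) -> torch.Tensor:
    # unique row indices of all non-zero elements
    row_indices = torch.unique(dense.nonzero()[:, 0])
    # keep the full rows at those indices as the sparse values
    values = dense.index_select(0, row_indices)
    # one sparse dimension (rows), one dense dimension (columns)
    return torch.sparse_coo_tensor(row_indices.unsqueeze(0), values, dense.shape)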
@@ -219,21 +196,6 @@ class AllreduceNCCLTest : public NCCLTest {
}
};

class SparseAllreduceNCCLTest : public NCCLTest {
public:
SparseAllreduceNCCLTest(const std::string& path, int worldSize, int inputDim)
: NCCLTest(path, worldSize, kBackendDefaultTimeout, inputDim) {}

c10::intrusive_ptr<c10d::Work> run() {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
launchDeviceSleep();
valueInitializationForSparse();
auto results = pg_->allreduce_sparse(tensors_);
return results;
}
};

class BroadcastNCCLTest : public NCCLTest {
public:
BroadcastNCCLTest(const std::string& path, int worldSize)
@@ -399,108 +361,6 @@ void testAllreduce(const std::string& path, int rank, int size) {
}
}

void testSparseAllreduce(const std::string& path, int rank, int size) {
const int inputDim = 3;
auto test = SparseAllreduceNCCLTest(path, size, inputDim);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);

const auto input_tensors = test.getTensors();

// validate that the work output is the same as the input tensors
auto output_tensor = work->result();
// Validation
int totalNumGPUs = test.numDevices() * size;
// Add one since we are seeding with an additional 1 to prevent empty tensors
totalNumGPUs++;
const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
for (const auto i : c10::irange(input_tensors.size())) {
const auto& tensor = input_tensors[i];

// validate the tensor is sparse
EXPECT_EQ(tensor.is_sparse(), true);

auto indices = tensor._indices();
auto values = tensor._values();

// validate indices are expected size
auto sizes = indices.sizes();
EXPECT_EQ(sizes.size(), 2);
if (sizes[0] == 1) {
// row indices
EXPECT_EQ(sizes[1], inputDim);
} else if (sizes[0] == 2) {
// coordinate indices
EXPECT_EQ(sizes[1], inputDim * inputDim);
}

// validate all tensor values are expected value
const auto* const data = values.data_ptr<float>();
for (const auto k : c10::irange(values.numel())) {
EXPECT_EQ(data[k], expected)
<< "Allreduce outputs do not match expected outputs";
}

// expect the input and output tensors to be the same
auto input_dense = tensor.to_dense();
auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
EXPECT_TRUE(input_dense.allclose(output_dense));
}
}
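
As a quick sanity check of the expected value computed above (a sketch with assumed counts, not part of the test): each GPU fills its tensor with rank * numDevices + i + 1, so across all ranks the fill values are exactly 1 through totalNumGPUs, and the allreduced sum is a triangular number.

# assumed counts for illustration only
num_devices, world_size = 2, 2
fills = [rank * num_devices + i + 1
         for rank in range(world_size) for i in range(num_devices)]   # [1, 2, 3, 4]
total = num_devices * world_size + 1   # mirrors the totalNumGPUs++ above
assert sum(fills) == total * (total - 1) // 2 == 10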

void testSparseAllreduceLarge(const std::string& path, int rank, int size) {
const int inputDim = 2500;
auto test = SparseAllreduceNCCLTest(path, size, inputDim);
test.initialize(rank, size);
auto work = test.run();
// Wait for work to finish
test.wait(work);

const auto input_tensors = test.getTensors();

// validate that the work output is the same as the input tensors
auto output_tensor = work->result();
// Validation
int totalNumGPUs = test.numDevices() * size;
// Add one since we are seeding with an additional 1 to prevent empty tensors
totalNumGPUs++;
const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
for (const auto i : c10::irange(input_tensors.size())) {
const auto& tensor = input_tensors[i];

// validate the tensor is sparse
EXPECT_EQ(tensor.is_sparse(), true);

auto indices = tensor._indices();
auto values = tensor._values();

// validate indices are expected size
auto sizes = indices.sizes();
EXPECT_EQ(sizes.size(), 2);
if (sizes[0] == 1) {
// row indices
EXPECT_EQ(sizes[1], inputDim);
} else if (sizes[0] == 2) {
// coordinate indices
EXPECT_EQ(sizes[1], inputDim * inputDim);
}

// validate all tensor values are expected value
const auto* const data = values.data_ptr<float>();
for (const auto k : c10::irange(values.numel())) {
EXPECT_EQ(data[k], expected)
<< "Allreduce outputs do not match expected outputs";
}

// expect the input and output tensors to be the same
auto input_dense = tensor.to_dense();
auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
EXPECT_TRUE(input_dense.allclose(output_dense));
}
}

void testBroadcast(const std::string& path, int rank, int size) {
auto test = BroadcastNCCLTest(path, size);
test.initialize(rank, size);
@@ -871,16 +731,3 @@ TEST_F(ProcessGroupNCCLTest, testBackendName) {
std::string(c10d::NCCL_BACKEND_NAME));
}
}

#ifdef IS_NCCL_EXP
TEST_F(ProcessGroupNCCLTest, testSparseAllreduce) {
if (skipTest()) {
return;
}
{
TemporaryFile file;
testSparseAllreduce(file.path, rank_, size_);
testSparseAllreduceLarge(file.path, rank_, size_);
}
}
#endif
93 changes: 0 additions & 93 deletions test/distributed/test_c10d_nccl.py
@@ -305,30 +305,6 @@ def broadcast(xs, rootRank, rootTensor):
for tensor in xs:
self.assertEqual(tensor, expected_tensor)

@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
def test_sparse_allreduce_ops(self):
store = c10d.FileStore(self.file_name, self.world_size)
pg = self._create_process_group_nccl(store, self.opts())

indices = torch.tensor([[0, 1]])
values = torch.tensor([[1, 2, 0], [4, 0, 6]])
sparse_tensor = torch.sparse_coo_tensor(indices, values, size=(2, 3)).to(self.rank)

# the sparse allreduce call is wrapped in a try/except since the c10d API is only available in the NCCL experimental branch
try:
work = pg.allreduce([sparse_tensor])
work.wait()

# work.result() returns a list of size 1, with the allreduce output as a dense tensor
a = torch.tensor([[2, 4, 0], [8, 0, 12]]).to(self.rank)
self.assertEqual(work.result()[0], a)
except RuntimeError as e:
if "allreduce_sparse is only available in the NCCL experimental branch." in str(e):
pass
else:
# Rethrow the exception if it's a different error
raise
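
The expected tensor a above is just the element-wise sum over ranks: assuming a world size of 2 (which is what the expected tensor implies), both ranks contribute the same sparse tensor, so every dense entry doubles. A standalone sketch of that expectation, outside any process group and illustrative only:

import torch

dense = torch.sparse_coo_tensor(
    torch.tensor([[0, 1]]),
    torch.tensor([[1, 2, 0], [4, 0, 6]]),
    size=(2, 3),
).to_dense()
world_size = 2
assert torch.equal(dense * world_size, torch.tensor([[2, 4, 0], [8, 0, 12]]))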

@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
@@ -3073,75 +3049,6 @@ def test_new_group_local_sync_duplicated_pg(self):
self._test_new_group_local_sync_duplicate_pg(backend="nccl")


class SparseCollective(MultiProcessTestCase):
@property
def world_size(self):
return 1

def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# self.num_gpus = torch.cuda.device_count()
self._spawn_processes()

def tearDown(self):
super().tearDown()
try:
os.remove(self.file_name)
except OSError:
pass

class ToyModel(nn.Module):
def __init__(self, rank, vocab_size, embedding_dim):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, sparse=True).to(rank)
self.linear = nn.Linear(embedding_dim, 1).to(rank)

def forward(self, inputs):
embedded = self.embedding(inputs)
# embedded shape: (batch_size, sequence_length, embedding_dim)
flattened = torch.mean(embedded, dim=1)
# flattened shape: (batch_size, embedding_dim)
output = self.linear(flattened)
# output shape: (batch_size, 1)
return output

@requires_nccl()
@skip_if_lt_x_gpu(1)
def test_ddp_set_sparse_metadata(self):
store = dist.FileStore(self.file_name, self.world_size)
dist.init_process_group(
"nccl",
world_size=self.world_size,
rank=self.rank,
store=store,
)

vocab_size = 5

model = SparseCollective.ToyModel(self.rank, vocab_size=vocab_size, embedding_dim=10)
ddp_model = DistributedDataParallel(model)
inputs = torch.tensor([[1, 0, 0], [0, 0, 0], [0, 0, 0]]).to(self.rank)
# set sparse metadata on the DDP model
indices = torch.Tensor(list(range(vocab_size)))
ddp_model._set_sparse_metadata({"embedding.weight" : indices})
# forward pass
try:
output = ddp_model(inputs)
loss = output.sum()

# backward pass
loss.backward()
self.assertTrue(ddp_model.module.embedding.weight.grad.indices, indices)
except RuntimeError as e:
if "allreduce_sparse is only available in the NCCL experimental branch." in str(e):
pass
else:
# Rethrow the exception if it's a different error
raise
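
For background on the gradient the removed test inspects: an nn.Embedding constructed with sparse=True produces a sparse COO gradient whose indices are the rows looked up in the forward pass, which is the kind of information the sparse-metadata hint above supplies ahead of time. A standalone sketch, independent of DDP and illustrative only:

import torch
import torch.nn as nn

emb = nn.Embedding(5, 10, sparse=True)
loss = emb(torch.tensor([[1, 0, 0]])).sum()
loss.backward()
grad = emb.weight.grad          # a sparse COO tensor
print(grad.is_sparse)           # True
print(grad._indices())          # row ids used in the forward pass, e.g. tensor([[1, 0, 0]])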


if __name__ == "__main__":
assert (
@@ -298,7 +298,6 @@
datetime.date(9999, 1, 1)),
("mkldnn::_convolution_pointwise_.binary", datetime.date(2023, 7, 1)),
# These ops were moved to python under the c10d_functional namespace
("c10d::allreduce_", datetime.date(2023, 7, 30)),
("aten::wait_tensor", datetime.date(9999, 1, 30)),
("aten::reduce_scatter_tensor", datetime.date(9999, 1, 30)),
("aten::all_gather_into_tensor", datetime.date(9999, 1, 30)),
1 change: 0 additions & 1 deletion torch/_C/_distributed_c10d.pyi
@@ -62,7 +62,6 @@ class Reducer:
def set_logger(self, logger: Logger) -> None: ...
def _remove_autograd_hooks(self) -> None: ...
def _check_reducer_finalized(self) -> None: ...
def _set_sparse_metadata(self, global_unique_ids: Dict[str, Tensor]) -> None: ...

class DDPLoggingData:
strs_map: Dict[str, str]
8 changes: 0 additions & 8 deletions torch/csrc/distributed/c10d/Backend.hpp
@@ -86,14 +86,6 @@ class TORCH_API Backend : public torch::CustomClassHolder {
c10::str("Backend ", getBackendName(), " does not support allreduce"));
}

virtual c10::intrusive_ptr<Work> allreduce_sparse(
std::vector<at::Tensor>& /* tensors */,
const AllreduceOptions& /* opts */ = AllreduceOptions()) {
TORCH_CHECK(
false,
c10::str("Backend ", getBackendName(), "does not support allreduce"));
}

virtual c10::intrusive_ptr<Work> allreduce_coalesced(
std::vector<at::Tensor>& /* tensors */,
const AllreduceCoalescedOptions& /* opts */ =
25 changes: 2 additions & 23 deletions torch/csrc/distributed/c10d/Ops.cpp
@@ -19,7 +19,7 @@ TORCH_LIBRARY(c10d, m) {
m.def(
"broadcast_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int root_rank, int root_tensor, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"allreduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, Tensor? sparse_indices, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
"allreduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)");
m.def(
"allreduce_coalesced_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> __torch__.torch.classes.c10d.Work");
m.def(
@@ -167,7 +167,6 @@ IMPL_BROADCAST(PrivateUse1)
at::TensorList tensors, \
const c10::intrusive_ptr<ProcessGroup>& process_group, \
const c10::intrusive_ptr<ReduceOp>& reduce_op, \
const c10::optional<at::Tensor>& sparse_indices, \
int64_t timeout) { \
auto tensor_vec = tensors.vec(); \
auto work = \
@@ -452,26 +451,6 @@ void monitored_barrier_CPU(
BarrierOptions{device_ids, std::chrono::milliseconds(timeout)},
wait_all_ranks);
}

std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
allreduce_sparse_cuda_(
at::TensorList tensors,
const c10::intrusive_ptr<ProcessGroup>& process_group,
const c10::intrusive_ptr<ReduceOp>& reduce_op,
const c10::optional<at::Tensor>& sparse_indices,
int64_t timeout) {
auto tensor_vec = tensors.vec();
auto work = process_group->getBackend(c10::DeviceType::CUDA)
->allreduce_sparse(
tensor_vec,
AllreduceOptions{
*reduce_op.get(),
std::chrono::milliseconds(timeout),
sparse_indices});

return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
std::move(tensor_vec), work);
}
} // namespace

// register functions to dispatcher
Expand Down Expand Up @@ -526,7 +505,7 @@ TORCH_LIBRARY_IMPL(c10d, SparseCPU, m) {
}

TORCH_LIBRARY_IMPL(c10d, SparseCUDA, m) {
m.impl("allreduce_", allreduce_sparse_cuda_);
m.impl("allreduce_", allreduce_CUDA);
}

} // namespace
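
With the revert applied, the SparseCUDA dispatch key is routed back to the same dense allreduce_CUDA wrapper, so sparse CUDA tensors no longer take a dedicated path. The per-dispatch-key registration pattern used by TORCH_LIBRARY_IMPL can be sketched from Python with torch.library; the namespace and op below are hypothetical stand-ins, not the real c10d registration.

import torch

lib = torch.library.Library("demo_c10d", "DEF")   # hypothetical namespace
lib.define("my_allreduce(Tensor t) -> Tensor")

def dense_impl(t):
    # placeholder body; the real collective reduces across ranks
    return t

# route both dense CUDA and sparse CUDA inputs to the same kernel,
# mirroring the m.impl("allreduce_", allreduce_CUDA) registration under SparseCUDA above
lib.impl("my_allreduce", dense_impl, "CUDA")
lib.impl("my_allreduce", dense_impl, "SparseCUDA")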
2 changes: 0 additions & 2 deletions torch/csrc/distributed/c10d/ProcessGroup.hpp
@@ -151,14 +151,12 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
at::TensorList,
const c10::intrusive_ptr<::c10d::ProcessGroup>&,
const c10::intrusive_ptr<::c10d::ReduceOp>&,
const c10::optional<at::Tensor>& sparse_indices,
int64_t)>();

return std::get<1>(op.call(
tensors,
c10::intrusive_ptr<ProcessGroup>::unsafe_reclaim_from_nonowning(this),
c10::make_intrusive<ReduceOp>(opts.reduceOp),
opts.sparseIndices,
opts.timeout.count()));
}

(Diffs for the remaining changed files are not shown.)
