diff --git a/test/cpp/c10d/ProcessGroupNCCLTest.cpp b/test/cpp/c10d/ProcessGroupNCCLTest.cpp index 80171c9c0bfbe..32c51bef3efc8 100644 --- a/test/cpp/c10d/ProcessGroupNCCLTest.cpp +++ b/test/cpp/c10d/ProcessGroupNCCLTest.cpp @@ -165,29 +165,6 @@ class NCCLTest : public NCCLTestBase { } } - at::Tensor to_sparse_row_indices_format(at::Tensor& tensor) { - // Get the indices of all non-zero elements in the dense tensor - // Get the unique row indices of the non-zero elements - auto [row_indices, _] = - at::_unique(tensor.nonzero().select(/*dim=*/1, /*index=*/0)); - at::Tensor sparse_values = tensor.index_select( - /*dim=*/0, row_indices); // get the values at the non-zero indices - return at::sparse_coo_tensor( - row_indices.unsqueeze(0), sparse_values, tensor.sizes()) - .to(tensor.device()); - } - - // Launches value initialization for every sparse tensor - void valueInitializationForSparse() { - at::cuda::OptionalCUDAGuard deviceGuard; - for (const auto i : c10::irange(numDevices_)) { - deviceGuard.set_index(i); - tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1); - // Convert the dense tensor to a sparse tensor in COO row format - tensors_[i] = to_sparse_row_indices_format(tensors_[i]); - } - } - const int numDevices_; int worldSize_; std::vector tensors_; @@ -219,21 +196,6 @@ class AllreduceNCCLTest : public NCCLTest { } }; -class SparseAllreduceNCCLTest : public NCCLTest { - public: - SparseAllreduceNCCLTest(const std::string& path, int worldSize, int inputDim) - : NCCLTest(path, worldSize, kBackendDefaultTimeout, inputDim) {} - - c10::intrusive_ptr run() { - // For the duration of this function, make THC use our streams - c10::cuda::CUDAMultiStreamGuard guard(streams_); - launchDeviceSleep(); - valueInitializationForSparse(); - auto results = pg_->allreduce_sparse(tensors_); - return results; - } -}; - class BroadcastNCCLTest : public NCCLTest { public: BroadcastNCCLTest(const std::string& path, int worldSize) @@ -399,108 +361,6 @@ void testAllreduce(const std::string& path, int rank, int size) { } } -void testSparseAllreduce(const std::string& path, int rank, int size) { - const int inputDim = 3; - auto test = SparseAllreduceNCCLTest(path, size, inputDim); - test.initialize(rank, size); - auto work = test.run(); - // Wait for work to finish - test.wait(work); - - const auto input_tensors = test.getTensors(); - - // validate the work output is same as tensor - auto output_tensor = work->result(); - // Validation - int totalNumGPUs = test.numDevices() * size; - // Add one since we are seeding with an additional 1 to prevent empty tensors - totalNumGPUs++; - const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2; - for (const auto i : c10::irange(input_tensors.size())) { - const auto& tensor = input_tensors[i]; - - // validate the tensor is sparse - EXPECT_EQ(tensor.is_sparse(), true); - - auto indices = tensor._indices(); - auto values = tensor._values(); - - // validate indices are expected size - auto sizes = indices.sizes(); - EXPECT_EQ(sizes.size(), 2); - if (sizes[0] == 1) { - // row indices - EXPECT_EQ(sizes[1], inputDim); - } else if (sizes[0] == 2) { - // coorindate indices - EXPECT_EQ(sizes[1], inputDim * inputDim); - } - - // validate all tensor values are expected value - const auto* const data = values.data_ptr(); - for (const auto k : c10::irange(values.numel())) { - EXPECT_EQ(data[k], expected) - << "Allreduce outputs do not match expected outputs"; - } - - // expect the input and output tensors should be the same - auto input_dense = tensor.to_dense(); - auto output_dense = output_tensor[i].to(input_dense.device()).to_dense(); - EXPECT_TRUE(input_dense.allclose(output_dense)); - } -} - -void testSparseAllreduceLarge(const std::string& path, int rank, int size) { - const int inputDim = 2500; - auto test = SparseAllreduceNCCLTest(path, size, inputDim); - test.initialize(rank, size); - auto work = test.run(); - // Wait for work to finish - test.wait(work); - - const auto input_tensors = test.getTensors(); - - // validate the work output is same as tensor - auto output_tensor = work->result(); - // Validation - int totalNumGPUs = test.numDevices() * size; - // Add one since we are seeding with an additional 1 to prevent empty tensors - totalNumGPUs++; - const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2; - for (const auto i : c10::irange(input_tensors.size())) { - const auto& tensor = input_tensors[i]; - - // validate the tensor is sparse - EXPECT_EQ(tensor.is_sparse(), true); - - auto indices = tensor._indices(); - auto values = tensor._values(); - - // validate indices are expected size - auto sizes = indices.sizes(); - EXPECT_EQ(sizes.size(), 2); - if (sizes[0] == 1) { - // row indices - EXPECT_EQ(sizes[1], inputDim); - } else if (sizes[0] == 2) { - // coorindate indices - EXPECT_EQ(sizes[1], inputDim * inputDim); - } - - // validate all tensor values are expected value - const auto* const data = values.data_ptr(); - for (const auto k : c10::irange(values.numel())) { - EXPECT_EQ(data[k], expected) - << "Allreduce outputs do not match expected outputs"; - } - - // expect the input and output tensors should be the same - auto input_dense = tensor.to_dense(); - auto output_dense = output_tensor[i].to(input_dense.device()).to_dense(); - EXPECT_TRUE(input_dense.allclose(output_dense)); - } -} - void testBroadcast(const std::string& path, int rank, int size) { auto test = BroadcastNCCLTest(path, size); test.initialize(rank, size); @@ -871,16 +731,3 @@ TEST_F(ProcessGroupNCCLTest, testBackendName) { std::string(c10d::NCCL_BACKEND_NAME)); } } - -#ifdef IS_NCCL_EXP -TEST_F(ProcessGroupNCCLTest, testSparseAllreduce) { - if (skipTest()) { - return; - } - { - TemporaryFile file; - testSparseAllreduce(file.path, rank_, size_); - testSparseAllreduceLarge(file.path, rank_, size_); - } -} -#endif diff --git a/test/distributed/test_c10d_nccl.py b/test/distributed/test_c10d_nccl.py index 519169963688f..fbff8b6bcf5a7 100644 --- a/test/distributed/test_c10d_nccl.py +++ b/test/distributed/test_c10d_nccl.py @@ -305,30 +305,6 @@ def broadcast(xs, rootRank, rootTensor): for tensor in xs: self.assertEqual(tensor, expected_tensor) - @requires_nccl() - @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") - def test_sparse_allreduce_ops(self): - store = c10d.FileStore(self.file_name, self.world_size) - pg = self._create_process_group_nccl(store, self.opts()) - - indices = torch.tensor([[0, 1]]) - values = torch.tensor([[1, 2, 0], [4, 0, 6]]) - sparse_tensor = torch.sparse_coo_tensor(indices, values, size=(2, 3)).to(self.rank) - - # sparse allreduce call is wrapped in a try catch since the c10d API is only available in the nccl experimental branch - try: - work = pg.allreduce([sparse_tensor]) - work.wait() - - # work.result() returns a list of size 1, with the allreduce output as a dense tensor - a = torch.tensor([[2, 4, 0], [8, 0, 12]]).to(self.rank) - self.assertEqual(work.result()[0], a) - except RuntimeError as e: - if "allreduce_sparse is only available in the NCCL experimental branch." in str(e): - pass - else: - # Rethrow the exception if it's a different error - raise @requires_nccl() @skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs") @@ -3073,75 +3049,6 @@ def test_new_group_local_sync_duplicated_pg(self): self._test_new_group_local_sync_duplicate_pg(backend="nccl") -class SparseCollective(MultiProcessTestCase): - @property - def world_size(self): - return 1 - - def setUp(self): - super().setUp() - # NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests - # that use NCCL_BLOCKING_WAIT will test it as expected. - os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1" - # self.num_gpus = torch.cuda.device_count() - self._spawn_processes() - - def tearDown(self): - super().tearDown() - try: - os.remove(self.file_name) - except OSError: - pass - - class ToyModel(nn.Module): - def __init__(self, rank, vocab_size, embedding_dim): - super().__init__() - self.embedding = nn.Embedding(vocab_size, embedding_dim, sparse=True).to(rank) - self.linear = nn.Linear(embedding_dim, 1).to(rank) - - def forward(self, inputs): - embedded = self.embedding(inputs) - # embedded shape: (batch_size, sequence_length, embedding_dim) - flattened = torch.mean(embedded, dim=1) - # flattened shape: (batch_size, embedding_dim) - output = self.linear(flattened) - # output shape: (batch_size, 1) - return output - - @requires_nccl() - @skip_if_lt_x_gpu(1) - def test_ddp_set_sparse_metadata(self): - store = dist.FileStore(self.file_name, self.world_size) - dist.init_process_group( - "nccl", - world_size=self.world_size, - rank=self.rank, - store=store, - ) - - vocab_size = 5 - - model = SparseCollective.ToyModel(self.rank, vocab_size=vocab_size, embedding_dim=10) - ddp_model = DistributedDataParallel(model) - inputs = torch.tensor([[1, 0, 0], [0, 0, 0], [0, 0, 0]]).to(self.rank) - # set sparse metadata on the DDP model - indices = torch.Tensor(list(range(vocab_size))) - ddp_model._set_sparse_metadata({"embedding.weight" : indices}) - # forward pass - try: - output = ddp_model(inputs) - loss = output.sum() - - # backward pass - loss.backward() - self.assertTrue(ddp_model.module.embedding.weight.grad.indices, indices) - except RuntimeError as e: - if "allreduce_sparse is only available in the NCCL experimental branch." in str(e): - pass - else: - # Rethrow the exception if it's a different error - raise - if __name__ == "__main__": assert ( diff --git a/test/forward_backward_compatibility/check_forward_backward_compatibility.py b/test/forward_backward_compatibility/check_forward_backward_compatibility.py index 4f03e88c743a7..1773ea06298ed 100644 --- a/test/forward_backward_compatibility/check_forward_backward_compatibility.py +++ b/test/forward_backward_compatibility/check_forward_backward_compatibility.py @@ -298,7 +298,6 @@ datetime.date(9999, 1, 1)), ("mkldnn::_convolution_pointwise_.binary", datetime.date(2023, 7, 1)), # These ops were moved to python under the c10d_functional namespace - ("c10d::allreduce_", datetime.date(2023, 7, 30)), ("aten::wait_tensor", datetime.date(9999, 1, 30)), ("aten::reduce_scatter_tensor", datetime.date(9999, 1, 30)), ("aten::all_gather_into_tensor", datetime.date(9999, 1, 30)), diff --git a/torch/_C/_distributed_c10d.pyi b/torch/_C/_distributed_c10d.pyi index f6891bc923ae7..e62312daffb9c 100644 --- a/torch/_C/_distributed_c10d.pyi +++ b/torch/_C/_distributed_c10d.pyi @@ -62,7 +62,6 @@ class Reducer: def set_logger(self, logger: Logger) -> None: ... def _remove_autograd_hooks(self) -> None: ... def _check_reducer_finalized(self) -> None: ... - def _set_sparse_metadata(self, global_unique_ids: Dict[str, Tensor]) -> None: ... class DDPLoggingData: strs_map: Dict[str, str] diff --git a/torch/csrc/distributed/c10d/Backend.hpp b/torch/csrc/distributed/c10d/Backend.hpp index 9d43bd555a5b4..db2ff6bb4e3e5 100644 --- a/torch/csrc/distributed/c10d/Backend.hpp +++ b/torch/csrc/distributed/c10d/Backend.hpp @@ -86,14 +86,6 @@ class TORCH_API Backend : public torch::CustomClassHolder { c10::str("Backend ", getBackendName(), " does not support allreduce")); } - virtual c10::intrusive_ptr allreduce_sparse( - std::vector& /* tensors */, - const AllreduceOptions& /* opts */ = AllreduceOptions()) { - TORCH_CHECK( - false, - c10::str("Backend ", getBackendName(), "does not support allreduce")); - } - virtual c10::intrusive_ptr allreduce_coalesced( std::vector& /* tensors */, const AllreduceCoalescedOptions& /* opts */ = diff --git a/torch/csrc/distributed/c10d/Ops.cpp b/torch/csrc/distributed/c10d/Ops.cpp index db7c4986b230f..5c195d748d5fa 100644 --- a/torch/csrc/distributed/c10d/Ops.cpp +++ b/torch/csrc/distributed/c10d/Ops.cpp @@ -19,7 +19,7 @@ TORCH_LIBRARY(c10d, m) { m.def( "broadcast_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, int root_rank, int root_tensor, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)"); m.def( - "allreduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, Tensor? sparse_indices, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)"); + "allreduce_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> (Tensor[], __torch__.torch.classes.c10d.Work)"); m.def( "allreduce_coalesced_(Tensor[] tensors, __torch__.torch.classes.c10d.ProcessGroup process_group, __torch__.torch.classes.c10d.ReduceOp reduce_op, int timeout) -> __torch__.torch.classes.c10d.Work"); m.def( @@ -167,7 +167,6 @@ IMPL_BROADCAST(PrivateUse1) at::TensorList tensors, \ const c10::intrusive_ptr& process_group, \ const c10::intrusive_ptr& reduce_op, \ - const c10::optional& sparse_indices, \ int64_t timeout) { \ auto tensor_vec = tensors.vec(); \ auto work = \ @@ -452,26 +451,6 @@ void monitored_barrier_CPU( BarrierOptions{device_ids, std::chrono::milliseconds(timeout)}, wait_all_ranks); } - -std::tuple, c10::intrusive_ptr> -allreduce_sparse_cuda_( - at::TensorList tensors, - const c10::intrusive_ptr& process_group, - const c10::intrusive_ptr& reduce_op, - const c10::optional& sparse_indices, - int64_t timeout) { - auto tensor_vec = tensors.vec(); - auto work = process_group->getBackend(c10::DeviceType::CUDA) - ->allreduce_sparse( - tensor_vec, - AllreduceOptions{ - *reduce_op.get(), - std::chrono::milliseconds(timeout), - sparse_indices}); - - return std::tuple, c10::intrusive_ptr>( - std::move(tensor_vec), work); -} } // namespace // register functions to dispatcher @@ -526,7 +505,7 @@ TORCH_LIBRARY_IMPL(c10d, SparseCPU, m) { } TORCH_LIBRARY_IMPL(c10d, SparseCUDA, m) { - m.impl("allreduce_", allreduce_sparse_cuda_); + m.impl("allreduce_", allreduce_CUDA); } } // namespace diff --git a/torch/csrc/distributed/c10d/ProcessGroup.hpp b/torch/csrc/distributed/c10d/ProcessGroup.hpp index 63d2a1aac3457..bb201570f2aac 100644 --- a/torch/csrc/distributed/c10d/ProcessGroup.hpp +++ b/torch/csrc/distributed/c10d/ProcessGroup.hpp @@ -151,14 +151,12 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder { at::TensorList, const c10::intrusive_ptr<::c10d::ProcessGroup>&, const c10::intrusive_ptr<::c10d::ReduceOp>&, - const c10::optional& sparse_indices, int64_t)>(); return std::get<1>(op.call( tensors, c10::intrusive_ptr::unsafe_reclaim_from_nonowning(this), c10::make_intrusive(opts.reduceOp), - opts.sparseIndices, opts.timeout.count())); } diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp index 01aff5fd0ee48..fe288f9975fa6 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp @@ -26,7 +26,6 @@ #include #include -#include namespace c10d { @@ -1557,17 +1556,8 @@ c10::intrusive_ptr ProcessGroupNCCL::collective( // // See [Sync Streams]. if (!avoidRecordStreams_) { - if (!inputs[i].is_sparse()) { - c10::cuda::CUDACachingAllocator::recordStream( - inputs[i].storage().data_ptr(), ncclStream); - } else { - // for sparse input case record streams on both index and value - // tensors - c10::cuda::CUDACachingAllocator::recordStream( - inputs[i].values().storage().data_ptr(), ncclStream); - c10::cuda::CUDACachingAllocator::recordStream( - inputs[i].indices().storage().data_ptr(), ncclStream); - } + c10::cuda::CUDACachingAllocator::recordStream( + inputs[i].storage().data_ptr(), ncclStream); } #ifndef NCCL_HAS_COMM_NONBLOCKING C10D_NCCL_CHECK( @@ -1826,82 +1816,6 @@ c10::intrusive_ptr ProcessGroupNCCL::pointToPoint( profilingTitle); } -c10::intrusive_ptr ProcessGroupNCCL::allreduce_sparse( - std::vector& tensors, - const AllreduceOptions& opts) { -#ifdef IS_NCCL_EXP - std::vector outputTensors(tensors.size()); - for (std::vector::size_type i = 0; i < tensors.size(); i++) { - tensors[i] = tensors[i].coalesce(); - outputTensors[i] = torch::zeros( - tensors[i].sizes(), tensors[i].options().layout(torch::kStrided)); - } - int dev_in_group = 0; - auto work = collective( - tensors, - outputTensors, - [&](at::Tensor& input, - at::Tensor& output, - ncclComm_t comm, - at::cuda::CUDAStream& stream) { - auto ncclDataType = getNcclDataType(input.scalar_type()); - auto ncclReduceOp = getNcclReduceOp( - opts.reduceOp, input, ncclDataType, comm, dev_in_group++); - - size_t num_elements = output.numel(); - auto indices = input.indices(); - auto sizes = input.sizes(); - int colSize = sizes[1]; - auto rows = indices[0]; - size_t blockCount = rows.sizes()[0]; - auto recvIndices = indices[0] * colSize; - - // prevent output and recvIndices from being freed - c10::cuda::CUDACachingAllocator::recordStream( - output.storage().data_ptr(), stream); - c10::cuda::CUDACachingAllocator::recordStream( - recvIndices.storage().data_ptr(), stream); - auto result = ncclAllReduceSparseBlock( - input._values().data_ptr(), // sendbuff - recvIndices.data_ptr(), // recv_indices - blockCount, // block_count - colSize, // block_length - output.data_ptr(), // recvbuff - output.numel(), // recv_count - ncclDataType, - ncclReduceOp, - comm, - stream.stream()); - return result; - }, - [](std::vector& ncclStreams, - c10::intrusive_ptr& work) {}, - [&](std::vector& ncclStreams, - c10::intrusive_ptr& work) { - // Convert output tensors to sparse and back into tensors. - for (const auto i : c10::irange(outputTensors.size())) { - at::cuda::CUDAStreamGuard guard(ncclStreams[i]); - if (opts.sparseIndices.has_value()) { - tensors[i] = at::sparse_coo_tensor( - opts.sparseIndices.value(), - outputTensors[i], - tensors[i].sizes()); - } else { - tensors[i] = outputTensors[i].to_sparse(); - } - } - }, - OpType::_ALLREDUCE_SPARSE, - "nccl:all_reduce_sparse"); - return work; -#else - // If the nccl branch is not "exp" then we just error - TORCH_CHECK( - false, - "allreduce_sparse is only available in the NCCL experimental branch."); -#endif -} - c10::intrusive_ptr ProcessGroupNCCL::allreduce_impl( std::vector& tensors, const AllreduceOptions& opts) { diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp index e59413496a21f..e437600244e0b 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp @@ -353,10 +353,6 @@ class TORCH_API ProcessGroupNCCL : public Backend { std::vector& inputTensors, const BroadcastOptions& opts = BroadcastOptions()); - c10::intrusive_ptr allreduce_sparse( - std::vector& tensors, - const AllreduceOptions& opts = AllreduceOptions()) override; - c10::intrusive_ptr allreduce( std::vector& tensors, const AllreduceOptions& opts = AllreduceOptions()) override; diff --git a/torch/csrc/distributed/c10d/Types.hpp b/torch/csrc/distributed/c10d/Types.hpp index a019e76035258..a56c618a6738f 100644 --- a/torch/csrc/distributed/c10d/Types.hpp +++ b/torch/csrc/distributed/c10d/Types.hpp @@ -119,7 +119,6 @@ struct BroadcastOptions { struct AllreduceOptions { ReduceOp reduceOp = ReduceOp::SUM; std::chrono::milliseconds timeout = kUnsetTimeout; - c10::optional sparseIndices = c10::nullopt; }; struct AllreduceCoalescedOptions : AllreduceOptions {}; diff --git a/torch/csrc/distributed/c10d/Work.hpp b/torch/csrc/distributed/c10d/Work.hpp index 02d5ad69dcaf2..2a827f15ca6c3 100644 --- a/torch/csrc/distributed/c10d/Work.hpp +++ b/torch/csrc/distributed/c10d/Work.hpp @@ -29,7 +29,6 @@ enum class OpType : std::uint8_t { BARRIER = 15, _REDUCE_SCATTER_BASE = 16, COALESCED = 17, - _ALLREDUCE_SPARSE = 18, UNKNOWN = 100, }; diff --git a/torch/csrc/distributed/c10d/comm.hpp b/torch/csrc/distributed/c10d/comm.hpp index 8da1ae8f7f842..b4b3eae6a2080 100644 --- a/torch/csrc/distributed/c10d/comm.hpp +++ b/torch/csrc/distributed/c10d/comm.hpp @@ -25,16 +25,14 @@ class TORCH_API GradBucket { std::vector offsets, std::vector lengths, std::vector sizes_vec, - std::vector parameters, - c10::optional sparse_grad_indices) + std::vector parameters) : index_(index), bucket_count_(bucket_count), buffer_(std::move(tensor)), offsets_(std::move(offsets)), lengths_(std::move(lengths)), sizes_vec_(std::move(sizes_vec)), - parameters_(std::move(parameters)), - sparse_grad_indices_(std::move(sparse_grad_indices)) {} + parameters_(std::move(parameters)) {} // Returns the index of the bucket, which is unique across all the buckets. size_t getIndex() const { @@ -72,10 +70,6 @@ class TORCH_API GradBucket { return index_ == bucket_count_ - 1; } - c10::optional& getSparseGradIndices() { - return sparse_grad_indices_; - } - private: size_t index_; size_t bucket_count_; @@ -85,13 +79,8 @@ class TORCH_API GradBucket { std::vector offsets_; std::vector lengths_; std::vector sizes_vec_; - // Model parameters for this bucket. const std::vector parameters_; - - // Predefined sparse indices for this bucket (only used for sparse tensors). - // The gradients will be updated to have indices with these tensor values - c10::optional sparse_grad_indices_; }; // Base class of both `PythonCommHook` and `CppCommHook`. diff --git a/torch/csrc/distributed/c10d/default_comm_hooks.cpp b/torch/csrc/distributed/c10d/default_comm_hooks.cpp index 124bacd2b205a..cd3eec9b23d8d 100644 --- a/torch/csrc/distributed/c10d/default_comm_hooks.cpp +++ b/torch/csrc/distributed/c10d/default_comm_hooks.cpp @@ -46,16 +46,7 @@ c10::intrusive_ptr FP16CompressCommHook::runHook( c10::intrusive_ptr _AllReduceBySumCommHook::runHook( GradBucket& bucket) { std::vector tensors = {bucket.getBufferRef()}; -#ifdef IS_NCCL_EXP - // case with sparse_metadata_ set and using indices from there - if (bucket.getSparseGradIndices().has_value()) { - AllreduceOptions opts = AllreduceOptions(); - opts.sparseIndices = bucket.getSparseGradIndices().value(); - return state_->allreduce(tensors, opts)->getFuture(); - } -#else return state_->allreduce(tensors)->getFuture(); -#endif } } // namespace c10d diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index 015f9db1f9158..9c11b34ea1a24 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -518,10 +518,6 @@ An enum-like class for built-in communication hooks: ``ALLREDUCE`` and ``FP16_CO "_set_optimizer_in_backward", [](::c10d::Reducer& reducer) { reducer.set_optimizer_in_backward(); }, py::call_guard()) - .def( - "_set_sparse_metadata", - &::c10d::Reducer::setSparseMetadata, - py::call_guard()) .def( "_set_mixed_precision_param_dtype", [](::c10d::Reducer& reducer, py::object data_type_obj) { @@ -1575,6 +1571,13 @@ that adds a prefix to each key inserted to the store. py::arg("inputTensor"), py::arg("opts") = ::c10d::ReduceScatterOptions(), py::call_guard()) + .def( + "reduce_scatter_tensor_coalesced", + &::c10d::ProcessGroup::reduce_scatter_tensor_coalesced, + py::arg("outputs"), + py::arg("inputs"), + py::arg("opts") = ::c10d::ReduceScatterOptions(), + py::call_guard()) .def( "alltoall_base", &::c10d::ProcessGroup::alltoall_base, diff --git a/torch/csrc/distributed/c10d/reducer.cpp b/torch/csrc/distributed/c10d/reducer.cpp index 221eb21bd1a0d..70f8bcd2b47e4 100644 --- a/torch/csrc/distributed/c10d/reducer.cpp +++ b/torch/csrc/distributed/c10d/reducer.cpp @@ -440,25 +440,6 @@ void Reducer::mark_variable_ready_sparse(size_t variable_index) { logger_, "Expected variable to have sparse gradient."); - // Copy the indices of sparse metadata - if (sparse_metadata_) { - grad = grad.coalesce(); - REDUCER_CHECK( - param_names_.size() != 0, logger_, "No parameter names were found"); - std::string& param_name = param_names_[variable_index]; - auto iter = sparse_metadata_->find(param_name); - REDUCER_CHECK( - iter != sparse_metadata_->end(), - logger_, - "param: " + param_name + " not found in sparse metadata"); - bucket.sparse_tensor_indices = - iter->second.to(at::kLong).unsqueeze(0).to(grad.device()); - auto indices = at::searchsorted( - bucket.sparse_tensor_indices.value(), grad.indices(), false, false); - // For indices we are using the ones set by sparse_metadata - grad = at::sparse_coo_tensor(indices, grad.values(), grad.sizes()); - } - // Sparse tensors cannot be grouped together with other sparse tensors in a // single reduction operation like we can for dense tensors. Therefore, the // `offsets` and `lengths` vectors in the bucket struct are empty, and @@ -491,8 +472,7 @@ std::vector Reducer::get_grad_buckets( bucket.offsets, bucket.lengths, bucket.sizes_vec, - variables_for_bucket, - c10::nullopt); + variables_for_bucket); } return gradBuckets; } @@ -944,8 +924,7 @@ void Reducer::all_reduce_bucket(Bucket& bucket) { bucket.offsets, bucket.lengths, bucket.sizes_vec, - variables_for_bucket, - bucket.sparse_tensor_indices); + variables_for_bucket); bucket.future_work = run_comm_hook(grad_bucket); } @@ -1567,21 +1546,7 @@ void Reducer::finalize_backward() { ? detail::parseCppCommHookResult(bucket.future_work->value()) : comm_hook_->parseHookResult(bucket.future_work->value()); if (bucket.expect_sparse_gradient) { - // sparse metadata is set so the bucket should have sparse_tensor_indices - if (sparse_metadata_) { - REDUCER_CHECK( - bucket.sparse_tensor_indices.value().numel() == - bucket.gradients.sizes()[0], - logger_, - "Sparse metadata and gradient size mismatch"); - auto sparse_result = at::sparse_coo_tensor( - bucket.sparse_tensor_indices.value(), - future_result, - bucket.gradients.sizes()); - bucket.gradients.copy_(sparse_result); - } else { - bucket.gradients.copy_(future_result); - } + bucket.gradients.copy_(future_result); } else { // Reinitialize only `bucket_views_out` with the future_result by // following the same logic in `initialize_buckets`. @@ -1628,8 +1593,6 @@ void Reducer::finalize_backward() { if (should_collect_runtime_stats()) { record_backward_comm_end_time(); } - - sparse_metadata_.reset(); } void Reducer::runGradCallbackForVariable( @@ -1816,11 +1779,6 @@ bool Reducer::rebuild_buckets() { return true; } -void Reducer::setSparseMetadata(std::map& metadata) { - sparse_metadata_ = - std::make_unique>(metadata); -} - // See Note [DDP Communication Hook] void Reducer::register_comm_hook(std::unique_ptr iface) { REDUCER_CHECK( diff --git a/torch/csrc/distributed/c10d/reducer.hpp b/torch/csrc/distributed/c10d/reducer.hpp index a76a545cfba56..3b90309e0f31f 100644 --- a/torch/csrc/distributed/c10d/reducer.hpp +++ b/torch/csrc/distributed/c10d/reducer.hpp @@ -130,8 +130,6 @@ class TORCH_API Reducer { // rebuilt. bool rebuild_buckets(); - void setSparseMetadata(std::map& metadata); - // Install futures that should be awaited at end of backwards. Currently these // are only used by user-defined custom buffer reduction hooks, but can be generalized // to any user-originating futures that need to be awaited. @@ -377,9 +375,6 @@ class TORCH_API Reducer { // If `true`, then this implies that `bucket.variables.size() == 1`. bool expect_sparse_gradient = false; - // Sparse indices tensor - c10::optional sparse_tensor_indices = c10::nullopt; - // TODO(@pietern) // Memory copies from gradient tensors into the bucket are potentially // done on different CUDA streams. We record an event for every copy @@ -510,12 +505,6 @@ class TORCH_API Reducer { // comm_hook_ is used to access the DDP communication hook if registered. std::unique_ptr comm_hook_; - - // Sparse metadata contains the indices that will be used - // when calling into sparse allreduce. - // This is only used in the sparse allreduce collective calls - std::unique_ptr> sparse_metadata_; - // Debug level setting. It is parsed once when Reducer is constructed, and // remains the same across a single invocation of DDP training. DebugLevel ddp_debug_level_; diff --git a/torch/nn/parallel/distributed.py b/torch/nn/parallel/distributed.py index bed03f9a8aea9..d0487bb39ab9a 100644 --- a/torch/nn/parallel/distributed.py +++ b/torch/nn/parallel/distributed.py @@ -1239,6 +1239,9 @@ def _assign_modules_buffers(self): } def _build_debug_param_to_name_mapping(self, parameters): + if dist.get_debug_level() == dist.DebugLevel.OFF: + return {} + param_to_param_index = {parameters[i]: i for i in range(len(parameters))} param_set = set(parameters) param_index_to_param_fqn = {} @@ -2228,6 +2231,3 @@ def _check_reducer_finalized(self): reducer not finalizing backward. """ self.reducer._check_reducer_finalized() - - def _set_sparse_metadata(self, global_unique_ids): - self.reducer._set_sparse_metadata(global_unique_ids)