diff --git a/graphbolt/src/cuda/cooperative_minibatching_utils.cu b/graphbolt/src/cuda/cooperative_minibatching_utils.cu index 49403128a7f1..fb9858f6d559 100644 --- a/graphbolt/src/cuda/cooperative_minibatching_utils.cu +++ b/graphbolt/src/cuda/cooperative_minibatching_utils.cu @@ -18,12 +18,14 @@ * @brief Cooperative Minibatching (arXiv:2310.12403) utility function * implementations in CUDA. */ +#include #include #include #include #include "./common.h" +#include "./cooperative_minibatching_utils.cuh" #include "./cooperative_minibatching_utils.h" #include "./utils.h" @@ -60,7 +62,8 @@ RankSortImpl( auto part_ids2 = part_ids.clone(); auto part_ids2_sorted = torch::empty_like(part_ids2); auto nodes_sorted = torch::empty_like(nodes); - auto index = torch::arange(nodes.numel(), nodes.options()); + auto index = ops::IndptrEdgeIdsImpl( + offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel()); auto index_sorted = torch::empty_like(index); return AT_DISPATCH_INDEX_TYPES( nodes.scalar_type(), "RankSortImpl", ([&] { @@ -103,7 +106,7 @@ RankSortImpl( } std::vector> RankSort( - std::vector& nodes_list, const int64_t rank, + const std::vector& nodes_list, const int64_t rank, const int64_t world_size) { const auto num_batches = nodes_list.size(); auto nodes = torch::cat(nodes_list, 0); diff --git a/graphbolt/src/cuda/cooperative_minibatching_utils.cuh b/graphbolt/src/cuda/cooperative_minibatching_utils.cuh new file mode 100644 index 000000000000..f5acc20a1f77 --- /dev/null +++ b/graphbolt/src/cuda/cooperative_minibatching_utils.cuh @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2024, mfbalin (Muhammed Fatih Balin) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file cuda/cooperative_minibatching_utils.cuh + * @brief Cooperative Minibatching (arXiv:2310.12403) utility device functions + * in CUDA. + */ +#ifndef GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_ +#define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_ + +#include + +namespace graphbolt { +namespace cuda { + +using part_t = uint8_t; +constexpr auto kPartDType = torch::kUInt8; + +/** + * @brief Given a vertex id, the rank of current GPU and the world size, returns + * the rank that this id belongs in a deterministic manner. + * + * @param id The node id that will mapped to a rank in [0, world_size). + * @param rank The rank of the current GPU. + * @param world_size The world size, the total number of cooperating GPUs. + * + * @return The rank of the GPU the given id is mapped to. + */ +template +__device__ inline auto rank_assignment( + index_t id, uint32_t rank, uint32_t world_size) { + // Consider using a faster implementation in the future. + constexpr uint64_t kCurandSeed = 999961; // Any random number. + curandStatePhilox4_32_10_t rng; + curand_init(kCurandSeed, 0, id, &rng); + return (curand(&rng) - rank) % world_size; +} + +} // namespace cuda +} // namespace graphbolt + +#endif // GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_ diff --git a/graphbolt/src/cuda/cooperative_minibatching_utils.h b/graphbolt/src/cuda/cooperative_minibatching_utils.h index cd20138a01c9..45bd203f1f71 100644 --- a/graphbolt/src/cuda/cooperative_minibatching_utils.h +++ b/graphbolt/src/cuda/cooperative_minibatching_utils.h @@ -21,35 +21,12 @@ #ifndef GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_ #define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_ -#include +#include #include namespace graphbolt { namespace cuda { -using part_t = uint8_t; -constexpr auto kPartDType = torch::kUInt8; - -/** - * @brief Given a vertex id, the rank of current GPU and the world size, returns - * the rank that this id belongs in a deterministic manner. - * - * @param id The node id that will mapped to a rank in [0, world_size). - * @param rank The rank of the current GPU. - * @param world_size The world size, the total number of cooperating GPUs. - * - * @return The rank of the GPU the given id is mapped to. - */ -template -__device__ inline auto rank_assignment( - index_t id, uint32_t rank, uint32_t world_size) { - // Consider using a faster implementation in the future. - constexpr uint64_t kCurandSeed = 999961; // Any random number. - curandStatePhilox4_32_10_t rng; - curand_init(kCurandSeed, 0, id, &rng); - return (curand(&rng) - rank) % world_size; -} - /** * @brief Given node ids, the rank of current GPU and the world size, returns * the ranks that the given ids belong in a deterministic manner. @@ -102,7 +79,8 @@ RankSortImpl( * that belongs to the `i`th rank. */ std::vector> RankSort( - std::vector& nodes_list, int64_t rank, int64_t world_size); + const std::vector& nodes_list, int64_t rank, + int64_t world_size); } // namespace cuda } // namespace graphbolt diff --git a/graphbolt/src/cuda/extension/unique_and_compact_map.cu b/graphbolt/src/cuda/extension/unique_and_compact_map.cu index ed030fd98718..a36c63925d7f 100644 --- a/graphbolt/src/cuda/extension/unique_and_compact_map.cu +++ b/graphbolt/src/cuda/extension/unique_and_compact_map.cu @@ -33,6 +33,7 @@ #include #include "../common.h" +#include "../cooperative_minibatching_utils.cuh" #include "../cooperative_minibatching_utils.h" #include "../utils.h" #include "./unique_and_compact.h" diff --git a/graphbolt/src/python_binding.cc b/graphbolt/src/python_binding.cc index ea2b543761cf..20c6d59be5d5 100644 --- a/graphbolt/src/python_binding.cc +++ b/graphbolt/src/python_binding.cc @@ -10,10 +10,10 @@ #include #ifdef GRAPHBOLT_USE_CUDA +#include "./cuda/cooperative_minibatching_utils.h" #include "./cuda/max_uva_threads.h" #endif #include "./cnumpy.h" -#include "./expand_indptr.h" #include "./feature_cache.h" #include "./index_select.h" #include "./io_uring.h" @@ -196,6 +196,7 @@ TORCH_LIBRARY(graphbolt, m) { m.def("set_seed", &RandomEngine::SetManualSeed); #ifdef GRAPHBOLT_USE_CUDA m.def("set_max_uva_threads", &cuda::set_max_uva_threads); + m.def("rank_sort", &cuda::RankSort); #endif #ifdef HAS_IMPL_ABSTRACT_PYSTUB m.impl_abstract_pystub("dgl.graphbolt.base", "//dgl.graphbolt.base"); diff --git a/tests/python/pytorch/graphbolt/impl/test_cooperative_minibatching_utils.py b/tests/python/pytorch/graphbolt/impl/test_cooperative_minibatching_utils.py new file mode 100644 index 000000000000..f85676578bd5 --- /dev/null +++ b/tests/python/pytorch/graphbolt/impl/test_cooperative_minibatching_utils.py @@ -0,0 +1,59 @@ +import unittest + +from functools import partial + +import backend as F +import dgl.graphbolt as gb +import pytest +import torch + +WORLD_SIZE = 7 + +assert_equal = partial(torch.testing.assert_close, rtol=0, atol=0) + + +@unittest.skipIf( + F._default_context_str != "gpu", + reason="This test requires an NVIDIA GPU.", +) +@pytest.mark.parametrize("dtype", [torch.int32, torch.int64]) +@pytest.mark.parametrize("rank", list(range(WORLD_SIZE))) +def test_gpu_cached_feature_read_async(dtype, rank): + nodes_list1 = [ + torch.randint(0, 11111111, [777], dtype=dtype, device=F.ctx()) + for i in range(10) + ] + nodes_list2 = [nodes.sort()[0] for nodes in nodes_list1] + + res1 = torch.ops.graphbolt.rank_sort(nodes_list1, rank, WORLD_SIZE) + res2 = torch.ops.graphbolt.rank_sort(nodes_list2, rank, WORLD_SIZE) + + for i, ((nodes1, idx1, offsets1), (nodes2, idx2, offsets2)) in enumerate( + zip(res1, res2) + ): + assert_equal(nodes_list1[i], nodes1[idx1.sort()[1]]) + assert_equal(nodes_list2[i], nodes2[idx2.sort()[1]]) + assert_equal(offsets1, offsets2) + assert offsets1.is_pinned() and offsets2.is_pinned() + + res3 = torch.ops.graphbolt.rank_sort(nodes_list1, rank, WORLD_SIZE) + + # This function is deterministic. Call with identical arguments and check. + for (nodes1, idx1, offsets1), (nodes3, idx3, offsets3) in zip(res1, res3): + assert_equal(nodes1, nodes3) + assert_equal(idx1, idx3) + assert_equal(offsets1, offsets3) + + # The dependency on the rank argument is simply a permutation. + res4 = torch.ops.graphbolt.rank_sort(nodes_list1, 0, WORLD_SIZE) + for (nodes1, idx1, offsets1), (nodes4, idx4, offsets4) in zip(res1, res4): + off1 = offsets1.tolist() + off4 = offsets4.tolist() + for i in range(WORLD_SIZE): + j = (i - rank + WORLD_SIZE) % WORLD_SIZE + assert_equal( + nodes1[off1[j] : off1[j + 1]], nodes4[off4[i] : off4[i + 1]] + ) + assert_equal( + idx1[off1[j] : off1[j + 1]], idx4[off4[i] : off4[i + 1]] + )