[GraphBolt][CUDA] Expose RankSort to python, reorganize and test. (#…

mfbalin committed Sep 5, 2024
1 parent f71427f commit 32b11c9
Showing 6 changed files with 125 additions and 28 deletions.
7 changes: 5 additions & 2 deletions graphbolt/src/cuda/cooperative_minibatching_utils.cu
@@ -18,12 +18,14 @@
* @brief Cooperative Minibatching (arXiv:2310.12403) utility function
* implementations in CUDA.
*/
#include <graphbolt/cuda_ops.h>
#include <thrust/transform.h>

#include <cub/cub.cuh>
#include <cuda/functional>

#include "./common.h"
#include "./cooperative_minibatching_utils.cuh"
#include "./cooperative_minibatching_utils.h"
#include "./utils.h"

@@ -60,7 +62,8 @@ RankSortImpl(
auto part_ids2 = part_ids.clone();
auto part_ids2_sorted = torch::empty_like(part_ids2);
auto nodes_sorted = torch::empty_like(nodes);
auto index = torch::arange(nodes.numel(), nodes.options());
auto index = ops::IndptrEdgeIdsImpl(
offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel());
auto index_sorted = torch::empty_like(index);
return AT_DISPATCH_INDEX_TYPES(
nodes.scalar_type(), "RankSortImpl", ([&] {
Expand Down Expand Up @@ -103,7 +106,7 @@ RankSortImpl(
}

std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
std::vector<torch::Tensor>& nodes_list, const int64_t rank,
const std::vector<torch::Tensor>& nodes_list, const int64_t rank,
const int64_t world_size) {
const auto num_batches = nodes_list.size();
auto nodes = torch::cat(nodes_list, 0);
55 changes: 55 additions & 0 deletions graphbolt/src/cuda/cooperative_minibatching_utils.cuh
@@ -0,0 +1,55 @@
/**
* Copyright (c) 2024, mfbalin (Muhammed Fatih Balin)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file cuda/cooperative_minibatching_utils.cuh
* @brief Cooperative Minibatching (arXiv:2310.12403) utility device functions
* in CUDA.
*/
#ifndef GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_
#define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_

#include <curand_kernel.h>

namespace graphbolt {
namespace cuda {

using part_t = uint8_t;
constexpr auto kPartDType = torch::kUInt8;

/**
 * @brief Given a vertex id, the rank of the current GPU and the world size,
 * returns the rank that this id belongs to in a deterministic manner.
 *
 * @param id The node id that will be mapped to a rank in [0, world_size).
 * @param rank The rank of the current GPU.
 * @param world_size The world size, the total number of cooperating GPUs.
 *
 * @return The rank of the GPU the given id is mapped to.
 */
template <typename index_t>
__device__ inline auto rank_assignment(
index_t id, uint32_t rank, uint32_t world_size) {
// Consider using a faster implementation in the future.
constexpr uint64_t kCurandSeed = 999961; // Any random number.
curandStatePhilox4_32_10_t rng;
curand_init(kCurandSeed, 0, id, &rng);
return (curand(&rng) - rank) % world_size;
}

} // namespace cuda
} // namespace graphbolt

#endif // GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_CUH_
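
For intuition, here is a minimal Python sketch of what rank_assignment computes. The hash below is a stand-in: the actual device function draws from a Philox curand generator seeded with kCurandSeed, so the values will not match the CUDA implementation, only the structure of the computation.

def rank_assignment_sketch(node_id, rank, world_size, seed=999961):
    # Stand-in 32-bit mix; the real kernel hashes `node_id` with a Philox RNG.
    h = (node_id * 2654435761 + seed) & 0xFFFFFFFF
    # Subtracting `rank` rotates the partitioning so that partition 0 holds the
    # nodes owned by the calling GPU.
    return (h - rank) % world_size

Because only the subtracted rank changes across GPUs, the induced partitions are identical up to a rotation, which is exactly the property the new test below checks.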
28 changes: 3 additions & 25 deletions graphbolt/src/cuda/cooperative_minibatching_utils.h
@@ -21,35 +21,12 @@
#ifndef GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_
#define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_

#include <curand_kernel.h>
#include <ATen/cuda/CUDAEvent.h>
#include <torch/script.h>

namespace graphbolt {
namespace cuda {

using part_t = uint8_t;
constexpr auto kPartDType = torch::kUInt8;

/**
* @brief Given a vertex id, the rank of current GPU and the world size, returns
* the rank that this id belongs in a deterministic manner.
*
* @param id The node id that will mapped to a rank in [0, world_size).
* @param rank The rank of the current GPU.
* @param world_size The world size, the total number of cooperating GPUs.
*
* @return The rank of the GPU the given id is mapped to.
*/
template <typename index_t>
__device__ inline auto rank_assignment(
index_t id, uint32_t rank, uint32_t world_size) {
// Consider using a faster implementation in the future.
constexpr uint64_t kCurandSeed = 999961; // Any random number.
curandStatePhilox4_32_10_t rng;
curand_init(kCurandSeed, 0, id, &rng);
return (curand(&rng) - rank) % world_size;
}

/**
 * @brief Given node ids, the rank of the current GPU and the world size,
 * returns the ranks that the given ids belong to in a deterministic manner.
@@ -102,7 +79,8 @@ RankSortImpl(
* that belongs to the `i`th rank.
*/
std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
std::vector<torch::Tensor>& nodes_list, int64_t rank, int64_t world_size);
const std::vector<torch::Tensor>& nodes_list, int64_t rank,
int64_t world_size);

} // namespace cuda
} // namespace graphbolt
1 change: 1 addition & 0 deletions graphbolt/src/cuda/extension/unique_and_compact_map.cu
@@ -33,6 +33,7 @@
#include <numeric>

#include "../common.h"
#include "../cooperative_minibatching_utils.cuh"
#include "../cooperative_minibatching_utils.h"
#include "../utils.h"
#include "./unique_and_compact.h"
3 changes: 2 additions & 1 deletion graphbolt/src/python_binding.cc
@@ -10,10 +10,10 @@
#include <graphbolt/unique_and_compact.h>

#ifdef GRAPHBOLT_USE_CUDA
#include "./cuda/cooperative_minibatching_utils.h"
#include "./cuda/max_uva_threads.h"
#endif
#include "./cnumpy.h"
#include "./expand_indptr.h"
#include "./feature_cache.h"
#include "./index_select.h"
#include "./io_uring.h"
@@ -196,6 +196,7 @@ TORCH_LIBRARY(graphbolt, m) {
m.def("set_seed", &RandomEngine::SetManualSeed);
#ifdef GRAPHBOLT_USE_CUDA
m.def("set_max_uva_threads", &cuda::set_max_uva_threads);
m.def("rank_sort", &cuda::RankSort);
#endif
#ifdef HAS_IMPL_ABSTRACT_PYSTUB
m.impl_abstract_pystub("dgl.graphbolt.base", "//dgl.graphbolt.base");
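
The new binding makes RankSort callable from Python as torch.ops.graphbolt.rank_sort. A usage sketch, assuming a CUDA build of GraphBolt and with illustrative variable names, mirroring what the new test below exercises:

import torch
import dgl.graphbolt  # importing registers the torch.ops.graphbolt namespace

rank, world_size = 0, 4
nodes_list = [torch.randint(0, 1000, (100,), device="cuda") for _ in range(3)]
results = torch.ops.graphbolt.rank_sort(nodes_list, rank, world_size)
for sorted_nodes, index, offsets in results:
    # `sorted_nodes` holds one input batch grouped by owning rank, `index` maps
    # each sorted position back to its original position, and `offsets` (a
    # pinned tensor with world_size + 1 entries) delimits the per-rank segments.
    pass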
@@ -0,0 +1,59 @@
import unittest

from functools import partial

import backend as F
import dgl.graphbolt as gb
import pytest
import torch

WORLD_SIZE = 7

assert_equal = partial(torch.testing.assert_close, rtol=0, atol=0)


@unittest.skipIf(
F._default_context_str != "gpu",
reason="This test requires an NVIDIA GPU.",
)
@pytest.mark.parametrize("dtype", [torch.int32, torch.int64])
@pytest.mark.parametrize("rank", list(range(WORLD_SIZE)))
def test_rank_sort(dtype, rank):
nodes_list1 = [
torch.randint(0, 11111111, [777], dtype=dtype, device=F.ctx())
for i in range(10)
]
nodes_list2 = [nodes.sort()[0] for nodes in nodes_list1]

res1 = torch.ops.graphbolt.rank_sort(nodes_list1, rank, WORLD_SIZE)
res2 = torch.ops.graphbolt.rank_sort(nodes_list2, rank, WORLD_SIZE)

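    # `idx` maps each rank-sorted position back to its original position, so
    # gathering with argsort(idx) should recover the input order. Sorting the
    # input beforehand must not change the per-rank offsets.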
for i, ((nodes1, idx1, offsets1), (nodes2, idx2, offsets2)) in enumerate(
zip(res1, res2)
):
assert_equal(nodes_list1[i], nodes1[idx1.sort()[1]])
assert_equal(nodes_list2[i], nodes2[idx2.sort()[1]])
assert_equal(offsets1, offsets2)
assert offsets1.is_pinned() and offsets2.is_pinned()

res3 = torch.ops.graphbolt.rank_sort(nodes_list1, rank, WORLD_SIZE)

# This function is deterministic. Call with identical arguments and check.
for (nodes1, idx1, offsets1), (nodes3, idx3, offsets3) in zip(res1, res3):
assert_equal(nodes1, nodes3)
assert_equal(idx1, idx3)
assert_equal(offsets1, offsets3)

# The dependency on the rank argument is simply a permutation.
res4 = torch.ops.graphbolt.rank_sort(nodes_list1, 0, WORLD_SIZE)
for (nodes1, idx1, offsets1), (nodes4, idx4, offsets4) in zip(res1, res4):
off1 = offsets1.tolist()
off4 = offsets4.tolist()
for i in range(WORLD_SIZE):
j = (i - rank + WORLD_SIZE) % WORLD_SIZE
assert_equal(
nodes1[off1[j] : off1[j + 1]], nodes4[off4[i] : off4[i + 1]]
)
assert_equal(
idx1[off1[j] : off1[j + 1]], idx4[off4[i] : off4[i + 1]]
)
