Commit 27bb682

change partition

Ubuntu committed Sep 23, 2024
2 parents 945565e + 97d000b

Showing 21 changed files with 2,684 additions and 144 deletions.
485 changes: 485 additions & 0 deletions examples/graphbolt/pyg/multigpu/node_classification.py

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions graphbolt/src/cuda/cooperative_minibatching_utils.cu
@@ -19,11 +19,13 @@
  * implementations in CUDA.
  */
 #include <graphbolt/cuda_ops.h>
+#include <thrust/scatter.h>
 #include <thrust/transform.h>

 #include <cub/cub.cuh>
 #include <cuda/functional>

+#include "../utils.h"
 #include "./common.h"
 #include "./cooperative_minibatching_utils.cuh"
 #include "./cooperative_minibatching_utils.h"
@@ -62,8 +64,7 @@ RankSortImpl(
   auto part_ids2 = part_ids.clone();
   auto part_ids2_sorted = torch::empty_like(part_ids2);
   auto nodes_sorted = torch::empty_like(nodes);
-  auto index = ops::IndptrEdgeIdsImpl(
-      offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel());
+  auto index = torch::arange(nodes.numel(), nodes.options());
   auto index_sorted = torch::empty_like(index);
   return AT_DISPATCH_INDEX_TYPES(
       nodes.scalar_type(), "RankSortImpl", ([&] {
@@ -100,8 +101,14 @@ RankSortImpl(
             index.data_ptr<index_t>(), index_sorted.data_ptr<index_t>(),
             nodes.numel(), num_batches, offsets_dev_ptr, offsets_dev_ptr + 1, 0,
             num_bits);
+        auto values = ops::IndptrEdgeIdsImpl(
+            offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel());
+        THRUST_CALL(
+            scatter, values.data_ptr<index_t>(),
+            values.data_ptr<index_t>() + values.numel(),
+            index_sorted.data_ptr<index_t>(), index.data_ptr<index_t>());
         return std::make_tuple(
-            nodes_sorted, index_sorted, offsets, std::move(offsets_event));
+            nodes_sorted, index, offsets, std::move(offsets_event));
       }));
 }
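The scatter added above inverts the sort permutation, so the second return value now satisfies sorted_nodes[new_positions] == nodes instead of holding the sorted-to-original mapping. A minimal PyTorch sketch of the same computation, with hypothetical values and a single homogeneous batch (torch.sort and advanced indexing stand in for the segmented radix sort and thrust::scatter; illustration only, not part of the commit):

import torch

nodes = torch.tensor([7, 3, 9, 1])
part_ids = torch.tensor([1, 0, 1, 0])

# Stable sort by rank, carrying original positions along.
_, perm = torch.sort(part_ids, stable=True)  # perm[i] = original position of i-th sorted node
sorted_nodes = nodes[perm]

# Scatter an iota through perm to invert it: new_positions[perm[i]] = i.
new_positions = torch.empty_like(perm)
new_positions[perm] = torch.arange(nodes.numel())

assert torch.equal(sorted_nodes[new_positions], nodes)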

@@ -138,5 +145,15 @@ std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
   return results;
 }

+c10::intrusive_ptr<Future<
+    std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>
+RankSortAsync(
+    const std::vector<torch::Tensor>& nodes_list, const int64_t rank,
+    const int64_t world_size) {
+  return async(
+      [=] { return RankSort(nodes_list, rank, world_size); },
+      utils::is_on_gpu(nodes_list.at(0)));
+}
+
 }  // namespace cuda
 }  // namespace graphbolt
36 changes: 22 additions & 14 deletions graphbolt/src/cuda/cooperative_minibatching_utils.h
@@ -22,6 +22,7 @@
 #define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_

 #include <ATen/cuda/CUDAEvent.h>
+#include <graphbolt/async.h>
 #include <torch/script.h>

 namespace graphbolt {
@@ -42,21 +43,21 @@ torch::Tensor RankAssignment(

 /**
  * @brief Given node ids, the ranks they belong, the offsets to separate
- * different node types and num_bits indicating the world size is <= 2^num_bits,
- * returns node ids sorted w.r.t. the ranks that the given ids belong along with
- * the original positions.
+ * different node types and world size, returns node ids sorted w.r.t. the ranks
+ * that the given ids belong along with their new positions.
  *
  * @param nodes Node id tensor to be mapped to a rank in [0, world_size).
  * @param part_ids Rank tensor the nodes belong to.
  * @param offsets_dev Offsets to separate different node types.
  * @param world_size World size, the total number of cooperating GPUs.
 *
- * @return (sorted_nodes, original_positions, rank_offsets, rank_offsets_event),
- * where the first one includes sorted nodes, the second contains original
- * positions of the sorted nodes and the third contains the offsets of the
- * sorted_nodes indicating sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]]
- * contains nodes that belongs to the `i`th rank. Before accessing rank_offsets
- * on the CPU, `rank_offsets_event.synchronize()` is required.
+ * @return (sorted_nodes, new_positions, rank_offsets, rank_offsets_event),
+ * where the first one includes sorted nodes, the second contains new positions
+ * of the given nodes, so that sorted_nodes[new_positions] == nodes, and the
+ * third contains the offsets of the sorted_nodes indicating
+ * sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes that
+ * belongs to the `i`th rank. Before accessing rank_offsets on the CPU,
+ * `rank_offsets_event.synchronize()` is required.
  */
 std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, at::cuda::CUDAEvent>
 RankSortImpl(
@@ -72,16 +73,23 @@ RankSortImpl(
  * @param rank Rank of the current GPU.
  * @param world_size World size, the total number of cooperating GPUs.
  *
- * @return vector of (sorted_nodes, original_positions, rank_offsets), where the
- * first one includes sorted nodes, the second contains original positions of
- * the sorted nodes and the third contains the offsets of the sorted_nodes
- * indicating sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes
- * that belongs to the `i`th rank.
+ * @return vector of (sorted_nodes, new_positions, rank_offsets), where the
+ * first one includes sorted nodes, the second contains new positions of the
+ * given nodes, so that sorted_nodes[new_positions] == nodes, and the third
+ * contains the offsets of the sorted_nodes indicating
+ * sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes that
+ * belongs to the `i`th rank.
  */
 std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
     const std::vector<torch::Tensor>& nodes_list, int64_t rank,
     int64_t world_size);

+c10::intrusive_ptr<Future<
+    std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>
+RankSortAsync(
+    const std::vector<torch::Tensor>& nodes_list, const int64_t rank,
+    const int64_t world_size);
+
 }  // namespace cuda
 }  // namespace graphbolt
16 changes: 12 additions & 4 deletions graphbolt/src/cuda/extension/unique_and_compact_map.cu
@@ -99,8 +99,12 @@ __global__ void _MapIdsBatched(

       auto slot = map.find(key);
       auto new_id = slot->second;
-      if (index) new_id = index[new_id];
-      mapped_ids[i] = new_id - unique_ids_offsets[batch_index];
+      if (index) {
+        new_id = index[new_id];
+      } else {
+        new_id -= unique_ids_offsets[batch_index];
+      }
+      mapped_ids[i] = new_id;
     }

     i += stride;
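In Python terms, the kernel's revised mapping rule looks roughly like the sketch below (hypothetical inputs, for illustration only): when a rank-sort permutation exists, the hash-map slot value is translated through it; otherwise the id is made batch-local by subtracting the batch offset, as before.

import torch

def map_id(slot_value, batch_offset, index=None):
    # slot_value is the position the hash map assigned to the unique id.
    if index is not None:
        return index[slot_value]      # position after rank sorting
    return slot_value - batch_offset  # batch-local position

new_positions = torch.tensor([2, 0, 3, 1])  # hypothetical output of RankSortImpl
print(map_id(torch.tensor(3), batch_offset=2))                       # tensor(1)
print(map_id(torch.tensor(3), batch_offset=2, index=new_positions))  # tensor(1)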
@@ -284,14 +288,18 @@ UniqueAndCompactBatchedHashMapBased(
         unique_ids_offsets_dev.data_ptr<int64_t>();
   }
   at::cuda::CUDAEvent unique_ids_offsets_event;
-  unique_ids_offsets_event.record();
   torch::optional<torch::Tensor> index;
   if (part_ids) {
-    unique_ids_offsets_event.synchronize();
     const auto num_unique =
         unique_ids_offsets.data_ptr<int64_t>()[num_batches];
     unique_ids = unique_ids.slice(0, 0, num_unique);
     part_ids = part_ids->slice(0, 0, num_unique);
+    std::tie(
+        unique_ids, index, unique_ids_offsets, unique_ids_offsets_event) =
+        cuda::RankSortImpl(
+            unique_ids, *part_ids, unique_ids_offsets_dev, world_size);
+  } else {
+    unique_ids_offsets_event.record();
   }
   auto mapped_ids =
       torch::empty(offsets_ptr[3 * num_batches], unique_ids.options());
8 changes: 8 additions & 0 deletions graphbolt/src/python_binding.cc
@@ -59,6 +59,13 @@ TORCH_LIBRARY(graphbolt, m) {
           &Future<std::vector<std::tuple<
               torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>>>::
               Wait);
+  m.class_<Future<
+      std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>(
+      "RankSortFuture")
+      .def(
+          "wait",
+          &Future<std::vector<
+              std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>::Wait);
   m.class_<Future<std::tuple<torch::Tensor, torch::Tensor, int64_t, int64_t>>>(
       "GpuGraphCacheQueryFuture")
       .def(
@@ -198,6 +205,7 @@
 #ifdef GRAPHBOLT_USE_CUDA
   m.def("set_max_uva_threads", &cuda::set_max_uva_threads);
   m.def("rank_sort", &cuda::RankSort);
+  m.def("rank_sort_async", &cuda::RankSortAsync);
 #endif
 #ifdef HAS_IMPL_ABSTRACT_PYSTUB
   m.impl_abstract_pystub("dgl.graphbolt.base", "//dgl.graphbolt.base");
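With these registrations, the asynchronous variant becomes callable from Python through torch.ops. A hedged usage sketch (assumes dgl.graphbolt has been imported so the graphbolt op library is loaded; the tensors and rank/world_size values are hypothetical):

import torch
import dgl.graphbolt  # noqa: F401 -- loads the graphbolt op library

nodes_list = [torch.randint(0, 1000, (4096,), device="cuda")]

# Returns a RankSortFuture immediately; wait() yields the same list of
# (sorted_nodes, new_positions, rank_offsets) tuples that rank_sort returns.
future = torch.ops.graphbolt.rank_sort_async(nodes_list, 0, 4)  # rank=0, world_size=4
# ... other host-side work can overlap here ...
sorted_nodes, new_positions, rank_offsets = future.wait()[0]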
52 changes: 34 additions & 18 deletions python/dgl/distributed/partition.py
@@ -1600,8 +1600,6 @@ def _save_graph_gb(part_config, part_id, csc_graph):


 def cast_various_to_minimum_dtype_gb(
-    graph,
-    part_meta,
     num_parts,
     indptr,
     indices,
@@ -1610,25 +1608,43 @@ def cast_various_to_minimum_dtype_gb(
     etypes,
     ntypes,
     node_attributes,
     edge_attributes,
+    part_meta=None,
+    graph=None,
+    edge_count=None,
+    node_count=None,
+    tot_edge_count=None,
+    tot_node_count=None,
 ):
     """Cast various data to minimum dtype."""
+    if graph is not None:
+        assert part_meta is not None
+        tot_edge_count = graph.num_edges()
+        tot_node_count = graph.num_nodes()
+        node_count = part_meta["num_nodes"]
+        edge_count = part_meta["num_edges"]
+    else:
+        assert tot_edge_count is not None
+        assert tot_node_count is not None
+        assert edge_count is not None
+        assert node_count is not None
+
     # Cast 1: indptr.
-    indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
+    indptr = _cast_to_minimum_dtype(tot_edge_count, indptr)
     # Cast 2: indices.
-    indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
+    indices = _cast_to_minimum_dtype(tot_node_count, indices)
     # Cast 3: type_per_edge.
     type_per_edge = _cast_to_minimum_dtype(
         len(etypes), type_per_edge, field=ETYPE
     )
     # Cast 4: node/edge_attributes.
     predicates = {
-        NID: part_meta["num_nodes"],
+        NID: node_count,
         "part_id": num_parts,
         NTYPE: len(ntypes),
-        EID: part_meta["num_edges"],
+        EID: edge_count,
         ETYPE: len(etypes),
-        DGL2GB_EID: part_meta["num_edges"],
-        GB_DST_ID: part_meta["num_nodes"],
+        DGL2GB_EID: edge_count,
+        GB_DST_ID: node_count,
     }
     for attributes in [node_attributes, edge_attributes]:
         for key in attributes:
@@ -1784,16 +1800,16 @@ def _convert_dgl_partition_to_gb(
     )

     indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb(
-        graph,
-        part_meta,
-        num_parts,
-        indptr,
-        indices,
-        type_per_edge,
-        etypes,
-        ntypes,
-        node_attributes,
-        edge_attributes,
+        graph=graph,
+        part_meta=part_meta,
+        num_parts=num_parts,
+        indptr=indptr,
+        indices=indices,
+        type_per_edge=type_per_edge,
+        etypes=etypes,
+        ntypes=ntypes,
+        node_attributes=node_attributes,
+        edge_attributes=edge_attributes,
     )

     csc_graph = gb.fused_csc_sampling_graph(
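Because the counts can now be passed explicitly, the helper no longer requires a DGLGraph. A sketch of the graph-free calling convention, with placeholder variables for the tensors and counts:

indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb(
    num_parts=num_parts,
    indptr=indptr,
    indices=indices,
    type_per_edge=type_per_edge,
    etypes=etypes,
    ntypes=ntypes,
    node_attributes=node_attributes,
    edge_attributes=edge_attributes,
    # Graph-free path: supply the counts directly instead of graph/part_meta.
    node_count=part_num_nodes,  # this partition's node count
    edge_count=part_num_edges,  # this partition's edge count
    tot_node_count=total_num_nodes,  # node count of the whole graph
    tot_edge_count=total_num_edges,  # edge count of the whole graph
)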
41 changes: 38 additions & 3 deletions python/dgl/graphbolt/feature_fetcher.py
@@ -8,6 +8,7 @@
 from torch.utils.data import functional_datapipe

 from .base import etype_tuple_to_str
+from .impl.cooperative_conv import CooperativeConvFunction

 from .minibatch_transformer import MiniBatchTransformer

@@ -73,6 +74,16 @@ class FeatureFetcher(MiniBatchTransformer):
         If True, the feature fetcher will overlap the UVA feature fetcher
         operations with the rest of operations by using an alternative CUDA
         stream or utilizing asynchronous operations. Default is True.
+    cooperative: bool, optional
+        Boolean indicating whether Cooperative Minibatching is used, which was
+        initially proposed in
+        `Deep Graph Library PR#4337 <https://github.com/dmlc/dgl/pull/4337>`__
+        and was later first fully described in
+        `Cooperative Minibatching in Graph Neural Networks
+        <https://arxiv.org/abs/2310.12403>`__. Cooperation between the GPUs
+        eliminates duplicate work performed across the GPUs due to the
+        overlapping sampled k-hop neighborhoods of seed nodes when performing
+        GNN minibatching.
     """

     def __init__(
Expand All @@ -82,6 +93,7 @@ def __init__(
node_feature_keys=None,
edge_feature_keys=None,
overlap_fetch=True,
cooperative=False,
):
datapipe = datapipe.mark_feature_fetcher_start()
self.feature_store = feature_store
@@ -113,9 +125,12 @@
             datapipe = datapipe.transform(
                 partial(self._execute_stage, i)
             ).buffer(1)
-        super().__init__(
-            datapipe, self._identity if max_val == 0 else self._final_stage
-        )
+        if max_val > 0:
+            datapipe = datapipe.transform(self._final_stage)
+        if cooperative:
+            datapipe = datapipe.transform(self._cooperative_exchange)
+            datapipe = datapipe.buffer()
+        super().__init__(datapipe)
         # A positive value indicates that the overlap optimization is enabled.
         self.max_num_stages = max_val

@@ -145,6 +160,26 @@ def _final_stage(data):
                 features[key] = value.wait()
         return data

+    def _cooperative_exchange(self, data):
+        subgraph = data.sampled_subgraphs[0]
+        is_heterogeneous = isinstance(
+            self.node_feature_keys, Dict
+        ) or isinstance(self.edge_feature_keys, Dict)
+        if is_heterogeneous:
+            node_features = {key: {} for key, _ in data.node_features.keys()}
+            for (key, ntype), feature in data.node_features.items():
+                node_features[key][ntype] = feature
+            for key, feature in node_features.items():
+                new_feature = CooperativeConvFunction.apply(subgraph, feature)
+                for ntype, tensor in new_feature.items():
+                    data.node_features[(key, ntype)] = tensor
+        else:
+            for key in data.node_features:
+                feature = data.node_features[key]
+                new_feature = CooperativeConvFunction.apply(subgraph, feature)
+                data.node_features[key] = new_feature
+        return data
+
     def _read(self, data):
         """
         Fill in the node/edge features field in data.
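For context, a sketch of how the new flag is threaded into a pipeline; datapipe, feature_store, and the key list are placeholders, and in practice the fetcher is usually attached with datapipe.fetch_feature(...):

datapipe = FeatureFetcher(
    datapipe,
    feature_store,
    node_feature_keys=["feat"],
    overlap_fetch=True,
    # Exchange fetched features across cooperating GPUs via CooperativeConvFunction.
    cooperative=True,
)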
1 change: 1 addition & 0 deletions python/dgl/graphbolt/impl/__init__.py
@@ -15,3 +15,4 @@
 from .gpu_graph_cache import *
 from .cpu_feature_cache import *
 from .cpu_cached_feature import *
+from .cooperative_conv import *