Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] [gpartition/sagemaker] Add support for range partitioning #882

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/source/scale/sagemaker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ data for distributed training.
Running the above will take the dataset in chunked format
from ``${DATASET_S3_PATH}`` as input and create a DistDGL graph with
``${NUM_PARTITIONS}`` under the output path, ``${OUTPUT_PATH}``.
Currently we only support ``random`` as the partitioning algorithm.
Currently we support ``random`` and ``range`` partition
assignment algorithms.

Passing additional arguments to the SageMaker
`````````````````````````````````````````````
Expand Down
8 changes: 5 additions & 3 deletions python/graphstorm/gpartition/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

Modules for local graph partitioning.
"""
from .random_partition import (RandomPartitionAlgorithm)
from .metis_partition import (ParMetisPartitionAlgorithm)
from .partition_config import (ParMETISConfig)
from .metis_partition import ParMetisPartitionAlgorithm
from .partition_algo_base import LocalPartitionAlgorithm
from .partition_config import ParMETISConfig
from .random_partition import RandomPartitionAlgorithm
from .range_partition import RangePartitionAlgorithm
30 changes: 26 additions & 4 deletions python/graphstorm/gpartition/dist_partition_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
import os
import queue
import time
import shutil
import subprocess
import sys
from typing import Dict
from threading import Thread

from graphstorm.gpartition import (ParMetisPartitionAlgorithm, ParMETISConfig,
RandomPartitionAlgorithm)
from graphstorm.gpartition import (
ParMetisPartitionAlgorithm,
ParMETISConfig,
RandomPartitionAlgorithm,
RangePartitionAlgorithm,
)
from graphstorm.utils import get_log_level


Expand Down Expand Up @@ -122,6 +127,8 @@ def main():
partition_config = ParMETISConfig(args.ip_config, args.input_path,
args.dgl_tool_path, args.metadata_filename)
partitioner = ParMetisPartitionAlgorithm(metadata_dict, partition_config)
elif args.partition_algorithm == "range":
partitioner = RangePartitionAlgorithm(metadata_dict)
else:
raise RuntimeError(f"Unknown partition algorithm {args.part_algorithm}")

Expand All @@ -133,7 +140,10 @@ def main():
part_assignment_dir)

part_end = time.time()
logging.info("Partition assignment took %f sec", part_end - part_start)
logging.info("Partition assignment with algorithm '%s' took %f sec",
args.partition_algorithm,
part_end - part_start,
)

if args.do_dispatch:
run_build_dglgraph(
Expand All @@ -147,6 +157,18 @@ def main():

logging.info("DGL graph building took %f sec", part_end - time.time())

# Copy raw_id_mappings to dist_graph if they exist in the input
raw_id_mappings_path = os.path.join(args.input_path, "raw_id_mappings")

if os.path.exists(raw_id_mappings_path):
logging.info("Copying raw_id_mappings to dist_graph")
shutil.copytree(
raw_id_mappings_path,
os.path.join(output_path, 'dist_graph/raw_id_mappings'),
dirs_exist_ok=True,
)


logging.info('Partition assignment and DGL graph creation took %f seconds',
time.time() - start)

Expand All @@ -166,7 +188,7 @@ def parse_args() -> argparse.Namespace:
argparser.add_argument("--dgl-tool-path", type=str, default="/root/dgl/tools",
help="The path to dgl/tools")
argparser.add_argument("--partition-algorithm", type=str, default="random",
choices=["random", "parmetis"], help="Partition algorithm to use.")
choices=["random", "parmetis", "range"], help="Partition algorithm to use.")
argparser.add_argument("--ip-config", type=str,
help=("A file storing a list of IPs, one line for "
"each instance of the partition cluster."))
Expand Down
3 changes: 2 additions & 1 deletion python/graphstorm/gpartition/partition_algo_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def create_partitions(self, num_partitions: int, partition_assignment_dir: str):
@abstractmethod
def _assign_partitions(self, num_partitions: int, partition_dir: str):
"""Assigns each node in the data to a partition from 0 to `num_partitions-1`,
and creates one "{ntype}>.txt" partition assignment file per node type.
and creates one "{ntype}.txt" partition assignment file per node type
under the ``partition_dir``.

Parameters
----------
Expand Down
10 changes: 8 additions & 2 deletions python/graphstorm/gpartition/random_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,16 @@ def _assign_partitions(self, num_partitions: int, partition_dir: str):
logging.info("Generating random partition for node type %s", ntype)
ntype_output = os.path.join(partition_dir, f"{ntype}.txt")

partition_assignment = np.random.randint(0, num_partitions, (num_nodes_for_type,))
partition_dtype = np.uint8 if num_partitions <= 256 else np.uint16

partition_assignment = np.random.randint(
0,
num_partitions,
(num_nodes_for_type,),
dtype=partition_dtype)

arrow_partitions = pa.Table.from_arrays(
[pa.array(partition_assignment, type=pa.int64())],
[pa.array(partition_assignment)],
names=["partition_id"])
options = pa_csv.WriteOptions(include_header=False, delimiter=' ')
pa_csv.write_csv(arrow_partitions, ntype_output, write_options=options)
Expand Down
81 changes: 81 additions & 0 deletions python/graphstorm/gpartition/range_partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Single-instance range partition assignment
"""
import os
import logging
import json
from typing import List

import numpy as np
import pyarrow as pa
import pyarrow.csv as pa_csv

from .random_partition import LocalPartitionAlgorithm

class RangePartitionAlgorithm(LocalPartitionAlgorithm):
    """
    Single-instance range partitioning algorithm.

    The partition algorithm accepts the intermediate output from GraphStorm
    gs-processing which matches the requirements of the DGL distributed
    partitioning pipeline. It assigns contiguous ranges of node IDs to each
    partition (partition 0 gets the first range, partition 1 the next, and
    so on) and outputs the node assignment results and partition
    metadata file to the provided output directory.

    Parameters
    ----------
    metadata: dict
        DGL "Chunked graph data" JSON, as defined in
        https://docs.dgl.ai/guide/distributed-preprocessing.html#specification
    """
    def _assign_partitions(self, num_partitions: int, partition_dir: str):
        """Assign contiguous node-ID ranges to partitions.

        Writes one "{ntype}.txt" file per node type under ``partition_dir``,
        containing one partition id per line (no header, space delimiter).
        """
        num_nodes_per_type = self.metadata_dict["num_nodes_per_type"]  # type: List[int]
        ntypes = self.metadata_dict["node_type"]  # type: List[str]

        # Note: This assumes that the order of node_type is the same as
        # the order of num_nodes_per_type in the chunked-graph metadata.
        for ntype, num_nodes_for_type in zip(ntypes, num_nodes_per_type):
            logging.debug("Generating range partition for node type %s", ntype)
            ntype_output_path = os.path.join(partition_dir, f"{ntype}.txt")

            # Use the smallest unsigned dtype that can hold ids
            # 0..num_partitions-1; the uint32 fallback avoids the silent
            # overflow uint16 would cause for more than 2**16 partitions.
            if num_partitions <= 2**8:
                partition_dtype = np.uint8
            elif num_partitions <= 2**16:
                partition_dtype = np.uint16
            else:
                partition_dtype = np.uint32

            # Split nodes as evenly as possible, matching np.array_split
            # semantics: the first (num_nodes % num_partitions) partitions
            # receive one extra node each.
            base_size, remainder = divmod(num_nodes_for_type, num_partitions)
            part_sizes = ([base_size + 1] * remainder
                          + [base_size] * (num_partitions - remainder))
            partition_assignment = np.repeat(
                np.arange(num_partitions, dtype=partition_dtype),
                part_sizes)

            arrow_partitions = pa.Table.from_arrays(
                [partition_assignment],
                names=["partition_id"])
            options = pa_csv.WriteOptions(include_header=False, delimiter=' ')
            pa_csv.write_csv(arrow_partitions, ntype_output_path, write_options=options)


    def _create_metadata(self, num_partitions: int, partition_dir: str) -> None:
        """Write the partition_meta.json file describing this assignment.

        Parameters
        ----------
        num_partitions: int
            Number of partitions that were created.
        partition_dir: str
            Output directory for the metadata file.
        """
        # TODO: DGL currently restricts the names we can give in the metadata,
        # so we report "random" even for range assignment; will fix once
        # https://github.com/dmlc/dgl/pull/7361 is merged into a release.
        partition_meta = {
            "algo_name": "random",
            "num_parts": num_partitions,
            "version": "1.0.0"
        }
        partition_meta_filepath = os.path.join(partition_dir, "partition_meta.json")
        with open(partition_meta_filepath, "w", encoding='utf-8') as metafile:
            json.dump(partition_meta, metafile)
Loading
Loading