Skip to content

Commit

Permalink
Add range partition option
Browse files Browse the repository at this point in the history
  • Loading branch information
thvasilo committed Jun 17, 2024
1 parent b94e9e3 commit a1282de
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 17 deletions.
3 changes: 2 additions & 1 deletion docs/source/scale/sagemaker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ data for distributed training.
Running the above will take the dataset in chunked format
from ``${DATASET_S3_PATH}`` as input and create a DistDGL graph with
``${NUM_PARTITIONS}`` under the output path, ``${OUTPUT_PATH}``.
Currently we only support ``random`` as the partitioning algorithm.
Currently we support ``random`` and ``range`` partition
assignment algorithms.

Passing additional arguments to the SageMaker
`````````````````````````````````````````````
Expand Down
7 changes: 4 additions & 3 deletions python/graphstorm/gpartition/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
Modules for local graph partitioning.
"""
from .random_partition import (RandomPartitionAlgorithm)
from .metis_partition import (ParMetisPartitionAlgorithm)
from .partition_config import (ParMETISConfig)
from .metis_partition import ParMetisPartitionAlgorithm
from .partition_algo_base import LocalPartitionAlgorithm
from .partition_config import ParMETISConfig
from .random_partition import RandomPartitionAlgorithm
from .range_partition import RangePartitionAlgorithm
5 changes: 4 additions & 1 deletion python/graphstorm/gpartition/dist_partition_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ParMetisPartitionAlgorithm,
ParMETISConfig,
RandomPartitionAlgorithm,
RangePartitionAlgorithm,
)
from graphstorm.utils import get_log_level

Expand Down Expand Up @@ -126,6 +127,8 @@ def main():
partition_config = ParMETISConfig(args.ip_config, args.input_path,
args.dgl_tool_path, args.metadata_filename)
partitioner = ParMetisPartitionAlgorithm(metadata_dict, partition_config)
elif args.partition_algorithm == "range":
partitioner = RangePartitionAlgorithm(metadata_dict)
else:
# argparse stores --partition-algorithm as `partition_algorithm`; the previous
# `args.part_algorithm` would raise AttributeError instead of this error.
raise RuntimeError(f"Unknown partition algorithm {args.partition_algorithm}")

Expand Down Expand Up @@ -185,7 +188,7 @@ def parse_args() -> argparse.Namespace:
argparser.add_argument("--dgl-tool-path", type=str, default="/root/dgl/tools",
help="The path to dgl/tools")
argparser.add_argument("--partition-algorithm", type=str, default="random",
choices=["random", "parmetis"], help="Partition algorithm to use.")
choices=["random", "parmetis", "range"], help="Partition algorithm to use.")
argparser.add_argument("--ip-config", type=str,
help=("A file storing a list of IPs, one line for "
"each instance of the partition cluster."))
Expand Down
3 changes: 2 additions & 1 deletion python/graphstorm/gpartition/partition_algo_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def create_partitions(self, num_partitions: int, partition_assignment_dir: str):
@abstractmethod
def _assign_partitions(self, num_partitions: int, partition_dir: str):
"""Assigns each node in the data to a partition from 0 to `num_partitions-1`,
and creates one "{ntype}>.txt" partition assignment file per node type.
and creates one "{ntype}.txt" partition assignment file per node type
under the ``partition_dir``.
Parameters
----------
Expand Down
81 changes: 81 additions & 0 deletions python/graphstorm/gpartition/range_partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Single-instance range partition assignment
"""
import os
import logging
import json
from typing import List

import numpy as np
import pyarrow as pa
import pyarrow.csv as pa_csv

from .random_partition import LocalPartitionAlgorithm

class RangePartitionAlgorithm(LocalPartitionAlgorithm):
    """
    Single-instance range partitioning algorithm.

    The partition algorithm accepts the intermediate output from GraphStorm
    gs-processing which matches the requirements of the DGL distributed
    partitioning pipeline. It sequentially assigns nodes to partitions
    and outputs the node assignment results and partition
    metadata file to the provided output directory.

    Parameters
    ----------
    metadata: dict
        DGL "Chunked graph data" JSON, as defined in
        https://docs.dgl.ai/guide/distributed-preprocessing.html#specification
    """
    def _assign_partitions(self, num_partitions: int, partition_dir: str):
        """Write one ``{ntype}.txt`` file of contiguous partition ids per node type.

        Nodes of each type are split into ``num_partitions`` consecutive ranges;
        the first ``num_nodes % num_partitions`` ranges receive one extra node
        (the same split sizes ``np.array_split`` produces).

        Parameters
        ----------
        num_partitions: int
            Number of partitions to assign nodes to, must be at least 1.
        partition_dir: str
            Directory under which the per-node-type assignment files are written.

        Raises
        ------
        ValueError
            If ``num_partitions`` is smaller than 1.
        """
        if num_partitions < 1:
            raise ValueError(
                f"num_partitions must be larger than 0, got {num_partitions}")

        num_nodes_per_type: List[int] = self.metadata_dict["num_nodes_per_type"]
        ntypes: List[str] = self.metadata_dict["node_type"]

        # Loop-invariant setup: the smallest unsigned dtype able to hold the
        # largest partition id (uint8 up to 256 partitions, uint16 up to 65536,
        # wider beyond that so ids never wrap around).
        partition_dtype = np.min_scalar_type(num_partitions - 1)
        partition_ids = np.arange(num_partitions, dtype=partition_dtype)
        options = pa_csv.WriteOptions(include_header=False, delimiter=' ')

        # Note: This assumes that the order of node_type is the same
        # as the order of num_nodes_per_type
        for ntype, num_nodes_for_type in zip(ntypes, num_nodes_per_type):
            logging.debug("Generating range partition for node type %s", ntype)
            ntype_output_path = os.path.join(partition_dir, f"{ntype}.txt")

            # Every partition gets `base_size` nodes, and the first `remainder`
            # partitions get one more so all nodes are covered.
            base_size, remainder = divmod(num_nodes_for_type, num_partitions)
            part_sizes = np.full(num_partitions, base_size, dtype=np.int64)
            part_sizes[:remainder] += 1

            # Repeat each partition id as many times as its range is long
            assigned_parts = np.repeat(partition_ids, part_sizes)

            arrow_partitions = pa.Table.from_arrays(
                [assigned_parts],
                names=["partition_id"])
            pa_csv.write_csv(arrow_partitions, ntype_output_path, write_options=options)

    def _create_metadata(self, num_partitions: int, partition_dir: str) -> None:
        """Write ``partition_meta.json`` describing the partitioning to ``partition_dir``.

        Parameters
        ----------
        num_partitions: int
            Number of partitions, stored under the ``num_parts`` key.
        partition_dir: str
            Directory in which the metadata file is created.
        """
        # TODO: DGL currently restricts the names we can give in the metadata, will
        # fix once https://github.com/dmlc/dgl/pull/7361 is merged into a release
        partition_meta = {
            "algo_name": "random",
            "num_parts": num_partitions,
            "version": "1.0.0"
        }
        partition_meta_filepath = os.path.join(partition_dir, "partition_meta.json")
        with open(partition_meta_filepath, "w", encoding='utf-8') as metafile:
            json.dump(partition_meta, metafile)
3 changes: 3 additions & 0 deletions python/graphstorm/sagemaker/sagemaker_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from .sm_partition_algorithm import (
SageMakerPartitionerConfig,
SageMakerRandomPartitioner,
SageMakerRangePartitioner,
)

DGL_TOOL_PATH = "/root/dgl/tools"
Expand Down Expand Up @@ -321,6 +322,8 @@ def run_partition(job_config: PartitionJobConfig):

if job_config.partition_algorithm == 'random':
sm_partitioner = SageMakerRandomPartitioner(partition_config)
elif job_config.partition_algorithm == 'range':
sm_partitioner = SageMakerRangePartitioner(partition_config)
else:
raise RuntimeError(f"Unknown partition algorithm: '{job_config.partition_algorithm}'", )

Expand Down
31 changes: 23 additions & 8 deletions python/graphstorm/sagemaker/sm_partition_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Tuple

from sagemaker import Session
from graphstorm.gpartition import RandomPartitionAlgorithm
from graphstorm.gpartition import RandomPartitionAlgorithm, RangePartitionAlgorithm, LocalPartitionAlgorithm

from .s3_utils import upload_file_to_s3

Expand Down Expand Up @@ -179,19 +179,16 @@ def _upload_results_to_s3(self, local_partition_directory: str, output_s3_path:
S3 prefix to upload the partitioning results to.
"""

class SageMakerRandomPartitioner(SageMakerPartitioner): # pylint: disable=too-few-public-methods
"""
Single-instance random partitioning algorithm running on SageMaker
"""
def _run_partitioning(self, num_partitions: int) -> str:
random_part = RandomPartitionAlgorithm(self.metadata)
class SageMakerSingleInstancePartitioner(SageMakerPartitioner):
local_partitioner: LocalPartitionAlgorithm

def _run_partitioning(self, num_partitions: int) -> str:
part_assignment_dir = os.path.join(self.local_output_path, "partition_assignment")
os.makedirs(part_assignment_dir, exist_ok=True)

# Only the leader creates partition assignments
if self.rank == 0:
random_part.create_partitions(num_partitions, part_assignment_dir)
self.local_partitioner.create_partitions(num_partitions, part_assignment_dir)

return part_assignment_dir

Expand All @@ -205,3 +202,21 @@ def _upload_results_to_s3(self, local_partition_directory: str, output_s3_path:
else:
# Workers do not hold any partitioning information locally
pass

class SageMakerRandomPartitioner(SageMakerSingleInstancePartitioner): # pylint: disable=too-few-public-methods
    """
    Single-instance random partitioning algorithm running on SageMaker
    """
    def __init__(self, partition_config: SageMakerPartitionerConfig) -> None:
        """Initialize the base partitioner, then attach a random local partitioner.

        Parameters
        ----------
        partition_config: SageMakerPartitionerConfig
            Configuration forwarded unchanged to the parent constructor.
        """
        super().__init__(partition_config)

        # self.metadata is read only after the parent constructor has run;
        # presumably the parent populates it -- confirm against SageMakerPartitioner.
        self.local_partitioner = RandomPartitionAlgorithm(self.metadata)

class SageMakerRangePartitioner(SageMakerSingleInstancePartitioner): # pylint: disable=too-few-public-methods
    """
    Single-instance range partitioning algorithm running on SageMaker
    """
    def __init__(self, partition_config: SageMakerPartitionerConfig) -> None:
        """Initialize the base partitioner, then attach a range local partitioner.

        Parameters
        ----------
        partition_config: SageMakerPartitionerConfig
            Configuration forwarded unchanged to the parent constructor.
        """
        super().__init__(partition_config)

        # self.metadata is read only after the parent constructor has run;
        # presumably the parent populates it -- confirm against SageMakerPartitioner.
        self.local_partitioner = RangePartitionAlgorithm(self.metadata)
2 changes: 1 addition & 1 deletion sagemaker/launch/launch_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_partition_parser():
help="File name of metadata config file for chunked format data")

partition_args.add_argument("--partition-algorithm", type=str, default='random',
help="Partition algorithm to use.", choices=['random'])
help="Partition algorithm to use.", choices=['random', 'range'])

partition_args.add_argument("--skip-partitioning", action='store_true',
help="When set, we skip the partitioning step. "
Expand Down
2 changes: 1 addition & 1 deletion sagemaker/local/generate_sagemaker_docker_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get_parser():
help="Skip partitioning step and only do GSL object creation. Partition assignments "
"need to exist under the <output-data-s3>/partitions location.")
partition_parser.add_argument("--partition-algorithm", required=False,
default='random', choices=['random'],
default='random', choices=['random', 'range'],
help="Partition algorithm to use.")
partition_parser.add_argument("--metadata-filename", required=False,
default="metadata.json", help="Metadata file that describes the files "
Expand Down
2 changes: 1 addition & 1 deletion sagemaker/run/partition_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def partition_arg_parser():
parser.add_argument("--metadata-filename", type=str,
default="metadata.json", help="file name of metadata config file")
parser.add_argument("--partition-algorithm", type=str, default='random',
choices=['random'],
choices=['random', 'range'],
help="Partition algorithm to use.")
parser.add_argument("--skip-partitioning", type=str, default='false',
choices=['true', 'false'],
Expand Down
42 changes: 42 additions & 0 deletions tests/unit-tests/gpartition/test_range_partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
from tempfile import TemporaryDirectory

from graphstorm.gpartition import RangePartitionAlgorithm

from conftest import simple_test_partition


def test_create_range_partition(chunked_metadata_dict):
    """Run the shared ``simple_test_partition`` check against the range partitioner."""
    # TODO: DGL only supports random and metis as a name downstream
    algo_name = "random"
    partitioner = RangePartitionAlgorithm(chunked_metadata_dict)
    simple_test_partition(partitioner, algo_name, chunked_metadata_dict)


def test_range_partition_ordered(chunked_metadata_dict):
    """Range partitioning must emit valid, non-decreasing partition ids per node type."""
    num_parts = 8
    with TemporaryDirectory() as tmpdir:
        range_partitioner = RangePartitionAlgorithm(chunked_metadata_dict)
        range_partitioner.create_partitions(num_parts, tmpdir)
        for node_type in chunked_metadata_dict["node_type"]:
            assignment_path = os.path.join(tmpdir, f"{node_type}.txt")
            with open(assignment_path, "r", encoding="utf-8") as f:
                ntype_partitions = [int(x) for x in f.read().splitlines()]
            # Every assignment must be a valid partition id
            assert all(0 <= part_id < num_parts for part_id in ntype_partitions)
            # Ensure the partition assignments are in increasing order
            assert sorted(ntype_partitions) == ntype_partitions

0 comments on commit a1282de

Please sign in to comment.