diff --git a/run_ci_examples.sh b/run_ci_examples.sh
index 65fc8ccf..f03d7707 100755
--- a/run_ci_examples.sh
+++ b/run_ci_examples.sh
@@ -29,6 +29,7 @@ echo "running simple_dask.py" && python simple_dask.py --smoke-test
 echo "running simple_modin.py" && python simple_modin.py --smoke-test
 echo "running simple_objectstore.py" && python simple_objectstore.py --smoke-test
 echo "running simple_ray_dataset.py" && python simple_objectstore.py --smoke-test
+echo "running simple_partitioned.py" && python simple_partitioned.py --smoke-test
 
 if [ "$TUNE" = "1" ]; then
     echo "running simple_tune.py" && python simple_tune.py --smoke-test
@@ -42,4 +43,4 @@ popd
 pushd xgboost_ray/tests
 echo "running examples with Ray Client"
 python -m pytest -v --durations=0 -x test_client.py
-popd || exit 1
\ No newline at end of file
+popd || exit 1
diff --git a/xgboost_ray/data_sources/__init__.py b/xgboost_ray/data_sources/__init__.py
index e1355a9a..63a6e269 100644
--- a/xgboost_ray/data_sources/__init__.py
+++ b/xgboost_ray/data_sources/__init__.py
@@ -9,13 +9,15 @@
 from xgboost_ray.data_sources.parquet import Parquet
 from xgboost_ray.data_sources.object_store import ObjectStore
 from xgboost_ray.data_sources.ray_dataset import RayDataset
+from xgboost_ray.data_sources.partitioned import Partitioned
 
 data_sources = [
-    Numpy, Pandas, Modin, Dask, MLDataset, Petastorm, CSV, Parquet,
-    ObjectStore, RayDataset
+    Numpy, Pandas, Partitioned, Modin, Dask, MLDataset, Petastorm, CSV,
+    Parquet, ObjectStore, RayDataset
 ]
 
 __all__ = [
     "DataSource", "RayFileType", "Numpy", "Pandas", "Modin", "Dask",
-    "MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset"
+    "MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset",
+    "Partitioned"
 ]
diff --git a/xgboost_ray/data_sources/partitioned.py b/xgboost_ray/data_sources/partitioned.py
new file mode 100644
index 00000000..d290aa18
--- /dev/null
+++ b/xgboost_ray/data_sources/partitioned.py
@@ -0,0 +1,100 @@
+from typing import Any, Optional, Sequence, Dict, Tuple
+
+from collections import defaultdict
+import pandas as pd
+import numpy as np
+
+from ray import ObjectRef
+from ray.actor import ActorHandle
+
+from xgboost_ray.data_sources._distributed import \
+    assign_partitions_to_actors, get_actor_rank_ips
+from xgboost_ray.data_sources.data_source import DataSource, RayFileType
+from xgboost_ray.data_sources.pandas import Pandas
+from xgboost_ray.data_sources.numpy import Numpy
+
+
+class Partitioned(DataSource):
+    """Read from a distributed data structure implementing __partitioned__.
+
+    __partitioned__ provides metadata about how the data is partitioned and
+    distributed across several compute nodes, making objects that implement
+    it suitable for distributed loading.
+
+    Also see the __partitioned__ spec:
+    https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md
+    """
+    supports_central_loading = True
+    supports_distributed_loading = True
+
+    @staticmethod
+    def is_data_type(data: Any,
+                     filetype: Optional[RayFileType] = None) -> bool:
+        return hasattr(data, "__partitioned__")
+
+    @staticmethod
+    def load_data(
+            data: Any,  # __partitioned__ dict
+            ignore: Optional[Sequence[str]] = None,
+            indices: Optional[Sequence[ObjectRef]] = None,
+            **kwargs) -> pd.DataFrame:
+
+        assert isinstance(data, dict), "Expected __partitioned__ dict"
+        _get = data["get"]
+
+        if indices is None or len(indices) == 0:
+            tiling = data["partition_tiling"]
+            ndims = len(tiling)
+            # we need tuples to access partitions in the right order
+            pos_suffix = (0, ) * (ndims - 1)
+            parts = data["partitions"]
+            # get the full data, i.e. all shards/partitions
+            local_df = [
+                _get(parts[(i, ) + pos_suffix]["data"])
+                for i in range(tiling[0])
+            ]
+        else:
+            # here we got a list of futures for partitions
+            local_df = _get(indices)
+
+        if isinstance(local_df[0], pd.DataFrame):
+            return Pandas.load_data(
+                pd.concat(local_df, copy=False), ignore=ignore)
+        else:
+            return Numpy.load_data(np.concatenate(local_df), ignore=ignore)
+
+    @staticmethod
+    def get_actor_shards(
+            data: Any,  # partitioned.pandas.DataFrame
+            actors: Sequence[ActorHandle]) -> \
+            Tuple[Any, Optional[Dict[int, Any]]]:
+        assert hasattr(data, "__partitioned__")
+
+        actor_rank_ips = get_actor_rank_ips(actors)
+
+        # Get accessor func and partitions
+        parted = data.__partitioned__
+        parts = parted["partitions"]
+        tiling = parted["partition_tiling"]
+        ndims = len(tiling)
+        if ndims < 1 or ndims > 2 or any(tiling[x] != 1
+                                         for x in range(1, ndims)):
+            raise RuntimeError(
+                "Only row-wise partitionings of 1d/2d structures supported.")
+
+        # Now build a table mapping from IP to list of partitions
+        ip_to_parts = defaultdict(lambda: [])
+        # we need tuples to access partitions in the right order
+        pos_suffix = (0, ) * (ndims - 1)
+        for i in range(tiling[0]):
+            part = parts[(i, ) + pos_suffix]  # this works for 1d and 2d
+            ip_to_parts[part["location"][0]].append(part["data"])
+        # __partitioned__ is serializable, so pass it here
+        # as the first return value
+        ret = parted, assign_partitions_to_actors(ip_to_parts, actor_rank_ips)
+        return ret
+
+    @staticmethod
+    def get_n(data: Any):
+        """Get length of data source partitions for sharding."""
+        return data.__partitioned__["shape"][0]
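For illustration, a minimal two-partition, row-wise __partitioned__ structure of the kind this data source consumes could look like the sketch below. All values are made up and both partitions sit on the local node; real producers of the protocol would fill in their own partition shapes, object references and host IPs.

# Sketch only: a two-partition, row-wise __partitioned__ dict as accepted by
# Partitioned.load_data and Partitioned.get_actor_shards above.
import pandas as pd
import ray

from xgboost_ray.data_sources import Partitioned

ray.init(ignore_reinit_error=True)

df0 = pd.DataFrame({"a": [1.0, 2.0], "y": [0, 1]})
df1 = pd.DataFrame({"a": [3.0, 4.0], "y": [1, 0]})

parted = {
    "shape": (4, 2),              # global shape: 4 rows, 2 columns
    "partition_tiling": (2, 1),   # two row partitions, one column partition
    "get": lambda x: ray.get(x),  # resolves a partition reference to its data
    "partitions": {
        (0, 0): {
            "start": (0, 0),
            "shape": (2, 2),
            "data": ray.put(df0),
            "location": [ray.util.get_node_ip_address()],
        },
        (1, 0): {
            "start": (2, 0),
            "shape": (2, 2),
            "data": ray.put(df1),
            "location": [ray.util.get_node_ip_address()],
        },
    },
}

# Central loading resolves and concatenates all partitions on the driver.
local_df = Partitioned.load_data(parted)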
diff --git a/xgboost_ray/examples/simple_partitioned.py b/xgboost_ray/examples/simple_partitioned.py
new file mode 100644
index 00000000..9a6bb7c7
--- /dev/null
+++ b/xgboost_ray/examples/simple_partitioned.py
@@ -0,0 +1,134 @@
+import argparse
+
+from sklearn import datasets
+from sklearn.model_selection import train_test_split
+
+import numpy as np
+
+import ray
+
+from xgboost_ray import RayDMatrix, train, RayParams
+
+nc = 31
+
+
+@ray.remote
+class AnActor:
+    """We mimic a distributed DF by having several actors create
+    the pieces of data that together form the global DF.
+    """
+
+    @ray.method(num_returns=2)
+    def genData(self, rank, nranks, nrows):
+        """Generate global dataset and cut out local piece.
+        In real life, each actor would create its local data directly.
+        """
+        # Load dataset
+        data, labels = datasets.load_breast_cancer(return_X_y=True)
+        # Split into train and test set
+        train_x, _, train_y, _ = train_test_split(data, labels, test_size=0.25)
+        train_y = train_y.reshape((train_y.shape[0], 1))
+        train = np.hstack([train_x, train_y])
+        assert nrows <= train.shape[0]
+        assert nc == train.shape[1]
+        sz = nrows // nranks
+        return train[sz * rank:sz * (rank + 1)], ray.util.get_node_ip_address()
+
+
+class Parted:
+    """Class exposing __partitioned__.
+    """
+
+    def __init__(self, parted):
+        self.__partitioned__ = parted
+
+
+def main(cpus_per_actor, num_actors):
+    nr = 424
+    actors = [AnActor.remote() for _ in range(num_actors)]
+    parts = [
+        actors[i].genData.remote(i, num_actors, nr) for i in range(num_actors)
+    ]
+    rowsperpart = nr // num_actors
+    nr = rowsperpart * num_actors
+    parted = Parted({
+        "shape": (nr, nc),
+        "partition_tiling": (num_actors, 1),
+        "get": lambda x: ray.get(x),
+        "partitions": {(i, 0): {
+            "start": (i * rowsperpart, 0),
+            "shape": (rowsperpart, nc),
+            "data": parts[i][0],
+            "location": [ray.get(parts[i][1])],
+        }
+                       for i in range(num_actors)}
+    })
+
+    yl = nc - 1
+    # Let's create DMatrix from our __partitioned__ structure
+    train_set = RayDMatrix(parted, f"f{yl}")
+
+    evals_result = {}
+    # Set XGBoost config.
+    xgboost_params = {
+        "tree_method": "approx",
+        "objective": "binary:logistic",
+        "eval_metric": ["logloss", "error"],
+    }
+
+    # Train the classifier
+    bst = train(
+        params=xgboost_params,
+        dtrain=train_set,
+        evals=[(train_set, "train")],
+        evals_result=evals_result,
+        ray_params=RayParams(
+            max_actor_restarts=0,
+            gpus_per_actor=0,
+            cpus_per_actor=cpus_per_actor,
+            num_actors=num_actors),
+        verbose_eval=False,
+        num_boost_round=10)
+
+    model_path = "partitioned.xgb"
+    bst.save_model(model_path)
+    print("Final training error: {:.4f}".format(
+        evals_result["train"]["error"][-1]))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--address",
+        required=False,
+        type=str,
+        help="the address to use for Ray")
+    parser.add_argument(
+        "--server-address",
+        required=False,
+        type=str,
+        help="Address of the remote server if using Ray Client.")
+    parser.add_argument(
+        "--cpus-per-actor",
+        type=int,
+        default=1,
+        help="Sets number of CPUs per xgboost training worker.")
+    parser.add_argument(
+        "--num-actors",
+        type=int,
+        default=4,
+        help="Sets number of xgboost workers to use.")
+    parser.add_argument(
+        "--smoke-test", action="store_true", default=False, help="Finish quickly for testing")
+
+    args, _ = parser.parse_known_args()
+
+    if not ray.is_initialized():
+        if args.smoke_test:
+            ray.init(num_cpus=args.num_actors + 1)
+        elif args.server_address:
+            ray.util.connect(args.server_address)
+        else:
+            ray.init(address=args.address)
+
+    main(args.cpus_per_actor, args.num_actors)
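Like the other simple_*.py examples, the script runs standalone: the CI hook added in run_ci_examples.sh above invokes it as python simple_partitioned.py --smoke-test, and the --address and --server-address flags point it at an existing Ray cluster or a Ray Client server instead of starting a local Ray instance.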
diff --git a/xgboost_ray/matrix.py b/xgboost_ray/matrix.py
index 28a908db..777bfb96 100644
--- a/xgboost_ray/matrix.py
+++ b/xgboost_ray/matrix.py
@@ -394,8 +394,8 @@ def get_data_source(self) -> Type[DataSource]:
 
         # Todo (krfricke): It would be good to have a more general way to
         # check for compatibility here. Combine with test below?
-        if not isinstance(self.data,
-                          (Iterable, MLDataset, RayDataset)) or invalid_data:
+        if not (isinstance(self.data, (Iterable, MLDataset, RayDataset))
+                or hasattr(self.data, "__partitioned__")) or invalid_data:
             raise ValueError(
                 f"Distributed data loading only works with already "
                 f"distributed datasets. These should be specified through a "
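With this check relaxed, any object exposing __partitioned__ passes the distributed-loading gate in RayDMatrix. A minimal sketch follows; parted_obj is a hypothetical stand-in for such an object whose partitions are pandas DataFrames containing a "labels" column.

# Sketch only: parted_obj is a hypothetical object exposing __partitioned__
# whose partitions hold a "labels" column; distributed=True requests
# distributed loading instead of loading the data centrally.
from xgboost_ray import RayDMatrix

dtrain = RayDMatrix(parted_obj, label="labels", distributed=True)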
diff --git a/xgboost_ray/tests/test_data_source.py b/xgboost_ray/tests/test_data_source.py
index d8fe978e..69fc3077 100644
--- a/xgboost_ray/tests/test_data_source.py
+++ b/xgboost_ray/tests/test_data_source.py
@@ -8,7 +8,7 @@
 import ray
 from ray import ObjectRef
 
-from xgboost_ray.data_sources import Modin, Dask
+from xgboost_ray.data_sources import Modin, Dask, Partitioned
 from xgboost_ray.data_sources.ray_dataset import RAY_DATASET_AVAILABLE, \
     RayDataset
 from xgboost_ray.main import _RemoteRayXGBoostActor
@@ -581,6 +581,131 @@ def create_remote_df(arr):
                     f"partition {i} is not partition with ID {part_id}.")
 
 
+class PartitionedSourceTest(_DistributedDataSourceTest, unittest.TestCase):
+    def _testAssignPartitions(self, part_nodes, actor_nodes,
+                              expected_actor_parts):
+        partitions = [
+            ray.put(pd.DataFrame(p))
+            for p in np.array_split(self.x, len(part_nodes))
+        ]
+
+        # Dict from partition (obj ref) to node host
+        part_to_node = dict(zip(partitions, [f"node{n}" for n in part_nodes]))
+
+        actors_to_node = dict(enumerate(f"node{n}" for n in actor_nodes))
+
+        actor_to_parts = self._getActorToParts(actors_to_node, partitions,
+                                               part_to_node, part_nodes)
+
+        for actor_rank, part_ids in expected_actor_parts.items():
+            for i, part_id in enumerate(part_ids):
+                self.assertEqual(
+                    actor_to_parts[actor_rank][i],
+                    partitions[part_id],
+                    msg=f"Assignment failed: Actor rank {actor_rank}, "
+                    f"partition {i} is not partition with ID {part_id}.")
+
+    def _mk_partitioned(self, part_to_node, nr, nc, shapes):
+        class Parted:
+            """Class exposing __partitioned__
+            """
+
+            def __init__(self, parted):
+                self.__partitioned__ = parted
+
+        num_parts = len(part_to_node)
+        data = {
+            "shape": (nr, nc),
+            "partition_tiling": (num_parts, 1),
+            "get": lambda x: ray.get(x),
+            "partitions": {}
+        }
+        startx = 0
+        for i, pn in enumerate(part_to_node.items()):
+            partref, node = pn
+            data["partitions"][(i, 0)] = {
+                "start": (startx, 0),
+                "shape": shapes[partref],
+                "data": partref,
+                "location": [node],
+            }
+            startx = startx + shapes[partref][0]
+
+        return Parted(data)
+
+    def _getActorToParts(self, actors_to_node, partitions, part_to_node,
+                         part_nodes):
+        def actor_ranks(actors):
+            return actors_to_node
+
+        with patch("xgboost_ray.data_sources.partitioned.get_actor_rank_ips"
+                   ) as mock_ranks:
+            mock_ranks.side_effect = actor_ranks
+
+            nr, nc = self.x.shape
+            data = self._mk_partitioned(
+                part_to_node, nr, nc,
+                {p: ray.get(p).shape
+                 for p in partitions})
+
+            _, actor_to_parts = Partitioned.get_actor_shards(
+                data=data, actors=[])
+
+            return actor_to_parts
+
+    def _testDataSourceAssignment(self, part_nodes, actor_nodes,
+                                  expected_actor_parts):
+        node_ips = [
+            node["NodeManagerAddress"] for node in ray.nodes() if node["Alive"]
+        ]
+        if len(node_ips) < max(max(actor_nodes), max(part_nodes)) + 1:
+            print("Not running on cluster, skipping rest of this test.")
+            return
+
+        actor_node_ips = [node_ips[nid] for nid in actor_nodes]
+        part_node_ips = [node_ips[nid] for nid in part_nodes]
+
+        # Initialize data frames on remote nodes
+        # This way we can control which partition is on which node
+        @ray.remote(num_cpus=0.1)
+        def create_remote_df(arr):
+            return ray.put(pd.DataFrame(arr))
+
+        partitions = np.array_split(self.x, len(part_nodes))
+        node_dfs, shapes = {}, {}
+        for pid, pip in enumerate(part_node_ips):
+            pref = ray.get(
+                create_remote_df.options(resources={
+                    f"node:{pip}": 0.1
+                }).remote(partitions[pid]))
+            node_dfs[pref] = pip
+            shapes[pref] = partitions[pid].shape
+
+        nr, nc = self.x.shape
+        # Create structure with __partitioned__ from distributed partitions
+        parted = self._mk_partitioned(node_dfs, nr, nc, shapes)
+
+        # Create ray actors
+        actors = [
+            _RemoteRayXGBoostActor.options(resources={
+                f"node:{nip}": 0.1
+            }).remote(rank=rank, num_actors=len(actor_nodes))
+            for rank, nip in enumerate(actor_node_ips)
+        ]
+
+        # Calculate shards
+        _, actor_to_parts = Partitioned.get_actor_shards(parted, actors)
+
+        for actor_rank, part_ids in expected_actor_parts.items():
+            for i, part_id in enumerate(part_ids):
+                assigned_df = ray.get(actor_to_parts[actor_rank][i])
+                part_df = pd.DataFrame(partitions[part_id])
+                self.assertTrue(
+                    assigned_df.equals(part_df),
+                    msg=f"Assignment failed: Actor rank {actor_rank}, "
+                    f"partition {i} is not partition with ID {part_id}.")
+
+
 if __name__ == "__main__":
     import pytest
     import sys
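PartitionedSourceTest follows the same pattern as the existing distributed data source tests: _testAssignPartitions checks shard assignment against a mocked actor-rank-to-IP mapping, while _testDataSourceAssignment places partitions and actors on real nodes and skips itself when not running on a cluster. Assuming a source checkout with pytest available, the new tests can be selected with, for example:

python -m pytest -v xgboost_ray/tests/test_data_source.py -k Partitioned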