Skip to content

Commit

Permalink
Supporting __partitioned__ interface (#153)
Browse files Browse the repository at this point in the history
Co-authored-by: Antoni Baum <[email protected]>
  • Loading branch information
fschlimb and Yard1 authored Oct 13, 2021
1 parent 7dbf7f1 commit b5d725a
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 7 deletions.
3 changes: 2 additions & 1 deletion run_ci_examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ echo "running simple_dask.py" && python simple_dask.py --smoke-test
echo "running simple_modin.py" && python simple_modin.py --smoke-test
echo "running simple_objectstore.py" && python simple_objectstore.py --smoke-test
echo "running simple_ray_dataset.py" && python simple_objectstore.py --smoke-test
echo "running simple_partitioned.py" && python simple_partitioned.py --smoke-test

if [ "$TUNE" = "1" ]; then
echo "running simple_tune.py" && python simple_tune.py --smoke-test
Expand All @@ -42,4 +43,4 @@ popd
pushd xgboost_ray/tests
echo "running examples with Ray Client"
python -m pytest -v --durations=0 -x test_client.py
popd || exit 1
popd || exit 1
8 changes: 5 additions & 3 deletions xgboost_ray/data_sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from xgboost_ray.data_sources.parquet import Parquet
from xgboost_ray.data_sources.object_store import ObjectStore
from xgboost_ray.data_sources.ray_dataset import RayDataset
from xgboost_ray.data_sources.partitioned import Partitioned

data_sources = [
Numpy, Pandas, Modin, Dask, MLDataset, Petastorm, CSV, Parquet,
ObjectStore, RayDataset
Numpy, Pandas, Partitioned, Modin, Dask, MLDataset, Petastorm, CSV,
Parquet, ObjectStore, RayDataset
]

__all__ = [
"DataSource", "RayFileType", "Numpy", "Pandas", "Modin", "Dask",
"MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset"
"MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset",
"Partitioned"
]
100 changes: 100 additions & 0 deletions xgboost_ray/data_sources/partitioned.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from typing import Any, Optional, Sequence, Dict, Tuple

from collections import defaultdict
import pandas as pd
import numpy as np

from ray import ObjectRef
from ray.actor import ActorHandle

from xgboost_ray.data_sources._distributed import \
assign_partitions_to_actors, get_actor_rank_ips
from xgboost_ray.data_sources.data_source import DataSource, RayFileType
from xgboost_ray.data_sources.pandas import Pandas
from xgboost_ray.data_sources.numpy import Numpy


class Partitioned(DataSource):
"""Read from distributed data structure implementing __partitioned__.
__partitioned__ provides meta data about how the data is partitioned and
distributed across several compute nodes, making supporting objects them
suitable for distributed loading.
Also see the __partitioned__ spec:
https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md
"""
supports_central_loading = True
supports_distributed_loading = True

@staticmethod
def is_data_type(data: Any,
filetype: Optional[RayFileType] = None) -> bool:
return hasattr(data, "__partitioned__")

@staticmethod
def load_data(
data: Any, # __partitioned__ dict
ignore: Optional[Sequence[str]] = None,
indices: Optional[Sequence[ObjectRef]] = None,
**kwargs) -> pd.DataFrame:

assert isinstance(data, dict), "Expected __partitioned__ dict"
_get = data["get"]

if indices is None or len(indices) == 0:
tiling = data["partition_tiling"]
ndims = len(tiling)
# we need tuples to access partitions in the right order
pos_suffix = (0, ) * (ndims - 1)
parts = data["partitions"]
# get the full data, e.g. all shards/partitions
local_df = [
_get(parts[(i, ) + pos_suffix]["data"])
for i in range(tiling[0])
]
else:
# here we got a list of futures for partitions
local_df = _get(indices)

if isinstance(local_df[0], pd.DataFrame):
return Pandas.load_data(
pd.concat(local_df, copy=False), ignore=ignore)
else:
return Numpy.load_data(np.concatenate(local_df), ignore=ignore)

@staticmethod
def get_actor_shards(
data: Any, # partitioned.pandas.DataFrame
actors: Sequence[ActorHandle]) -> \
Tuple[Any, Optional[Dict[int, Any]]]:
assert hasattr(data, "__partitioned__")

actor_rank_ips = get_actor_rank_ips(actors)

# Get accessor func and partitions
parted = data.__partitioned__
parts = parted["partitions"]
tiling = parted["partition_tiling"]
ndims = len(tiling)
if ndims < 1 or ndims > 2 or any(tiling[x] != 1
for x in range(1, ndims)):
raise RuntimeError(
"Only row-wise partitionings of 1d/2d structures supported.")

# Now build a table mapping from IP to list of partitions
ip_to_parts = defaultdict(lambda: [])
# we need tuples to access partitions in the right order
pos_suffix = (0, ) * (ndims - 1)
for i in range(tiling[0]):
part = parts[(i, ) + pos_suffix] # this works for 1d and 2d
ip_to_parts[part["location"][0]].append(part["data"])
# __partitioned__ is serializable, so pass it here
# as the first return value
ret = parted, assign_partitions_to_actors(ip_to_parts, actor_rank_ips)
return ret

@staticmethod
def get_n(data: Any):
"""Get length of data source partitions for sharding."""
return data.__partitioned__["shape"][0]
134 changes: 134 additions & 0 deletions xgboost_ray/examples/simple_partitioned.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import argparse

from sklearn import datasets
from sklearn.model_selection import train_test_split

import numpy as np

import ray

from xgboost_ray import RayDMatrix, train, RayParams

nc = 31


@ray.remote
class AnActor:
"""We mimic a distributed DF by having several actors create
data which form the global DF.
"""

@ray.method(num_returns=2)
def genData(self, rank, nranks, nrows):
"""Generate global dataset and cut out local piece.
In real life each actor would of course directly create local data.
"""
# Load dataset
data, labels = datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, _, train_y, _ = train_test_split(data, labels, test_size=0.25)
train_y = train_y.reshape((train_y.shape[0], 1))
train = np.hstack([train_x, train_y])
assert nrows <= train.shape[0]
assert nc == train.shape[1]
sz = nrows // nranks
return train[sz * rank:sz * (rank + 1)], ray.util.get_node_ip_address()


class Parted:
"""Class exposing __partitioned__
"""

def __init__(self, parted):
self.__partitioned__ = parted


def main(cpus_per_actor, num_actors):
nr = 424
actors = [AnActor.remote() for _ in range(num_actors)]
parts = [
actors[i].genData.remote(i, num_actors, nr) for i in range(num_actors)
]
rowsperpart = nr // num_actors
nr = rowsperpart * num_actors
parted = Parted({
"shape": (nr, nc),
"partition_tiling": (num_actors, 1),
"get": lambda x: ray.get(x),
"partitions": {(i, 0): {
"start": (i * rowsperpart, 0),
"shape": (rowsperpart, nc),
"data": parts[i][0],
"location": [ray.get(parts[i][1])],
}
for i in range(num_actors)}
})

yl = nc - 1
# Let's create DMatrix from our __partitioned__ structure
train_set = RayDMatrix(parted, f"f{yl}")

evals_result = {}
# Set XGBoost config.
xgboost_params = {
"tree_method": "approx",
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
}

# Train the classifier
bst = train(
params=xgboost_params,
dtrain=train_set,
evals=[(train_set, "train")],
evals_result=evals_result,
ray_params=RayParams(
max_actor_restarts=0,
gpus_per_actor=0,
cpus_per_actor=cpus_per_actor,
num_actors=num_actors),
verbose_eval=False,
num_boost_round=10)

model_path = "partitioned.xgb"
bst.save_model(model_path)
print("Final training error: {:.4f}".format(
evals_result["train"]["error"][-1]))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--address",
required=False,
type=str,
help="the address to use for Ray")
parser.add_argument(
"--server-address",
required=False,
type=str,
help="Address of the remote server if using Ray Client.")
parser.add_argument(
"--cpus-per-actor",
type=int,
default=1,
help="Sets number of CPUs per xgboost training worker.")
parser.add_argument(
"--num-actors",
type=int,
default=4,
help="Sets number of xgboost workers to use.")
parser.add_argument(
"--smoke-test", action="store_true", default=False, help="gpu")

args, _ = parser.parse_known_args()

if not ray.is_initialized():
if args.smoke_test:
ray.init(num_cpus=args.num_actors + 1)
elif args.server_address:
ray.util.connect(args.server_address)
else:
ray.init(address=args.address)

main(args.cpus_per_actor, args.num_actors)
4 changes: 2 additions & 2 deletions xgboost_ray/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ def get_data_source(self) -> Type[DataSource]:

# Todo (krfricke): It would be good to have a more general way to
# check for compatibility here. Combine with test below?
if not isinstance(self.data,
(Iterable, MLDataset, RayDataset)) or invalid_data:
if not (isinstance(self.data, (Iterable, MLDataset, RayDataset))
or hasattr(self.data, "__partitioned__")) or invalid_data:
raise ValueError(
f"Distributed data loading only works with already "
f"distributed datasets. These should be specified through a "
Expand Down
Loading

0 comments on commit b5d725a

Please sign in to comment.