Update readme (#105)
krfricke authored May 20, 2021
1 parent 1249834 commit eee7831
Showing 2 changed files with 194 additions and 8 deletions.
200 changes: 193 additions & 7 deletions README.md
Distributed XGBoost on Ray
==========================
![Build Status](https://github.com/ray-project/xgboost_ray/workflows/pytest%20on%20push/badge.svg)
[![docs.ray.io](https://img.shields.io/badge/docs-ray.io-blue)](https://docs.ray.io/en/master/xgboost-ray.html)

XGBoost-Ray is a distributed backend for
[XGBoost](https://xgboost.readthedocs.io/en/latest/), built
on top of the distributed computing framework [Ray](https://ray.io).

XGBoost-Ray
- enables [**multi-node**](#usage) and [**multi-GPU**](#multi-gpu-training) training
- integrates seamlessly with the distributed [**hyperparameter optimization**](#hyperparameter-tuning) library [Ray Tune](http://tune.io)
- comes with advanced [**fault tolerance handling**](#fault-tolerance) mechanisms, and
- supports [**distributed dataframes** and **distributed data loading**](#distributed-data-loading)

All releases are tested on large clusters and workloads.


Installation
------------
You can install the latest XGBoost-Ray release from PyPI:

```bash
pip install xgboost_ray
```

Usage
-----
XGBoost-Ray provides a drop-in replacement for XGBoost's `train`
function. To pass data, instead of using `xgb.DMatrix` you will
have to use `xgboost_ray.RayDMatrix`.

Distributed training parameters are configured with a
`xgboost_ray.RayParams` object. For instance, you can set
the `num_actors` property to specify how many distributed actors
you would like to use.

Here is a simplified example (which requires `sklearn`):

**Training:**

```python
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

# Load an example dataset (any sklearn-compatible data works) and
# wrap it in a distributed RayDMatrix
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

bst = train(
    {"objective": "binary:logistic"},
    train_set,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(
        num_actors=2,  # Number of remote actors
        cpus_per_actor=1))

bst.save_model("model.xgb")
```

Hyperparameter tuning
---------------------
XGBoost-Ray integrates with the distributed hyperparameter optimization
library [Ray Tune](http://tune.io): wrap your XGBoost-Ray training call in a
function and pass it to `tune.run`. The best configuration found can then be
read from the returned analysis object, e.g.
`print("Best hyperparameters", analysis.best_config)`.

Also see `examples/simple_tune.py` for another example.
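
A rough sketch of what such a tuning run can look like; the search space,
the resource request, and the reported metric name (`train-error`) are
illustrative assumptions rather than code taken from this repository:

```python
from ray import tune
from sklearn.datasets import load_breast_cancer
from xgboost_ray import RayDMatrix, RayParams, train

def train_model(config):
    # Each Tune trial runs a distributed XGBoost-Ray training job
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)
    train(
        config,
        train_set,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=RayParams(num_actors=2, cpus_per_actor=1))

config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "max_depth": tune.randint(1, 9),
}

analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",  # assumes evaluation results are reported to Tune
    mode="min",
    num_samples=4,
    resources_per_trial={"cpu": 3})  # illustrative: trial driver + actor CPUs

# analysis.best_config holds the best hyperparameters found
```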

Fault tolerance
---------------
XGBoost-Ray leverages the stateful Ray actor model to
enable fault-tolerant training. There are currently
two modes implemented.

### Non-elastic training (warm restart)

When an actor or node dies, XGBoost-Ray will retain the
state of the remaining actors. In non-elastic training,
the failed actors will be replaced as soon as resources
are available again. Only these actors will reload their
parts of the data. Training will resume once all actors
are ready for training again.

You can set this mode in the `RayParams`:

```python
from xgboost_ray import RayParams

ray_params = RayParams(
    elastic_training=False,  # Use non-elastic training
    max_actor_restarts=2,  # How many times failed actors may be restarted
)
```

### Elastic training

In elastic training, XGBoost-Ray will continue training
with fewer actors (and on less data) when a node or actor
dies. The missing actors are staged in the background
and are reintegrated into training once they are back and
have loaded their data.

This mode will train on less data for a period of time,
which can impact accuracy. In practice, we found these
effects to be minor, especially for large shuffled datasets.
The immediate benefit is that training time stays close to
what it would have been if no actors had died. Thus, especially
when data loading takes up a large part of the total training
time, this setting can dramatically speed up training for large
distributed jobs.

You can configure this mode in the `RayParams`:

```python
from xgboost_ray import RayParams

ray_params = RayParams(
    elastic_training=True,  # Use elastic training
    max_failed_actors=3,  # Allow at most 3 actors to be down at the same time
    max_actor_restarts=2,  # How many times failed actors may be restarted
)
```

Resources
---------
By default, XGBoost-Ray tries to determine the number of CPUs
available and distributes them evenly across actors. In the case of
very large clusters or clusters with many different machine sizes,
it makes sense to limit the number of CPUs per actor by setting the
`cpus_per_actor` argument. Consider always
setting this explicitly.
The number of XGBoost actors always has to be set manually with
the `num_actors` argument.
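
For example (a minimal sketch; the actor and CPU counts are placeholders
to adapt to your own cluster), both values are set on the `RayParams` object:

```python
from xgboost_ray import RayParams

ray_params = RayParams(
    num_actors=4,       # Always has to be set manually
    cpus_per_actor=4,   # Consider setting this explicitly, too
)
```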

### Multi-GPU training
XGBoost-Ray enables multi-GPU training. The XGBoost core backend
will automatically leverage NCCL2 for cross-device communication.
All you have to do is start one actor per GPU.

For instance, if you have 2 machines with 4 GPUs each, you will want
to start 8 remote actors, and set `gpus_per_actor=1`. There is usually
no benefit in allocating less (e.g. 0.5) or more than one GPU per actor.

You should divide the CPUs evenly across actors per machine, so if your
machines have 16 CPUs in addition to the 4 GPUs, each actor should have
4 CPUs to use.

```python
from xgboost_ray import RayParams

ray_params = RayParams(
num_actors=8,
gpus_per_actor=1,
cpus_per_actor=4, # Divide evenly across actors per machine
)
```

### How many remote actors should I use?

This depends on your workload and your cluster setup.
Generally, there is no inherent benefit in running more than
one remote actor per node for CPU-only training. This is because
XGBoost core can already leverage multiple CPUs via threading.

However, there are some cases when you should consider starting
more than one actor per node:

- For [**multi-GPU training**](#multi-gpu-training), each GPU should have a separate
  remote actor. Thus, if your machine has 24 CPUs and 4 GPUs,
  you will want to start 4 remote actors with 6 CPUs and 1 GPU
  each.
- In a **heterogeneous cluster**, you might want to find the
  [greatest common divisor](https://en.wikipedia.org/wiki/Greatest_common_divisor)
  for the number of CPUs.
  E.g. for a cluster with three nodes of 4, 8, and 12 CPUs, respectively,
  you should set the number of actors to 6 and the CPUs per
  actor to 4, as in the sketch below.
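
A minimal sketch of the heterogeneous-cluster case from the second bullet
above (the numbers mirror that example):

```python
from xgboost_ray import RayParams

# Cluster with nodes of 4, 8, and 12 CPUs: the greatest common divisor is 4,
# so use 4 CPUs per actor and (4 + 8 + 12) / 4 = 6 actors in total.
ray_params = RayParams(num_actors=6, cpus_per_actor=4)
```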

Distributed data loading
------------------------
XGBoost-Ray can leverage both centralized and distributed data loading.

In **centralized data loading**, the data is partitioned by the head node
and stored in the object store. Each remote actor then retrieves its
partitions by querying the Ray object store. Centralized loading is used
when you pass centralized in-memory dataframes, such as Pandas dataframes
or Numpy arrays, or when you pass a single source file, such as a single CSV
or Parquet file.


```python
from xgboost_ray import RayDMatrix

# This will use centralized data loading, as only one source file is specified
# `label_col` is a column in the CSV, used as the target label
dtrain = RayDMatrix("./source_file.csv", label="label_col")
```

In **distributed data loading**, each remote actor loads its data directly from
the source (e.g. local hard disk, NFS, HDFS, S3),
without a central bottleneck. The data is still stored in the
object store, but locally to each actor. This mode is used automatically
when loading data from multiple CSV or Parquet files. Please note that
we do not check or enforce partition sizes in this case; it is your job
to make sure the data is evenly distributed across the source files.

```python
from xgboost_ray import RayDMatrix

# This will use distributed data loading, as four source files are specified
# Please note that you cannot schedule more than four actors in this case.
# `label_col` is a column in the Parquet files, used as the target label
dtrain = RayDMatrix([
"hdfs:///tmp/part1.parquet",
"hdfs:///tmp/part2.parquet",
"hdfs:///tmp/part3.parquet",
"hdfs:///tmp/part4.parquet",
], label="label_col")
```

Lastly, XGBoost-Ray supports **distributed dataframe** representations, such
as [Modin](https://modin.readthedocs.io/en/latest/) and
[Dask dataframes](https://docs.dask.org/en/latest/dataframe.html)
(used with [Dask on Ray](https://docs.ray.io/en/master/dask-on-ray.html)).
Here, XGBoost-Ray will check on which nodes the distributed partitions
are currently located, and will assign partitions to actors in order to
minimize cross-node data transfer. Please note that we also assume here
that partition sizes are uniform.

```python
from xgboost_ray import RayDMatrix

# This will try to allocate the existing Modin partitions
# to co-located Ray actors. If this is not possible, data will
# be transferred across nodes
dtrain = RayDMatrix(existing_modin_df)
```

### Data sources

| Type | Centralized loading | Distributed loading |
|------------------------------------------------------------------|---------------------|---------------------|
| Numpy array | Yes | No |
| Pandas dataframe | Yes | No |
| Single CSV | Yes | No |
| Multi CSV | Yes | Yes |
| Single Parquet | Yes | No |
| Multi Parquet | Yes | Yes |
| [Petastorm](https://github.com/uber/petastorm) | Yes | Yes |
| [Ray MLDataset](https://docs.ray.io/en/master/iter.html) | Yes | Yes |
| [Dask dataframe](https://docs.dask.org/en/latest/dataframe.html) | Yes | Yes |
| [Modin dataframe](https://modin.readthedocs.io/en/latest/) | Yes | Yes |
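
As a small sketch of the in-memory rows in this table (the random data and
variable names below are made up for illustration), Numpy arrays and Pandas
dataframes are passed to `RayDMatrix` directly and use centralized loading:

```python
import numpy as np
import pandas as pd
from xgboost_ray import RayDMatrix

# The driver partitions in-memory data and the actors fetch their shards
# from the Ray object store (centralized loading).
x = np.random.uniform(size=(1000, 16))
y = np.random.randint(0, 2, size=1000)
dtrain_numpy = RayDMatrix(x, y)

df = pd.DataFrame(x)
df["label"] = y
dtrain_pandas = RayDMatrix(df, label="label")
```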


Memory usage
-------------
XGBoost uses a compute-optimized data structure, the `DMatrix`,
to hold training data.

More end-to-end examples can be found in
the [examples folder](examples/).

More resources
--------------
* [XGBoost-Ray documentation](https://docs.ray.io/en/master/xgboost-ray.html)
* [Ray community slack](https://forms.gle/9TSdDYUgxYs8SA9e8)

2 changes: 1 addition & 1 deletion xgboost_ray/data_sources/modin.py
```python
try:
    import modin  # noqa: F401
    # Broken or partial modin installs may lack `__version__` (AttributeError)
    MODIN_INSTALLED = modin.__version__ >= "0.9.0"
except (ImportError, AttributeError):
    MODIN_INSTALLED = False
```

