Merge branch 'master' into ext-ellpack-dense-1
trivialfis committed Sep 20, 2024
2 parents 38af445 + d5e1c41 commit 34dcd63
Showing 3 changed files with 40 additions and 41 deletions.
11 changes: 7 additions & 4 deletions doc/tutorials/model.rst
@@ -3,7 +3,7 @@ Introduction to Boosted Trees
#############################
XGBoost stands for "Extreme Gradient Boosting", where the term "Gradient Boosting" originates from the paper *Greedy Function Approximation: A Gradient Boosting Machine*, by Friedman.

The **gradient boosted trees** has been around for a while, and there are a lot of materials on the topic.
The term **gradient boosted trees** has been around for a while, and there are a lot of materials on the topic.
This tutorial will explain boosted trees in a self-contained and principled way using the elements of supervised learning.
We think this explanation is cleaner, more formal, and motivates the model formulation used in XGBoost.

@@ -119,13 +119,16 @@ Let the following be the objective function (remember it always needs to contain

.. math::
\text{obj} = \sum_{i=1}^n l(y_i, \hat{y}_i^{(t)}) + \sum_{i=1}^t\omega(f_i)
\text{obj} = \sum_{i=1}^n l(y_i, \hat{y}_i^{(t)}) + \sum_{k=1}^t\omega(f_k)
in which :math:`t` is the number of trees in our ensemble.
(Each training step will add one new tree, so that at step :math:`t` the ensemble contains :math:`K=t` trees).
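As a rough illustration (not part of the changed file), the objective above splits into a data-fit term over the :math:`n` training points and a regularisation term over the :math:`t` trees grown so far. In the sketch below, ``loss`` and ``penalty`` are hypothetical stand-ins for :math:`l` and :math:`\omega`:

.. code-block:: python

   import numpy as np

   def objective(y, y_pred, trees, loss, penalty):
       """obj = sum_i loss(y_i, yhat_i) + sum_k penalty(f_k)."""
       data_term = np.sum(loss(y, y_pred))            # loss applied element-wise
       reg_term = sum(penalty(f_k) for f_k in trees)  # one penalty per tree
       return data_term + reg_term

   # Toy usage with squared error and a penalty that just counts leaves:
   # objective(y, y_pred, trees, lambda y, p: (y - p) ** 2, lambda f: f.n_leaves)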

Additive Training
=================

The first question we want to ask: what are the **parameters** of trees?
You can find that what we need to learn are those functions :math:`f_i`, each containing the structure
You can find that what we need to learn are those functions :math:`f_k`, each containing the structure
of the tree and the leaf scores. Learning the tree structure is much harder than a traditional optimization problem where you can simply take the gradient.
It is intractable to learn all the trees at once.
Instead, we use an additive strategy: fix what we have learned, and add one new tree at a time.
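A minimal sketch (again not part of the changed file) of the additive strategy, assuming each fitted tree :math:`f_k` is available as a callable that returns per-row scores:

.. code-block:: python

   import numpy as np

   def predict_additively(trees, X):
       y_pred = np.zeros(len(X))   # \hat{y}^{(0)} = 0
       for f in trees:             # trees were fitted one at a time; earlier ones stay fixed
           y_pred = y_pred + f(X)  # \hat{y}^{(t)} = \hat{y}^{(t-1)} + f_t(x)
       return y_pred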
@@ -150,7 +153,7 @@ If we consider using mean squared error (MSE) as our loss function, the objectiv

.. math::
\text{obj}^{(t)} & = \sum_{i=1}^n (y_i - (\hat{y}_i^{(t-1)} + f_t(x_i)))^2 + \sum_{i=1}^t\omega(f_i) \\
\text{obj}^{(t)} & = \sum_{i=1}^n (y_i - (\hat{y}_i^{(t-1)} + f_t(x_i)))^2 + \sum_{k=1}^t\omega(f_k) \\
& = \sum_{i=1}^n [2(\hat{y}_i^{(t-1)} - y_i)f_t(x_i) + f_t(x_i)^2] + \omega(f_t) + \mathrm{constant}
The form of MSE is friendly, with a first order term (usually called the residual) and a quadratic term.
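For completeness (this step is not spelled out in the diff), the constant gathers everything that does not depend on :math:`f_t`: expanding the square gives

.. math::

   (y_i - (\hat{y}_i^{(t-1)} + f_t(x_i)))^2 = (y_i - \hat{y}_i^{(t-1)})^2 + 2(\hat{y}_i^{(t-1)} - y_i)f_t(x_i) + f_t(x_i)^2,

and :math:`(y_i - \hat{y}_i^{(t-1)})^2` together with the penalties :math:`\sum_{k=1}^{t-1}\omega(f_k)` of the already-fixed trees are constants at step :math:`t`.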
9 changes: 7 additions & 2 deletions python-package/xgboost/testing/updater.py
@@ -218,8 +218,13 @@ def check_extmem_qdm(
)

booster_it = xgb.train({"device": device}, Xy_it, num_boost_round=8)
X, y, w = it.as_arrays()
Xy = xgb.QuantileDMatrix(X, y, weight=w)
it = tm.IteratorForTest(
*tm.make_batches(
n_samples_per_batch, n_features, n_batches, use_cupy=device != "cpu"
),
cache=None,
)
Xy = xgb.QuantileDMatrix(it)
booster = xgb.train({"device": device}, Xy, num_boost_round=8)

if device == "cpu":
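For context on the updated test above: ``xgboost.QuantileDMatrix`` can be built directly from a ``DataIter`` that yields batches, which is what ``tm.IteratorForTest`` provides here. Below is a minimal, self-contained sketch of the same pattern with plain NumPy batches; the class name ``NumpyBatchIter`` is made up for illustration.

import numpy as np
import xgboost as xgb

class NumpyBatchIter(xgb.DataIter):
    """Feed pre-made in-memory (X, y) batches to XGBoost one at a time."""

    def __init__(self, batches):
        self._batches = batches
        self._it = 0
        super().__init__()

    def next(self, input_data):
        if self._it == len(self._batches):
            return False                 # signal the end of iteration
        X, y = self._batches[self._it]
        input_data(data=X, label=y)      # hand the current batch to XGBoost
        self._it += 1
        return True

    def reset(self):
        self._it = 0                     # rewind so XGBoost can do another pass

rng = np.random.default_rng(0)
batches = [(rng.random((64, 4)), rng.random(64)) for _ in range(3)]
Xy = xgb.QuantileDMatrix(NumpyBatchIter(batches))
booster = xgb.train({"tree_method": "hist"}, Xy, num_boost_round=8)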
61 changes: 26 additions & 35 deletions tests/python/test_tracker.py
@@ -34,44 +34,48 @@ def test_socket_error():
tracker.free()


def run_rabit_ops(client, n_workers):
from xgboost.dask import CommunicatorContext, _get_dask_config, _get_rabit_args

workers = tm.get_client_workers(client)
rabit_args = client.sync(_get_rabit_args, len(workers), _get_dask_config(), client)
assert not collective.is_distributed()
n_workers_from_dask = len(workers)
assert n_workers == n_workers_from_dask
def run_rabit_ops(pool, n_workers: int, address: str) -> None:
tracker = RabitTracker(host_ip=address, n_workers=n_workers)
tracker.start()
args = tracker.worker_args()

def local_test(worker_id):
with CommunicatorContext(**rabit_args):
def local_test(worker_id: int, rabit_args: dict) -> int:
with collective.CommunicatorContext(**rabit_args):
a = 1
assert collective.is_distributed()
a = np.array([a])
reduced = collective.allreduce(a, collective.Op.SUM)
arr = np.array([a])
reduced = collective.allreduce(arr, collective.Op.SUM)
assert reduced[0] == n_workers

worker_id = np.array([worker_id])
reduced = collective.allreduce(worker_id, collective.Op.MAX)
arr = np.array([worker_id])
reduced = collective.allreduce(arr, collective.Op.MAX)
assert reduced == n_workers - 1

return 1

futures = client.map(local_test, range(len(workers)), workers=workers)
results = client.gather(futures)
fn = update_wrapper(partial(local_test, rabit_args=args), local_test)
results = pool.map(fn, range(n_workers))
assert sum(results) == n_workers


@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_loky())
def test_rabit_ops():
from distributed import Client, LocalCluster
from loky import get_reusable_executor

n_workers = 3
with LocalCluster(n_workers=n_workers) as cluster:
with Client(cluster) as client:
run_rabit_ops(client, n_workers)
n_workers = 4
with get_reusable_executor(max_workers=n_workers) as pool:
run_rabit_ops(pool, n_workers, "127.0.0.1")


@pytest.mark.skipif(**tm.no_ipv6())
@pytest.mark.skipif(**tm.no_loky())
def test_rabit_ops_ipv6():
from loky import get_reusable_executor

n_workers = 4
with get_reusable_executor(max_workers=n_workers) as pool:
run_rabit_ops(pool, n_workers, "::1")


def run_allreduce(pool, n_workers: int) -> None:
tracker = RabitTracker(host_ip="127.0.0.1", n_workers=n_workers)
@@ -133,19 +137,6 @@ def test_broadcast():
run_broadcast(pool, n_workers)


@pytest.mark.skipif(**tm.no_ipv6())
@pytest.mark.skipif(**tm.no_dask())
def test_rabit_ops_ipv6():
import dask
from distributed import Client, LocalCluster

n_workers = 3
with dask.config.set({"xgboost.scheduler_address": "[::1]"}):
with LocalCluster(n_workers=n_workers, host="[::1]") as cluster:
with Client(cluster) as client:
run_rabit_ops(client, n_workers)


@pytest.mark.skipif(**tm.no_dask())
def test_rank_assignment() -> None:
from distributed import Client, LocalCluster
