Skip to content

Commit

Permalink
[dask] Fix LTR with empty partition and NCCL error. (#11152)
Browse files Browse the repository at this point in the history
  • Loading branch information
trivialfis authored Jan 10, 2025
1 parent 096b0fa commit 51b3c46
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 17 deletions.
1 change: 1 addition & 0 deletions ops/pipeline/test-python-wheel-impl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ case "$suite" in
mgpu)
echo "-- Run Python tests, using multiple GPUs"
python -c 'from cupy.cuda import jitify; jitify._init_module()'
export NCCL_RAS_ENABLE=0
pytest -v -s -rxXs --fulltrace --durations=0 -m 'mgpu' tests/python-gpu
pytest -v -s -rxXs --fulltrace --durations=0 -m 'mgpu' \
tests/test_distributed/test_gpu_with_dask
Expand Down
41 changes: 29 additions & 12 deletions python-package/xgboost/dask/data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# pylint: disable=too-many-arguments
"""Copyright 2019-2024, XGBoost contributors"""
"""Copyright 2019-2025, XGBoost contributors"""

import logging
from collections.abc import Sequence
Expand Down Expand Up @@ -283,6 +283,25 @@ def append(i: int, name: str) -> None:
return result


def _get_is_cuda(parts: Optional[_DataParts]) -> bool:
    """Decide whether the training data should be treated as CUDA data.

    The local answer comes from the ``data`` entry of the first partition, or is
    ``False`` when this worker received no partitions at all.  The local flags
    are then combined with a MAX all-reduce, so the returned value is true as
    soon as any participating worker reports CUDA data.

    Parameters
    ----------
    parts :
        Partitions assigned to this worker, or ``None`` if it has none.

    Returns
    -------
    The reduced flag, converted back to a Python ``bool``.
    """
    # A worker without partitions contributes False to the reduction.
    local_flag = parts is not None and is_on_cuda(parts[0].get("data"))
    # The flag is sent as a one-element int32 array for the reduction.
    reduced = coll.allreduce(np.array([local_flag], dtype=np.int32), coll.Op.MAX)
    return bool(reduced[0])


def _make_empty(is_cuda: bool) -> np.ndarray:
    """Create a 0x0 placeholder array for a worker that holds no data.

    Parameters
    ----------
    is_cuda :
        When true the array is allocated through the module returned by
        ``import_cupy()``; otherwise it is a NumPy array.

    Returns
    -------
    An uninitialised array of shape ``(0, 0)``.
    """
    if not is_cuda:
        return np.empty((0, 0))
    # Obtain cupy through the helper only when a device array is requested.
    return import_cupy().empty((0, 0))


def _create_quantile_dmatrix(
*,
feature_names: Optional[FeatureNames],
Expand All @@ -297,29 +316,26 @@ def _create_quantile_dmatrix(
ref: Optional[DMatrix] = None,
) -> QuantileDMatrix:
worker = distributed.get_worker()
is_cuda = _get_is_cuda(parts)
if parts is None:
msg = f"Worker {worker.address} has an empty DMatrix."
LOGGER.warning(msg)

Xy = QuantileDMatrix(
np.empty((0, 0)),
LOGGER.warning("Worker %s has an empty DMatrix.", worker.address)
return QuantileDMatrix(
_make_empty(is_cuda),
feature_names=feature_names,
feature_types=feature_types,
max_bin=max_bin,
ref=ref,
enable_categorical=enable_categorical,
max_quantile_batches=max_quantile_batches,
)
return Xy

unzipped_dict = _get_worker_parts(parts)
it = DaskPartitionIter(
**unzipped_dict,
**_get_worker_parts(parts),
feature_types=feature_types,
feature_names=feature_names,
feature_weights=feature_weights,
)
Xy = QuantileDMatrix(
return QuantileDMatrix(
it,
missing=missing,
nthread=nthread,
Expand All @@ -328,7 +344,6 @@ def _create_quantile_dmatrix(
enable_categorical=enable_categorical,
max_quantile_batches=max_quantile_batches,
)
return Xy


def _create_dmatrix( # pylint: disable=too-many-locals
Expand All @@ -350,11 +365,13 @@ def _create_dmatrix( # pylint: disable=too-many-locals
"""
worker = distributed.get_worker()
list_of_parts = parts
is_cuda = _get_is_cuda(parts)

if list_of_parts is None:
msg = f"Worker {worker.address} has an empty DMatrix."
LOGGER.warning(msg)
Xy = DMatrix(
np.empty((0, 0)),
_make_empty(is_cuda),
feature_names=feature_names,
feature_types=feature_types,
enable_categorical=enable_categorical,
Expand Down
2 changes: 1 addition & 1 deletion python-package/xgboost/testing/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def check_no_group_split(client: Client, device: str) -> None:
client, 1024, 128, n_query_groups=4, max_rel=5, device=device
)

ltr = dxgb.DaskXGBRanker(allow_group_split=False, n_estimators=32, device=device)
ltr = dxgb.DaskXGBRanker(allow_group_split=False, n_estimators=36, device=device)
ltr.fit(
X_tr,
y_tr,
Expand Down
5 changes: 4 additions & 1 deletion src/common/threading_utils.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021-2024, XGBoost Contributors
* Copyright 2021-2025, XGBoost Contributors
*/
#ifndef XGBOOST_COMMON_THREADING_UTILS_CUH_
#define XGBOOST_COMMON_THREADING_UTILS_CUH_
Expand All @@ -20,6 +20,9 @@ namespace xgboost::common {
* \param h height
*/
XGBOOST_DEVICE inline std::size_t DiscreteTrapezoidArea(std::size_t n, std::size_t h) {
if (n == 0 || h == 0) {
return 0;
}
n -= 1; // without diagonal entries
h = std::min(n, h); // Used for ranking, h <= n
std::size_t total = ((n - (h - 1)) + n) * h / 2;
Expand Down
17 changes: 15 additions & 2 deletions tests/cpp/objective/test_lambdarank_obj.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2023 by XGBoost Contributors
* Copyright 2023-2025, XGBoost Contributors
*/
#include "test_lambdarank_obj.h"

Expand All @@ -8,7 +8,6 @@
#include <algorithm> // for sort
#include <cstddef> // for size_t
#include <initializer_list> // for initializer_list
#include <map> // for map
#include <memory> // for unique_ptr, shared_ptr, make_shared
#include <numeric> // for iota
#include <string> // for char_traits, basic_string, string
Expand Down Expand Up @@ -106,6 +105,20 @@ void TestNDCGGPair(Context const* ctx) {
}
}

{
// Test empty input
std::unique_ptr<xgboost::ObjFunction> obj{xgboost::ObjFunction::Create("rank:ndcg", ctx)};
obj->Configure(Args{{"lambdarank_pair_method", "topk"}});

HostDeviceVector<float> predts;
MetaInfo info;
info.labels = linalg::Tensor<float, 2>{{}, {0, 1}, ctx->Device()};
info.group_ptr_ = {0, 0};
info.num_row_ = 0;
linalg::Matrix<GradientPair> gpairs;
obj->GetGradient(predts, info, 0, &gpairs);
ASSERT_EQ(gpairs.Size(), 0);
}
ASSERT_NO_THROW(obj->DefaultEvalMetric());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
pytestmark = [
pytest.mark.skipif(**tm.no_dask()),
pytest.mark.skipif(**tm.no_dask_cuda()),
tm.timeout(120),
tm.timeout(180),
]

try:
Expand Down

0 comments on commit 51b3c46

Please sign in to comment.