From 46b5d234df5f66b2ddec1ccc98f321f20e8e5062 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Fri, 22 Nov 2024 15:12:34 +0800 Subject: [PATCH] [dask] Sort by QID and auto partitioning. - Implement automatic local sort. - Implement partitioning by query ID. - Document for distributed ranking. --- demo/dask/dask_learning_to_rank.py | 201 ++++++++++++++++++ demo/guide-python/learning_to_rank.py | 4 +- doc/tutorials/dask.rst | 60 +++++- doc/tutorials/learning_to_rank.rst | 18 +- python-package/xgboost/dask/__init__.py | 132 +++++++++++- python-package/xgboost/dask/data.py | 165 +++++++++++++- python-package/xgboost/testing/__init__.py | 13 +- python-package/xgboost/testing/dask.py | 40 +++- src/data/data.cc | 4 +- src/objective/lambdarank_obj.cc | 3 +- src/objective/lambdarank_obj.cuh | 11 +- .../test_with_dask/test_ranking.py | 27 ++- 12 files changed, 640 insertions(+), 38 deletions(-) create mode 100644 demo/dask/dask_learning_to_rank.py diff --git a/demo/dask/dask_learning_to_rank.py b/demo/dask/dask_learning_to_rank.py new file mode 100644 index 000000000000..565fdc402dc2 --- /dev/null +++ b/demo/dask/dask_learning_to_rank.py @@ -0,0 +1,201 @@ +""" +Learning to rank with the Dask Interface +======================================== + + .. versionadded:: 3.0.0 + +This is a demonstration of using XGBoost for learning to rank tasks using the +MSLR_10k_letor dataset. For more infomation about the dataset, please visit its +`description page `_. + +See :ref:`ltr-dist` for a general description for distributed learning to rank and +:ref:`ltr-dask` for Dask-specific features. + +""" + +from __future__ import annotations + +import argparse +import os +from contextlib import contextmanager +from typing import Generator + +import dask +import numpy as np +from dask import array as da +from dask import dataframe as dd +from distributed import Client, LocalCluster, wait +from sklearn.datasets import load_svmlight_file +from sklearn.metrics import root_mean_squared_error + +from xgboost import dask as dxgb + + +def load_mlsr_10k( + device: str, data_path: str, cache_path: str +) -> tuple[dd.DataFrame, dd.DataFrame, dd.DataFrame]: + """Load the MSLR10k dataset from data_path and save parquet files in the cache_path.""" + root_path = os.path.expanduser(args.data) + cache_path = os.path.expanduser(args.cache) + + # Use only the Fold1 for demo: + # Train, Valid, Test + # {S1,S2,S3}, S4, S5 + fold = 1 + + if not os.path.exists(cache_path): + os.mkdir(cache_path) + fold_path = os.path.join(root_path, f"Fold{fold}") + train_path = os.path.join(fold_path, "train.txt") + valid_path = os.path.join(fold_path, "vali.txt") + test_path = os.path.join(fold_path, "test.txt") + + X_train, y_train, qid_train = load_svmlight_file( + train_path, query_id=True, dtype=np.float32 + ) + columns = [f"f{i}" for i in range(X_train.shape[1])] + X_train = dd.from_array(X_train.toarray(), columns=columns) + y_train = y_train.astype(np.int32) + qid_train = qid_train.astype(np.int32) + + X_train["y"] = dd.from_array(y_train) + X_train["qid"] = dd.from_array(qid_train) + X_train.to_parquet(os.path.join(cache_path, "train"), engine="pyarrow") + + X_valid, y_valid, qid_valid = load_svmlight_file( + valid_path, query_id=True, dtype=np.float32 + ) + X_valid = dd.from_array(X_valid.toarray(), columns=columns) + y_valid = y_valid.astype(np.int32) + qid_valid = qid_valid.astype(np.int32) + + X_valid["y"] = dd.from_array(y_valid) + X_valid["qid"] = dd.from_array(qid_valid) + X_valid.to_parquet(os.path.join(cache_path, "valid"), engine="pyarrow") + + X_test, y_test, qid_test = load_svmlight_file( + test_path, query_id=True, dtype=np.float32 + ) + + X_test = dd.from_array(X_test.toarray(), columns=columns) + y_test = y_test.astype(np.int32) + qid_test = qid_test.astype(np.int32) + + X_test["y"] = dd.from_array(y_test) + X_test["qid"] = dd.from_array(qid_test) + X_test.to_parquet(os.path.join(cache_path, "test"), engine="pyarrow") + + df_train = dd.read_parquet( + os.path.join(cache_path, "train"), calculate_divisions=True + ) + df_valid = dd.read_parquet( + os.path.join(cache_path, "valid"), calculate_divisions=True + ) + df_test = dd.read_parquet( + os.path.join(cache_path, "test"), calculate_divisions=True + ) + + return df_train, df_valid, df_test + + +def ranking_demo(client: Client, args: argparse.Namespace) -> None: + """Learning to rank with data sorted locally.""" + df_tr, df_va, _ = load_mlsr_10k(args.device, args.data, args.cache) + + X_train: dd.DataFrame = df_tr[df_tr.columns.difference(["y", "qid"])] + y_train = df_tr[["y", "qid"]] + Xy_train = dxgb.DaskQuantileDMatrix(client, X_train, y_train.y, qid=y_train.qid) + + X_valid: dd.DataFrame = df_va[df_va.columns.difference(["y", "qid"])] + y_valid = df_va[["y", "qid"]] + Xy_valid = dxgb.DaskQuantileDMatrix( + client, X_valid, y_valid.y, qid=y_valid.qid, ref=Xy_train + ) + # Upon training, you will see a performance warning about sorting data based on + # query groups. + dxgb.train( + client, + {"objective": "rank:ndcg", "device": args.device}, + Xy_train, + evals=[(Xy_train, "Train"), (Xy_valid, "Valid")], + ) + + +def ranking_wo_split_demo(client: Client, args: argparse.Namespace) -> None: + """Learning to rank with data partitioned according to query groups.""" + df_tr, df_va, df_te = load_mlsr_10k(args.device, args.data, args.cache) + + X_tr = df_tr[df_tr.columns.difference(["y", "qid"])] + X_va = df_va[df_va.columns.difference(["y", "qid"])] + + # `allow_group_split=False` makes sure data is partitioned according the the query + # groups. + ltr = dxgb.DaskXGBRanker(allow_group_split=False) + ltr.client = client + ltr = ltr.fit( + X_tr, + df_tr.y, + qid=df_tr.qid, + eval_set=[(X_tr, df_tr.y), (X_va, df_va.y)], + eval_qid=[df_tr.qid, df_va.qid], + ) + + df_te = df_te.persist() + wait([df_te]) + + X_te = df_te[df_te.columns.difference(["y", "qid"])] + predt = ltr.predict(X_te).compute() + y = client.compute(df_te.y).result() + + # No available group-based evaluation metric outside of XGBoost we can use, RMSE + # here only for demostration. + print("RMSE:", root_mean_squared_error(y, predt)) + + +@contextmanager +def gen_client(device: str) -> Generator[Client, None, None]: + match device: + case "cuda": + from dask_cuda import LocalCUDACluster + + with LocalCUDACluster() as cluster: + with Client(cluster) as client: + with dask.config.set( + {"array.backend": "cupy", "dataframe.backend": "cudf"} + ): + yield client + case "cpu": + with LocalCluster() as cluster: + with Client(cluster) as client: + yield client + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Demonstration of learning to rank using XGBoost." + ) + parser.add_argument( + "--data", + type=str, + help="Root directory of the MSLR-WEB10K data.", + required=True, + ) + parser.add_argument( + "--cache", + type=str, + help="Directory for caching processed data.", + required=True, + ) + parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu") + parser.add_argument( + "--no-split", + action="store_true", + help="Flag to indicate query groups should not be split.", + ) + args = parser.parse_args() + + with gen_client(args.device) as client: + if args.no_split: + ranking_wo_split_demo(client, args) + else: + ranking_demo(client, args) diff --git a/demo/guide-python/learning_to_rank.py b/demo/guide-python/learning_to_rank.py index b131b31f76f6..8baba8d85dcc 100644 --- a/demo/guide-python/learning_to_rank.py +++ b/demo/guide-python/learning_to_rank.py @@ -12,8 +12,8 @@ train on relevance degree, and the second part simulates click data and enable the position debiasing training. -For an overview of learning to rank in XGBoost, please see -:doc:`Learning to Rank `. +For an overview of learning to rank in XGBoost, please see :doc:`Learning to Rank +`. """ from __future__ import annotations diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst index 6e68d83a0083..60c572fad95a 100644 --- a/doc/tutorials/dask.rst +++ b/doc/tutorials/dask.rst @@ -355,15 +355,18 @@ Working with asyncio .. versionadded:: 1.2.0 -XGBoost's dask interface supports the new ``asyncio`` in Python and can be integrated into -asynchronous workflows. For using dask with asynchronous operations, please refer to -`this dask example `_ and document in -`distributed `_. To use XGBoost's -dask interface asynchronously, the ``client`` which is passed as an argument for training and -prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when the -``client`` is created (example below). All functions (including ``DaskDMatrix``) provided -by the functional interface will then return coroutines which can then be awaited to retrieve -their result. +XGBoost's dask interface supports the new :py:mod:`asyncio` in Python and can be +integrated into asynchronous workflows. For using dask with asynchronous operations, +please refer to `this dask example +`_ and document in `distributed +`_. To use XGBoost's Dask +interface asynchronously, the ``client`` which is passed as an argument for training and +prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when +the ``client`` is created (example below). All functions (including ``DaskDMatrix``) +provided by the functional interface will then return coroutines which can then be awaited +to retrieve their result. Please note that XGBoost is a compute-bounded application, where +parallelism is more important than concurrency. The support for `asyncio` is more about +compatibility instead of performance gain. Functional interface: @@ -526,6 +529,45 @@ See https://github.com/coiled/dask-xgboost-nyctaxi for a set of examples of usin with dask and optuna. +.. _ltr-dask: + +**************** +Learning to Rank +**************** + + .. versionadded:: 3.0.0 + + .. note:: + + Position debiasing is not yet supported. + +There are two operation modes in the Dask learning to rank for performance reasons. The +difference is whether a distributed global sort is needed. Please see :ref:`ltr-dist` for +how rankings work with distributed training in general. Below we will discuss some of the +Dask-specific features. + +First, if you use the :py:class:`~xgboost.dask.DaskQuantileDMatrix` interface or the +:py:class:`~xgboost.dask.DaskXGBRanker` with ``allow_group_split`` set to ``True``, +XGBoost will try to sort and group the samples for each worker based on the query ID. This +mode tries to skip the global sort and sort only worker-local data, and hence no +inter-worker data shuffle. Please note that even worker-local sort is costly, particularly +in terms of memory usage as there's no spilling when +:py:meth:`~pandas.DataFrame.sort_values` is used. XGBoost first checks whether the QID is +already sorted before actually performing the sorting operation. One can choose this if +the query groups are relatively consecutive, meaning most of the samples within a query +group are close to each other and are likely to be resided to the same worker. Don't use +this if you have performed a random shuffle on your data. + +If the input data is random, then there's no way we can guarantee most of data within the +same group being in the same worker. For large query groups, this might not be an +issue. But for small query groups, it's possible that each worker gets only one or two +samples from their group for all groups, which can lead to disastrous performance. In that +case, we can partition the data according to query group, which is the default behavior of +the :py:class:`~xgboost.dask.DaskXGBRanker` unless the ``allow_group_split`` is set to +``True``. This mode performs a sort and a groupby on the entire dataset in addition to an +encoding operation for the query group IDs, which can lead to slow performance. See +:ref:`sphx_glr_python_dask-examples_dask_learning_to_rank.py` for a worked example. + .. _tracker-ip: *************** diff --git a/doc/tutorials/learning_to_rank.rst b/doc/tutorials/learning_to_rank.rst index 4d2cbad4aa47..8743a672d219 100644 --- a/doc/tutorials/learning_to_rank.rst +++ b/doc/tutorials/learning_to_rank.rst @@ -165,10 +165,26 @@ On the other hand, if you have comparatively small amount of training data: For any method chosen, you can modify ``lambdarank_num_pair_per_sample`` to control the amount of pairs generated. +.. _ltr-dist: + ******************** Distributed Training ******************** -XGBoost implements distributed learning-to-rank with integration of multiple frameworks including Dask, Spark, and PySpark. The interface is similar to the single-node counterpart. Please refer to document of the respective XGBoost interface for details. Scattering a query group onto multiple workers is theoretically sound but can affect the model accuracy. For most of the use cases, the small discrepancy is not an issue, as the amount of training data is usually large when distributed training is used. As a result, users don't need to partition the data based on query groups. As long as each data partition is correctly sorted by query IDs, XGBoost can aggregate sample gradients accordingly. + +XGBoost implements distributed learning-to-rank with integration of multiple frameworks +including :doc:`Dask `, :doc:`Spark `, and +:doc:`PySpark `. The interface is similar to the single-node +counterpart. Please refer to document of the respective XGBoost interface for details. + +.. warning:: + + Position-debiasing is not yet supported for existing distributed interfaces. + +XGBoost works with collective operations, which means data is scattered to multiple workers. We can divide the data partitions by query group and ensure no query group is split among workers. However, this requires a costly sort and groupby operation and might only be necessary for selected use cases. Splitting and scattering a query group to multiple workers is theoretically sound but can affect the model's accuracy. If there are only a small number of groups sitting at the boundaries of workers, the small discrepancy is not an issue, as the amount of training data is usually large when distributed training is used. + +For a longer explanation, assuming the pairwise ranking method is used, we calculate the gradient based on relevance degree by constructing pairs within a query group. If a single query group is split among workers and we use worker-local data for gradient calculation, then we are simply sampling pairs from a smaller group for each worker to calculate the gradient and the evaluation metric. The comparison between each pair doesn't change because a group is split into sub-groups, what changes is the number of total and effective pairs and normalizers like `IDCG`. One can generate more pairs from a large group than it's from two smaller subgroups. As a result, the obtained gradient is still valid from a theoretical standpoint but might not be optimal. As long as each data partitions within a worker are correctly sorted by query IDs, XGBoost can aggregate sample gradients accordingly. And both the (Py)Spark interface and the Dask interface can sort the data according to query ID, please see respected tutorials for more information. + +However, it's possible that a distributed framework shuffles the data during map reduce and splits every query group into multiple workers. In that case, the performance would be disastrous. As a result, it depends on the data and the framework for whether a sorted groupby is needed. ******************* Reproducible Result diff --git a/python-package/xgboost/dask/__init__.py b/python-package/xgboost/dask/__init__.py index 76fcc1a6ad92..06991593c5a7 100644 --- a/python-package/xgboost/dask/__init__.py +++ b/python-package/xgboost/dask/__init__.py @@ -73,6 +73,7 @@ Tuple, TypeAlias, TypedDict, + TypeGuard, TypeVar, Union, ) @@ -118,7 +119,7 @@ ) from ..tracker import RabitTracker from ..training import train as worker_train -from .data import _create_dmatrix, _create_quantile_dmatrix +from .data import _create_dmatrix, _create_quantile_dmatrix, no_group_split from .utils import get_address_from_user, get_n_threads _DaskCollection: TypeAlias = Union[da.Array, dd.DataFrame, dd.Series] @@ -1899,6 +1900,20 @@ def _argmax(x: Any) -> Any: """, ["estimators", "model"], + extra_parameters=""" + allow_group_split : + + .. versionadded:: 3.0.0 + + Whether a query group can be split among multiple workers. When set to `False`, + inputs must be Dask dataframes or series. + + .. warning:: + + GPU is not yet supported when the `dask-expr` is enabled. In addition, async + environment may not work. + +""", end_note=""" .. note:: @@ -1911,36 +1926,36 @@ def __init__( self, *, objective: str = "rank:pairwise", + allow_group_split: bool = False, coll_cfg: Optional[CollConfig] = None, **kwargs: Any, ) -> None: if callable(objective): raise ValueError("Custom objective function not supported by XGBRanker.") + self.allow_group_split = allow_group_split super().__init__(objective=objective, coll_cfg=coll_cfg, **kwargs) + def _wrapper_params(self) -> Set[str]: + params = super()._wrapper_params() + params.add("allow_group_split") + return params + async def _fit_async( self, X: _DataT, y: _DaskCollection, *, - group: Optional[_DaskCollection], qid: Optional[_DaskCollection], sample_weight: Optional[_DaskCollection], base_margin: Optional[_DaskCollection], eval_set: Optional[Sequence[Tuple[_DaskCollection, _DaskCollection]]], sample_weight_eval_set: Optional[Sequence[_DaskCollection]], base_margin_eval_set: Optional[Sequence[_DaskCollection]], - eval_group: Optional[Sequence[_DaskCollection]], eval_qid: Optional[Sequence[_DaskCollection]], verbose: Union[int, bool], xgb_model: Optional[Union[XGBModel, Booster]], feature_weights: Optional[_DaskCollection], ) -> "DaskXGBRanker": - msg = "Use the `qid` instead of the `group` with the dask interface." - if not (group is None and eval_group is None): - raise ValueError(msg) - if qid is None: - raise ValueError("`qid` is required for ranking.") params = self.get_xgb_params() dtrain, evals = await _async_wrap_evaluation_matrices( self.client, @@ -2007,8 +2022,105 @@ def fit( base_margin_eval_set: Optional[Sequence[_DaskCollection]] = None, feature_weights: Optional[_DaskCollection] = None, ) -> "DaskXGBRanker": - args = {k: v for k, v in locals().items() if k not in ("self", "__class__")} - return self._client_sync(self._fit_async, **args) + msg = "Use the `qid` instead of the `group` with the dask interface." + if not (group is None and eval_group is None): + raise ValueError(msg) + if qid is None: + raise ValueError("`qid` is required for ranking.") + + def check_df(X: _DaskCollection) -> TypeGuard[dd.DataFrame]: + if not isinstance(X, dd.DataFrame): + raise TypeError( + "When `allow_group_split` is set to False, X is required to be" + " a dataframe." + ) + return True + + def check_ser( + qid: Optional[_DaskCollection], name: str + ) -> TypeGuard[Optional[dd.Series]]: + if not isinstance(qid, dd.Series) and qid is not None: + raise TypeError( + f"When `allow_group_split` is set to False, {name} is required to be" + " a series." + ) + return True + + if not self.allow_group_split: + assert ( + check_df(X) + and check_ser(qid, "qid") + and check_ser(y, "y") + and check_ser(sample_weight, "sample_weight") + and check_ser(base_margin, "base_margin") + ) + assert qid is not None and y is not None + X_id = id(X) + X, qid, y, sample_weight, base_margin = no_group_split( + X, + qid, + y=y, + sample_weight=sample_weight, + base_margin=base_margin, + ) + + if eval_set is not None: + new_eval_set = [] + new_eval_qid = [] + new_sample_weight_eval_set = [] + new_base_margin_eval_set = [] + assert eval_qid + for i, (Xe, ye) in enumerate(eval_set): + we = sample_weight_eval_set[i] if sample_weight_eval_set else None + be = base_margin_eval_set[i] if base_margin_eval_set else None + assert check_df(Xe) + assert eval_qid + qe = eval_qid[i] + assert ( + eval_qid + and check_ser(qe, "qid") + and check_ser(ye, "y") + and check_ser(we, "sample_weight") + and check_ser(be, "base_margin") + ) + assert qe is not None and ye is not None + if id(Xe) != X_id: + Xe, qe, ye, we, be = no_group_split(Xe, qe, ye, we, be) + else: + Xe, qe, ye, we, be = X, qid, y, sample_weight, base_margin + + new_eval_set.append((Xe, ye)) + new_eval_qid.append(qe) + + if we is not None: + new_sample_weight_eval_set.append(we) + if be is not None: + new_base_margin_eval_set.append(be) + + eval_set = new_eval_set + eval_qid = new_eval_qid + sample_weight_eval_set = ( + new_sample_weight_eval_set if new_sample_weight_eval_set else None + ) + base_margin_eval_set = ( + new_base_margin_eval_set if new_base_margin_eval_set else None + ) + + return self._client_sync( + self._fit_async, + X=X, + y=y, + qid=qid, + sample_weight=sample_weight, + base_margin=base_margin, + eval_set=eval_set, + eval_qid=eval_qid, + verbose=verbose, + xgb_model=xgb_model, + sample_weight_eval_set=sample_weight_eval_set, + base_margin_eval_set=base_margin_eval_set, + feature_weights=feature_weights, + ) # FIXME(trivialfis): arguments differ due to additional parameters like group and # qid. diff --git a/python-package/xgboost/dask/data.py b/python-package/xgboost/dask/data.py index c4f0f138b298..52947158ec41 100644 --- a/python-package/xgboost/dask/data.py +++ b/python-package/xgboost/dask/data.py @@ -3,15 +3,29 @@ import logging from collections.abc import Sequence -from typing import Any, Callable, Dict, List, Optional, TypeVar, Union +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Tuple, + TypeVar, + Union, + cast, + overload, +) import distributed import numpy as np +import pandas as pd from dask import dataframe as dd +from .. import collective as coll from .._typing import _T, FeatureNames -from ..compat import concat +from ..compat import concat, import_cupy from ..core import DataIter, DMatrix, QuantileDMatrix +from ..data import is_on_cuda LOGGER = logging.getLogger("[xgboost.dask]") @@ -96,6 +110,150 @@ def next(self, input_data: Callable) -> bool: return True +@overload +def _add_column(df: dd.DataFrame, col: dd.Series) -> Tuple[dd.DataFrame, str]: ... + + +@overload +def _add_column(df: dd.DataFrame, col: None) -> Tuple[dd.DataFrame, None]: ... + + +def _add_column( + df: dd.DataFrame, col: Optional[dd.Series] +) -> Tuple[dd.DataFrame, Optional[str]]: + if col is None: + return df, col + + trails = 0 + uid = f"{col.name}_{trails}" + while uid in df.columns: + trails += 1 + uid = f"{col.name}_{trails}" + + df = df.assign(**{uid: col}) + return df, uid + + +def no_group_split( + df: dd.DataFrame, + qid: dd.Series, + y: dd.Series, + sample_weight: Optional[dd.Series], + base_margin: Optional[dd.Series], +) -> Tuple[ + dd.DataFrame, dd.Series, dd.Series, Optional[dd.Series], Optional[dd.Series] +]: + """A function to prevent query group from being scattered to different + workers. Please see the tutorial in the document for the implication for not having + partition boundary based on query groups. + + """ + + df, qid_uid = _add_column(df, qid) + df, y_uid = _add_column(df, y) + df, w_uid = _add_column(df, sample_weight) + df, bm_uid = _add_column(df, base_margin) + + df = df.persist() + # Encode the QID to make it dense. + df[qid_uid] = df[qid_uid].astype("category").cat.as_known().cat.codes + # The shuffle here is costly. + df = df.sort_values(by=qid_uid) + cnt = df.groupby(qid_uid)[qid_uid].count() + div = cnt.index.compute().values.tolist() + div = sorted(div) + div = tuple(div + [div[-1] + 1]) + + df = df.set_index( + qid_uid, + drop=False, + divisions=div, + ).persist() + + qid = df[qid_uid] + y = df[y_uid] + sample_weight, base_margin = ( + cast(dd.Series, df[uid]) if uid is not None else None for uid in (w_uid, bm_uid) + ) + + uids = [uid for uid in [qid_uid, y_uid, w_uid, bm_uid] if uid is not None] + df = df.drop(uids, axis=1).persist() + return df, qid, y, sample_weight, base_margin + + +def sort_data_by_qid(**kwargs: List[Any]) -> Dict[str, List[Any]]: + """Sort worker-local data by query ID for learning to rank tasks.""" + data_parts = kwargs.get("data") + assert data_parts is not None + n_parts = len(data_parts) + + if is_on_cuda(data_parts[0]): + from cudf import DataFrame + else: + from pandas import DataFrame + + def get_dict(i: int) -> dict: + def _get(attr: Optional[List[Any]]) -> Optional[Any]: + if attr is not None: + return attr[i] + return None + + data = {k: _get(kwargs.get(k, None)) for k in meta} + data = {k: v for k, v in data.items() if v is not None} + return data + + # This function was created for the `dd.from_mapq constructor for sorting with a + # Dask DF. We did not proceed with that route but kept some of the utilities. It + # might be necessary to try again in the future since concatenating and sorting is + # extremely expensive in terms of memory usage. + def map_fn(i: int) -> pd.DataFrame: + data = get_dict(i) + return DataFrame(data) + + qid_parts = [map_fn(i) for i in range(n_parts)] + dfq = concat(qid_parts) + if dfq.qid.is_monotonic_increasing: + return kwargs + + LOGGER.warning( + "[r%d]: Sorting data with %d partitions for ranking. " + "This is a costly operation and will increase the memory usage significantly. " + "To avoid this warning, sort the data based on qid before passing it into " + "XGBoost. Alternatively, you can use set the `allow_group_split` to False.", + coll.get_rank(), + n_parts, + ) + # I tried to construct a new dask DF to perform the sort, but it's quite difficult + # to get the partition alignment right. Along with the still maturing shuffle + # implementation and GPU compatibility, a simple concat is used. + # + # In case it might become useful one day, I managed to get a CPU version working, + # albeit qutie slow (much slower than concatenated sort). The implementation merges + # everything into a single Dask DF and runs `DF.sort_values`, then retrieve the + # individual X,y,qid, ... from calculated partition values `client.compute([p for p + # in df.partitions])`. It was to avoid creating mismatched partitions. + dfx = concat(data_parts) + + if is_on_cuda(dfq): + cp = import_cupy() + sorted_idx = cp.argsort(dfq.qid) + else: + sorted_idx = np.argsort(dfq.qid) + dfq = dfq.iloc[sorted_idx, :] + + if hasattr(dfx, "iloc"): + dfx = dfx.iloc[sorted_idx, :] + else: + dfx = dfx[sorted_idx, :] + + kwargs.update({"data": [dfx]}) + for i, c in enumerate(dfq.columns): + assert c in kwargs + kwargs.update({c: [dfq[c]]}) + + return kwargs + + def _get_worker_parts(list_of_parts: _DataParts) -> Dict[str, List[Any]]: assert isinstance(list_of_parts, list) result: Dict[str, List[Any]] = {} @@ -115,6 +273,9 @@ def append(i: int, name: str) -> None: for k in meta: append(i, k) + qid = result.get("qid", None) + if qid is not None: + result = sort_data_by_qid(**result) return result diff --git a/python-package/xgboost/testing/__init__.py b/python-package/xgboost/testing/__init__.py index 5fbafd6ec58f..80e0ad2db1f5 100644 --- a/python-package/xgboost/testing/__init__.py +++ b/python-package/xgboost/testing/__init__.py @@ -457,7 +457,11 @@ def make_categorical( def make_ltr( - n_samples: int, n_features: int, n_query_groups: int, max_rel: int + n_samples: int, + n_features: int, + n_query_groups: int, + max_rel: int, + sort_qid: bool = True, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Make a dataset for testing LTR.""" rng = np.random.default_rng(1994) @@ -470,7 +474,8 @@ def make_ltr( w = rng.normal(0, 1.0, size=n_query_groups) w -= np.min(w) w /= np.max(w) - qid = np.sort(qid) + if sort_qid: + qid = np.sort(qid) return X, y, qid, w @@ -637,6 +642,10 @@ def non_increasing(L: Sequence[float], tolerance: float = 1e-4) -> bool: return all((y - x) < tolerance for x, y in zip(L, L[1:])) +def non_decreasing(L: Sequence[float], tolerance: float = 1e-4) -> bool: + return all((y - x) >= -tolerance for x, y in zip(L, L[1:])) + + def predictor_equal(lhs: xgb.DMatrix, rhs: xgb.DMatrix) -> bool: """Assert whether two DMatrices contain the same predictors.""" lcsr = lhs.get_data() diff --git a/python-package/xgboost/testing/dask.py b/python-package/xgboost/testing/dask.py index 541009a73c85..69ae82ff06cd 100644 --- a/python-package/xgboost/testing/dask.py +++ b/python-package/xgboost/testing/dask.py @@ -1,6 +1,6 @@ """Tests for dask shared by different test modules.""" -from typing import Any, List, Literal, cast +from typing import Any, List, Literal, Tuple, cast import numpy as np import pandas as pd @@ -179,3 +179,41 @@ def get_client_workers(client: Any) -> List[str]: "Get workers from a dask client." workers = client.scheduler_info()["workers"] return list(workers.keys()) + + +def make_ltr( + client: Client, n_samples: int, n_features: int, n_query_groups: int, max_rel: int +) -> Tuple[dd.DataFrame, dd.Series, dd.Series]: + """Synthetic dataset for learning to rank.""" + workers = get_client_workers(client) + n_samples_per_worker = n_samples // len(workers) + + def make(n: int, seed: int) -> pd.DataFrame: + rng = np.random.default_rng(seed) + X, y = make_classification( + n, n_features, n_informative=n_features, n_redundant=0, n_classes=max_rel + ) + qid = rng.integers(size=(n,), low=0, high=n_query_groups) + df = pd.DataFrame(X, columns=[f"f{i}" for i in range(n_features)]) + df["qid"] = qid + df["y"] = y + return df + + futures = [] + i = 0 + for k in range(0, n_samples, n_samples_per_worker): + fut = client.submit( + make, n=n_samples_per_worker, seed=k, workers=[workers[i % len(workers)]] + ) + futures.append(fut) + i += 1 + + last = n_samples - (n_samples_per_worker * len(workers)) + if last != 0: + fut = client.submit(make, n=last, seed=n_samples_per_worker * len(workers)) + futures.append(fut) + + meta = make(1, 0) + df = dd.from_delayed(futures, meta=meta) + assert isinstance(df, dd.DataFrame) + return df.drop(["qid", "y"], axis=1), df.y, df.qid diff --git a/src/data/data.cc b/src/data/data.cc index 47836bb5134b..713ad4a1a514 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -539,7 +539,9 @@ void MetaInfo::SetInfoFromHost(Context const* ctx, StringView key, Json arr) { } else if (key == "label") { CopyTensorInfoImpl(ctx, arr, &this->labels); if (this->num_row_ != 0 && this->labels.Shape(0) != this->num_row_) { - CHECK_EQ(this->labels.Size() % this->num_row_, 0) << "Incorrect size for labels."; + CHECK_EQ(this->labels.Size() % this->num_row_, 0) + << "Incorrect size for labels: (" << this->labels.Shape(0) << "," << this->labels.Shape(1) + << ") v.s. " << this->num_row_; size_t n_targets = this->labels.Size() / this->num_row_; this->labels.Reshape(this->num_row_, n_targets); } diff --git a/src/objective/lambdarank_obj.cc b/src/objective/lambdarank_obj.cc index 94acf5a238d9..c50a55b3a17c 100644 --- a/src/objective/lambdarank_obj.cc +++ b/src/objective/lambdarank_obj.cc @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023, XGBoost contributors + * Copyright 2023-2024, XGBoost contributors */ #include "lambdarank_obj.h" @@ -23,7 +23,6 @@ #include "../common/optional_weight.h" // for MakeOptionalWeights, OptionalWeights #include "../common/ranking_utils.h" // for RankingCache, LambdaRankParam, MAPCache, NDCGC... #include "../common/threading_utils.h" // for ParallelFor, Sched -#include "../common/transform_iterator.h" // for IndexTransformIter #include "init_estimation.h" // for FitIntercept #include "xgboost/base.h" // for bst_group_t, GradientPair, kRtEps, GradientPai... #include "xgboost/context.h" // for Context diff --git a/src/objective/lambdarank_obj.cuh b/src/objective/lambdarank_obj.cuh index 2e5724f7f1fd..e1a78f905434 100644 --- a/src/objective/lambdarank_obj.cuh +++ b/src/objective/lambdarank_obj.cuh @@ -1,5 +1,5 @@ /** - * Copyright 2023 XGBoost contributors + * Copyright 2023-2024, XGBoost contributors */ #ifndef XGBOOST_OBJECTIVE_LAMBDARANK_OBJ_CUH_ #define XGBOOST_OBJECTIVE_LAMBDARANK_OBJ_CUH_ @@ -71,13 +71,13 @@ struct KernelInputs { std::int32_t iter; }; /** - * \brief Functor for generating pairs + * @brief Functor for generating pairs */ template struct MakePairsOp { KernelInputs args; /** - * \brief Make pair for the topk pair method. + * @brief Make pair for the topk pair method. */ [[nodiscard]] XGBOOST_DEVICE std::tuple WithTruncation( std::size_t idx, bst_group_t g) const { @@ -86,9 +86,6 @@ struct MakePairsOp { auto data_group_begin = static_cast(args.d_group_ptr[g]); std::size_t n_data = args.d_group_ptr[g + 1] - data_group_begin; - // obtain group segment data. - auto g_label = args.labels.Slice(linalg::Range(data_group_begin, data_group_begin + n_data), 0); - auto g_sorted_idx = args.d_sorted_idx.subspan(data_group_begin, n_data); std::size_t i = 0, j = 0; common::UnravelTrapeziodIdx(idx_in_thread_group, n_data, &i, &j); @@ -97,7 +94,7 @@ struct MakePairsOp { return std::make_tuple(rank_high, rank_low); } /** - * \brief Make pair for the mean pair method + * @brief Make pair for the mean pair method */ XGBOOST_DEVICE std::tuple WithSampling(std::size_t idx, bst_group_t g) const { diff --git a/tests/test_distributed/test_with_dask/test_ranking.py b/tests/test_distributed/test_with_dask/test_ranking.py index 0b2ea404fde1..568ebe03fcd9 100644 --- a/tests/test_distributed/test_with_dask/test_ranking.py +++ b/tests/test_distributed/test_with_dask/test_ranking.py @@ -11,6 +11,7 @@ from xgboost import dask as dxgb from xgboost import testing as tm +from xgboost.testing import dask as dtm @pytest.fixture(scope="module") @@ -59,7 +60,10 @@ def test_dask_ranking(client: Client) -> None: qid_test = qid_test.astype(np.uint32) rank = dxgb.DaskXGBRanker( - n_estimators=2500, eval_metric=["ndcg"], early_stopping_rounds=10 + n_estimators=2500, + eval_metric=["ndcg"], + early_stopping_rounds=10, + allow_group_split=True, ) rank.fit( x_train, @@ -71,3 +75,24 @@ def test_dask_ranking(client: Client) -> None: ) assert rank.n_features_in_ == 46 assert rank.best_score > 0.98 + + +@pytest.mark.filterwarnings("error") +def test_no_group_split(client: Client) -> None: + X_tr, q_tr, y_tr = dtm.make_ltr(client, 4096, 128, 4, 5) + X_va, q_va, y_va = dtm.make_ltr(client, 1024, 128, 4, 5) + + ltr = dxgb.DaskXGBRanker(allow_group_split=False, n_estimators=32) + ltr.fit( + X_tr, + y_tr, + qid=q_tr, + eval_set=[(X_tr, y_tr), (X_va, y_va)], + eval_qid=[q_tr, q_va], + verbose=True, + ) + + assert ltr.n_features_in_ == 128 + assert X_tr.shape[1] == ltr.n_features_in_ # no change + ndcg = ltr.evals_result()["validation_0"]["ndcg@32"] + assert tm.non_decreasing(ndcg, tolerance=1e-2), ndcg