From d7599e095b83cd39a29918b93af821c8b19b8424 Mon Sep 17 00:00:00 2001
From: Dmitry Razdoburdin
Date: Sat, 21 Sep 2024 20:01:57 +0200
Subject: [PATCH] [SYCL] Add dask support for distributed (#10812)

---
 plugin/sycl/common/hist_util.cc              | 27 +++++++++
 plugin/sycl/common/hist_util.h               |  9 +++
 plugin/sycl/tree/hist_row_adder.h            | 36 ++++++++++++
 plugin/sycl/tree/hist_synchronizer.h         | 62 ++++++++++++++++++++
 plugin/sycl/tree/hist_updater.cc             | 25 ++++++++
 plugin/sycl/tree/hist_updater.h              |  9 +++
 plugin/sycl/tree/updater_quantile_hist.cc    |  3 +-
 python-package/xgboost/core.py               | 11 ++--
 tests/ci_build/conda_env/linux_sycl_test.yml |  1 +
 tests/python-sycl/test_sycl_simple_dask.py   | 42 +++++++++++++
 10 files changed, 219 insertions(+), 6 deletions(-)
 create mode 100644 tests/python-sycl/test_sycl_simple_dask.py

diff --git a/plugin/sycl/common/hist_util.cc b/plugin/sycl/common/hist_util.cc
index 2f2417f3a29a..59a815f5fc40 100644
--- a/plugin/sycl/common/hist_util.cc
+++ b/plugin/sycl/common/hist_util.cc
@@ -31,6 +31,33 @@ template void InitHist(::sycl::queue qu,
                        GHistRow<double, MemoryType::on_device>* hist,
                        size_t size, ::sycl::event* event);
 
+/*!
+ * \brief Copy histogram from src to dst
+ */
+template<typename GradientSumT>
+void CopyHist(::sycl::queue qu,
+              GHistRow<GradientSumT, MemoryType::on_device>* dst,
+              const GHistRow<GradientSumT, MemoryType::on_device>& src,
+              size_t size) {
+  GradientSumT* pdst = reinterpret_cast<GradientSumT*>(dst->Data());
+  const GradientSumT* psrc = reinterpret_cast<const GradientSumT*>(src.DataConst());
+
+  qu.submit([&](::sycl::handler& cgh) {
+    cgh.parallel_for<>(::sycl::range<1>(2 * size), [=](::sycl::item<1> pid) {
+      const size_t i = pid.get_id(0);
+      pdst[i] = psrc[i];
+    });
+  }).wait();
+}
+template void CopyHist(::sycl::queue qu,
+                       GHistRow<float, MemoryType::on_device>* dst,
+                       const GHistRow<float, MemoryType::on_device>& src,
+                       size_t size);
+template void CopyHist(::sycl::queue qu,
+                       GHistRow<double, MemoryType::on_device>* dst,
+                       const GHistRow<double, MemoryType::on_device>& src,
+                       size_t size);
+
 /*!
  * \brief Compute Subtraction: dst = src1 - src2
  */
diff --git a/plugin/sycl/common/hist_util.h b/plugin/sycl/common/hist_util.h
index aa9b4f5817bb..cbf0d34a86fd 100644
--- a/plugin/sycl/common/hist_util.h
+++ b/plugin/sycl/common/hist_util.h
@@ -36,6 +36,15 @@ void InitHist(::sycl::queue qu,
               GHistRow<GradientSumT, MemoryType::on_device>* hist,
               size_t size, ::sycl::event* event);
 
+/*!
+ * \brief Copy histogram from src to dst
+ */
+template<typename GradientSumT>
+void CopyHist(::sycl::queue qu,
+              GHistRow<GradientSumT, MemoryType::on_device>* dst,
+              const GHistRow<GradientSumT, MemoryType::on_device>& src,
+              size_t size);
+
 /*!
  * \brief Compute subtraction: dst = src1 - src2
  */
diff --git a/plugin/sycl/tree/hist_row_adder.h b/plugin/sycl/tree/hist_row_adder.h
index 968bcca737dc..93650d5d0746 100644
--- a/plugin/sycl/tree/hist_row_adder.h
+++ b/plugin/sycl/tree/hist_row_adder.h
@@ -39,6 +39,42 @@
   }
 };
 
+
+template <typename GradientSumT>
+class DistributedHistRowsAdder: public HistRowsAdder<GradientSumT> {
+ public:
+  void AddHistRows(HistUpdater<GradientSumT>* builder,
+                   std::vector<int>* sync_ids, RegTree *p_tree) override {
+    builder->builder_monitor_.Start("AddHistRows");
+    const size_t explicit_size = builder->nodes_for_explicit_hist_build_.size();
+    const size_t subtraction_size = builder->nodes_for_subtraction_trick_.size();
+    std::vector<int> merged_node_ids(explicit_size + subtraction_size);
+    for (size_t i = 0; i < explicit_size; ++i) {
+      merged_node_ids[i] = builder->nodes_for_explicit_hist_build_[i].nid;
+    }
+    for (size_t i = 0; i < subtraction_size; ++i) {
+      merged_node_ids[explicit_size + i] =
+        builder->nodes_for_subtraction_trick_[i].nid;
+    }
+    std::sort(merged_node_ids.begin(), merged_node_ids.end());
+    sync_ids->clear();
+    for (auto const& nid : merged_node_ids) {
+      if ((*p_tree)[nid].IsLeftChild()) {
+        builder->hist_.AddHistRow(nid);
+        builder->hist_local_worker_.AddHistRow(nid);
+        sync_ids->push_back(nid);
+      }
+    }
+    for (auto const& nid : merged_node_ids) {
+      if (!((*p_tree)[nid].IsLeftChild())) {
+        builder->hist_.AddHistRow(nid);
+        builder->hist_local_worker_.AddHistRow(nid);
+      }
+    }
+    builder->builder_monitor_.Stop("AddHistRows");
+  }
+};
+
 } // namespace tree
 } // namespace sycl
 } // namespace xgboost
diff --git a/plugin/sycl/tree/hist_synchronizer.h b/plugin/sycl/tree/hist_synchronizer.h
index 2275a51dba37..c89215cf85d2 100644
--- a/plugin/sycl/tree/hist_synchronizer.h
+++ b/plugin/sycl/tree/hist_synchronizer.h
@@ -61,6 +61,68 @@
   std::vector<::sycl::event> hist_sync_events_;
 };
 
+template <typename GradientSumT>
+class DistributedHistSynchronizer: public HistSynchronizer<GradientSumT> {
+ public:
+  void SyncHistograms(HistUpdater<GradientSumT>* builder,
+                      const std::vector<int>& sync_ids,
+                      RegTree *p_tree) override {
+    builder->builder_monitor_.Start("SyncHistograms");
+    const size_t nbins = builder->hist_builder_.GetNumBins();
+    for (int node = 0; node < builder->nodes_for_explicit_hist_build_.size(); node++) {
+      const auto entry = builder->nodes_for_explicit_hist_build_[node];
+      auto& this_hist = builder->hist_[entry.nid];
+      // Store possible parent node
+      auto& this_local = builder->hist_local_worker_[entry.nid];
+      common::CopyHist(builder->qu_, &this_local, this_hist, nbins);
+
+      if (!(*p_tree)[entry.nid].IsRoot()) {
+        const size_t parent_id = (*p_tree)[entry.nid].Parent();
+        auto sibling_nid = entry.GetSiblingId(p_tree, parent_id);
+        auto& parent_hist = builder->hist_local_worker_[parent_id];
+
+        auto& sibling_hist = builder->hist_[sibling_nid];
+        common::SubtractionHist(builder->qu_, &sibling_hist, parent_hist,
+                                this_hist, nbins, ::sycl::event());
+        builder->qu_.wait_and_throw();
+        // Store possible parent node
+        auto& sibling_local = builder->hist_local_worker_[sibling_nid];
+        common::CopyHist(builder->qu_, &sibling_local, sibling_hist, nbins);
+      }
+    }
+    builder->ReduceHists(sync_ids, nbins);
+
+    ParallelSubtractionHist(builder, builder->nodes_for_explicit_hist_build_, p_tree);
+    ParallelSubtractionHist(builder, builder->nodes_for_subtraction_trick_, p_tree);
+
+    builder->builder_monitor_.Stop("SyncHistograms");
+  }
+
+  void ParallelSubtractionHist(HistUpdater<GradientSumT>* builder,
+                               const std::vector<ExpandEntry>& nodes,
+                               const RegTree * p_tree) {
+    const size_t nbins = builder->hist_builder_.GetNumBins();
+    for (int node = 0; node < nodes.size(); node++) {
+      const auto entry = nodes[node];
+      if (!((*p_tree)[entry.nid].IsLeftChild())) {
+        auto& this_hist = builder->hist_[entry.nid];
+
+        if (!(*p_tree)[entry.nid].IsRoot()) {
+          const size_t parent_id = (*p_tree)[entry.nid].Parent();
+          auto& parent_hist = builder->hist_[parent_id];
+          auto& sibling_hist = builder->hist_[entry.GetSiblingId(p_tree, parent_id)];
+          common::SubtractionHist(builder->qu_, &this_hist, parent_hist,
+                                  sibling_hist, nbins, ::sycl::event());
+          builder->qu_.wait_and_throw();
+        }
+      }
+    }
+  }
+
+ private:
+  std::vector<::sycl::event> hist_sync_events_;
+};
+
 } // namespace tree
 } // namespace sycl
 } // namespace xgboost
diff --git a/plugin/sycl/tree/hist_updater.cc b/plugin/sycl/tree/hist_updater.cc
index 097e2da7384f..30c7b25ffe84 100644
--- a/plugin/sycl/tree/hist_updater.cc
+++ b/plugin/sycl/tree/hist_updater.cc
@@ -22,6 +22,30 @@ using ::sycl::ext::oneapi::plus;
 using ::sycl::ext::oneapi::minimum;
 using ::sycl::ext::oneapi::maximum;
 
+template <typename GradientSumT>
+void HistUpdater<GradientSumT>::ReduceHists(const std::vector<int>& sync_ids,
+                                            size_t nbins) {
+  if (reduce_buffer_.size() < sync_ids.size() * nbins) {
+    reduce_buffer_.resize(sync_ids.size() * nbins);
+  }
+  for (size_t i = 0; i < sync_ids.size(); i++) {
+    auto& this_hist = hist_[sync_ids[i]];
+    const GradientPairT* psrc = reinterpret_cast<const GradientPairT*>(this_hist.DataConst());
+    qu_.memcpy(reduce_buffer_.data() + i * nbins, psrc, nbins*sizeof(GradientPairT)).wait();
+  }
+
+  auto buffer_vec = linalg::MakeVec(reinterpret_cast<GradientSumT*>(reduce_buffer_.data()),
+                                    2 * nbins * sync_ids.size());
+  auto rc = collective::Allreduce(ctx_, buffer_vec, collective::Op::kSum);
+  SafeColl(rc);
+
+  for (size_t i = 0; i < sync_ids.size(); i++) {
+    auto& this_hist = hist_[sync_ids[i]];
+    GradientPairT* psrc = reinterpret_cast<GradientPairT*>(this_hist.Data());
+    qu_.memcpy(psrc, reduce_buffer_.data() + i * nbins, nbins*sizeof(GradientPairT)).wait();
+  }
+}
+
 template <typename GradientSumT>
 void HistUpdater<GradientSumT>::SetHistSynchronizer(
     HistSynchronizer<GradientSumT> *sync) {
@@ -492,6 +516,7 @@ void HistUpdater<GradientSumT>::InitData(
   // initialize histogram collection
   uint32_t nbins = gmat.cut.Ptrs().back();
   hist_.Init(qu_, nbins);
+  hist_local_worker_.Init(qu_, nbins);
   hist_buffer_.Init(qu_, nbins);
 
   size_t buffer_size = kBufferSize;
diff --git a/plugin/sycl/tree/hist_updater.h b/plugin/sycl/tree/hist_updater.h
index fd5fdda9433d..fe50e1aee0e2 100644
--- a/plugin/sycl/tree/hist_updater.h
+++ b/plugin/sycl/tree/hist_updater.h
@@ -87,7 +87,10 @@
 
  protected:
   friend class BatchHistSynchronizer<GradientSumT>;
+  friend class DistributedHistSynchronizer<GradientSumT>;
+
   friend class BatchHistRowsAdder<GradientSumT>;
+  friend class DistributedHistRowsAdder<GradientSumT>;
 
   struct SplitQuery {
     bst_node_t nid;
@@ -183,6 +186,8 @@
                     RegTree* p_tree,
                     const USMVector<GradientPair, MemoryType::on_device>& gpair);
 
+  void ReduceHists(const std::vector<int>& sync_ids, size_t nbins);
+
   inline static bool LossGuide(ExpandEntry lhs, ExpandEntry rhs) {
     if (lhs.GetLossChange() == rhs.GetLossChange()) {
       return lhs.GetNodeId() > rhs.GetNodeId();  // favor small timestamp
@@ -230,6 +235,8 @@
   common::ParallelGHistBuilder<GradientSumT> hist_buffer_;
   /*! \brief culmulative histogram of gradients. */
   common::HistCollection<GradientSumT, MemoryType::on_device> hist_;
+  /*! \brief cumulative local parent histogram of gradients. */
+  common::HistCollection<GradientSumT, MemoryType::on_device> hist_local_worker_;
 
   /*! \brief TreeNode Data: statistics for each constructed node */
   std::vector<NodeEntry<GradientSumT>> snode_host_;
@@ -258,6 +265,8 @@
 
   USMVector<bst_float, MemoryType::on_device> out_preds_buf_;
   bst_float* out_pred_ptr = nullptr;
 
+  std::vector<GradientPairT> reduce_buffer_;
+
   ::sycl::queue qu_;
 };
diff --git a/plugin/sycl/tree/updater_quantile_hist.cc b/plugin/sycl/tree/updater_quantile_hist.cc
index ee7a7ad0f101..030e850f4cd2 100644
--- a/plugin/sycl/tree/updater_quantile_hist.cc
+++ b/plugin/sycl/tree/updater_quantile_hist.cc
@@ -51,7 +51,8 @@ void QuantileHistMaker::SetPimpl(std::unique_ptr<HistUpdater<GradientSumT>>* pimpl,
                                      param_,
                                      int_constraint_, dmat));
   if (collective::IsDistributed()) {
-    LOG(FATAL) << "Distributed mode is not yet upstreamed for sycl";
+    (*pimpl)->SetHistSynchronizer(new DistributedHistSynchronizer<GradientSumT>());
+    (*pimpl)->SetHistRowsAdder(new DistributedHistRowsAdder<GradientSumT>());
   } else {
     (*pimpl)->SetHistSynchronizer(new BatchHistSynchronizer<GradientSumT>());
     (*pimpl)->SetHistRowsAdder(new BatchHistRowsAdder<GradientSumT>());
diff --git a/python-package/xgboost/core.py b/python-package/xgboost/core.py
index dff608ce1ff6..39ab5846b950 100644
--- a/python-package/xgboost/core.py
+++ b/python-package/xgboost/core.py
@@ -306,11 +306,12 @@ def _check_distributed_params(kwargs: Dict[str, Any]) -> None:
         raise TypeError(msg)
 
     if device and device.find(":") != -1:
-        raise ValueError(
-            "Distributed training doesn't support selecting device ordinal as GPUs are"
-            " managed by the distributed frameworks. use `device=cuda` or `device=gpu`"
-            " instead."
-        )
+        if device != "sycl:gpu":
+            raise ValueError(
+                "Distributed training doesn't support selecting device ordinal as GPUs are"
+                " managed by the distributed frameworks. Use `device=cuda` or `device=gpu`"
+                " instead."
+            )
 
     if kwargs.get("booster", None) == "gblinear":
         raise NotImplementedError(
diff --git a/tests/ci_build/conda_env/linux_sycl_test.yml b/tests/ci_build/conda_env/linux_sycl_test.yml
index 6a9f84bddb19..eb45f5c3732f 100644
--- a/tests/ci_build/conda_env/linux_sycl_test.yml
+++ b/tests/ci_build/conda_env/linux_sycl_test.yml
@@ -17,5 +17,6 @@ dependencies:
 - pytest
 - pytest-timeout
 - pytest-cov
+- dask
 - dpcpp_linux-64
 - onedpl-devel
diff --git a/tests/python-sycl/test_sycl_simple_dask.py b/tests/python-sycl/test_sycl_simple_dask.py
new file mode 100644
index 000000000000..19eebebee3e5
--- /dev/null
+++ b/tests/python-sycl/test_sycl_simple_dask.py
@@ -0,0 +1,42 @@
+from xgboost import dask as dxgb
+from xgboost import testing as tm
+
+from hypothesis import given, strategies, assume, settings, note
+
+import dask.array as da
+import dask.distributed
+
+
+def train_result(client, param, dtrain, num_rounds):
+    result = dxgb.train(
+        client,
+        param,
+        dtrain,
+        num_rounds,
+        verbose_eval=False,
+        evals=[(dtrain, "train")],
+    )
+    return result
+
+
+class TestSYCLDask:
+    # The simplest test verifies single-node training only.
+    def test_simple(self):
+        cluster = dask.distributed.LocalCluster(n_workers=1)
+        client = dask.distributed.Client(cluster)
+
+        param = {}
+        param["tree_method"] = "hist"
+        param["device"] = "sycl"
+        param["verbosity"] = 0
+        param["objective"] = "reg:squarederror"
+
+        # X and y must be Dask dataframes or arrays
+        num_obs = 1e4
+        num_features = 20
+        X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
+        y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))
+        dtrain = dxgb.DaskDMatrix(client, X, y)
+
+        result = train_result(client, param, dtrain, 10)
+        assert tm.non_increasing(result["history"]["train"]["rmse"])
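
Reviewer note: the test above only exercises a single-worker cluster. Below is a minimal usage sketch of the multi-worker path this patch enables. It is not part of the patch: the two-worker cluster size, the synthetic data shapes, and the ten boosting rounds are illustrative assumptions, and it presumes the patch is applied with a SYCL device visible to every dask worker.

# Minimal sketch, not part of the patch: multi-worker dask training on SYCL.
# Assumes the patch is applied and each dask worker can see a SYCL device.
import dask.array as da
from dask.distributed import Client, LocalCluster

from xgboost import dask as dxgb

if __name__ == "__main__":
    # Two workers -> two collective participants; their per-node histogram
    # partial sums are merged by the Allreduce added in HistUpdater::ReduceHists.
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        X = da.random.random(size=(10_000, 20), chunks=(1_000, 20))
        y = da.random.random(size=(10_000,), chunks=(1_000,))
        dtrain = dxgb.DaskDMatrix(client, X, y)

        result = dxgb.train(
            client,
            # "sycl" selects the SYCL plugin; "sycl:gpu" is also accepted by
            # the relaxed device-ordinal check in _check_distributed_params.
            {"tree_method": "hist", "device": "sycl", "objective": "reg:squarederror"},
            dtrain,
            num_boost_round=10,
            evals=[(dtrain, "train")],
        )
        print(result["history"]["train"]["rmse"])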