From 5f6cae99cf29be0c169d2022c341a1094586a16a Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Mon, 9 Sep 2024 23:04:17 +0800
Subject: [PATCH 1/3] [EM] Multi-level quantile sketching for GPU.

This helps reduce memory usage when handling multiple batches.
---
 src/common/quantile.cuh       |  23 ++++++--
 src/data/quantile_dmatrix.cu  | 100 +++++++++++++++++++++++++++-------
 src/data/sparse_page_source.h |   4 +-
 3 files changed, 101 insertions(+), 26 deletions(-)

diff --git a/src/common/quantile.cuh b/src/common/quantile.cuh
index 1bd1672eb3dc..239388b3b62c 100644
--- a/src/common/quantile.cuh
+++ b/src/common/quantile.cuh
@@ -143,17 +143,30 @@ class SketchContainer {
    */
  void Push(Context const* ctx, Span<Entry const> entries, Span<size_t> columns_ptr,
            common::Span<OffsetT> cuts_ptr, size_t total_cuts, Span<float> weights = {});
-  /* \brief Prune the quantile structure.
+  /**
+   * @brief Prune the quantile structure.
    *
-   * \param to The maximum size of pruned quantile. If the size of quantile
-   * structure is already less than `to`, then no operation is performed.
+   * @param to The maximum size of pruned quantile. If the size of quantile structure is
+   * already less than `to`, then no operation is performed.
    */
  void Prune(Context const* ctx, size_t to);
-  /* \brief Merge another set of sketch.
-   * \param that columns of other.
+  /**
+   * @brief Merge another set of sketches.
+   *
+   * @param that_columns_ptr Column pointer of the quantile summary being merged.
+   * @param that Columns of the other quantile summary.
    */
  void Merge(Context const* ctx, Span<OffsetT const> that_columns_ptr,
            Span<SketchEntry const> that);
+  /**
+   * @brief Shrink the internal data structure to reduce memory usage. Can be used after
+   * pruning.
+   */
+  void ShrinkToFit() {
+    this->Current().shrink_to_fit();
+    this->Other().clear();
+    this->Other().shrink_to_fit();
+  }
 
  /* \brief Merge quantiles from other GPU workers. */
  void AllReduce(Context const* ctx, bool is_column_split);
diff --git a/src/data/quantile_dmatrix.cu b/src/data/quantile_dmatrix.cu
index 04db88405896..8b5e2d3eef4a 100644
--- a/src/data/quantile_dmatrix.cu
+++ b/src/data/quantile_dmatrix.cu
@@ -3,6 +3,7 @@
  */
 #include <algorithm>  // for max
 #include <numeric>    // for partial_sum
+#include <utility>    // for pair
 #include <vector>     // for vector
 
 #include "../collective/allreduce.h"  // for Allreduce
@@ -29,11 +30,39 @@ void MakeSketches(Context const* ctx,
                   float missing, std::shared_ptr<common::HistogramCuts> cuts, MetaInfo const& info,
                   ExternalDataInfo* p_ext_info) {
   xgboost_NVTX_FN_RANGE();
-
-  std::unique_ptr<common::SketchContainer> sketch;
+  /**
+   * A variant of: A Fast Algorithm for Approximate Quantiles in High Speed Data Streams
+   *
+   * The original algorithm was designed for CPU where input is a stream with individual
+   * elements. For GPU, we process the data in batches. As a result, the implementation
+   * here simply uses the user input batch as the basic unit of sketching blocks. The
+   * number of blocks per level grows exponentially.
+   */
+  std::vector<std::pair<std::unique_ptr<common::SketchContainer>, bst_idx_t>> sketches;
   auto& ext_info = *p_ext_info;
 
+  auto lazy_init_sketch = [&] {
+    // Lazy because we need the `n_features`.
+    sketches.emplace_back(std::make_unique<common::SketchContainer>(
+                              proxy->Info().feature_types, p.max_bin, ext_info.n_features,
+                              data::BatchSamples(proxy), dh::GetDevice(ctx)),
+                          0);
+  };
+
+  // Workaround empty input with CPU ctx.
+  Context new_ctx;
+  Context const* p_ctx;
+  if (ctx->IsCUDA()) {
+    p_ctx = ctx;
+  } else {
+    new_ctx.UpdateAllowUnknown(Args{{"device", dh::GetDevice(ctx).Name()}});
+    p_ctx = &new_ctx;
+  }
+
   do {
+    /**
+     * Get the data shape.
+     */
     // We use do while here as the first batch is fetched in ctor
     CHECK_LT(ctx->Ordinal(), common::AllVisibleGPUs());
     common::SetDevice(dh::GetDevice(ctx).ordinal);
@@ -46,28 +75,40 @@ void MakeSketches(Context const* ctx,
       CHECK_EQ(ext_info.n_features, ::xgboost::data::BatchColumns(proxy))
           << "Inconsistent number of columns.";
     }
+
+    auto batch_rows = data::BatchSamples(proxy);
+    ext_info.accumulated_rows += batch_rows;
+
+    /**
+     * Handle sketching.
+     */
     if (!ref) {
-      if (!sketch) {
-        sketch = std::make_unique<common::SketchContainer>(
-            proxy->Info().feature_types, p.max_bin, ext_info.n_features, data::BatchSamples(proxy),
-            dh::GetDevice(ctx));
+      if (sketches.empty()) {
+        lazy_init_sketch();
+      }
+      if (sketches.back().second > std::pow(2, sketches.size() - 1)) {
+        auto n_cuts_per_feat =
+            common::detail::RequiredSampleCutsPerColumn(p.max_bin, ext_info.accumulated_rows);
+        // Prune to a single block
+        sketches.back().first->Prune(p_ctx, n_cuts_per_feat);
+        sketches.back().first->ShrinkToFit();
+
+        std::cout << "prune n_batches:" << sketches.back().second << std::endl;
+
+        sketches.back().second = 1;
+        lazy_init_sketch();  // Add a new level.
       }
       proxy->Info().weights_.SetDevice(dh::GetDevice(ctx));
       cuda_impl::Dispatch(proxy, [&](auto const& value) {
-        // Workaround empty input with CPU ctx.
-        Context new_ctx;
-        Context const* p_ctx;
-        if (ctx->IsCUDA()) {
-          p_ctx = ctx;
-        } else {
-          new_ctx.UpdateAllowUnknown(Args{{"device", dh::GetDevice(ctx).Name()}});
-          p_ctx = &new_ctx;
-        }
-        common::AdapterDeviceSketch(p_ctx, value, p.max_bin, proxy->Info(), missing, sketch.get());
+        common::AdapterDeviceSketch(p_ctx, value, p.max_bin, proxy->Info(), missing,
+                                    sketches.back().first.get());
+        sketches.back().second++;
       });
     }
-    auto batch_rows = data::BatchSamples(proxy);
-    ext_info.accumulated_rows += batch_rows;
+
+    /**
+     * Rest of the data shape.
+     */
     dh::device_vector<size_t> row_counts(batch_rows + 1, 0);
     common::Span<size_t> row_counts_span(row_counts.data().get(), row_counts.size());
     ext_info.row_stride =
@@ -87,7 +128,28 @@ void MakeSketches(Context const* ctx,
   // Get reference
   common::SetDevice(dh::GetDevice(ctx).ordinal);
   if (!ref) {
-    sketch->MakeCuts(ctx, cuts.get(), info.IsColumnSplit());
+    HostDeviceVector<FeatureType> ft;
+    common::SketchContainer final_sketch(
+        sketches.empty() ? ft : sketches.front().first->FeatureTypes(), p.max_bin,
+        ext_info.n_features, ext_info.accumulated_rows, dh::GetDevice(ctx));
+    // Reverse order since the last container might contain a summary that's not yet pruned.
+    for (auto it = sketches.crbegin(); it != sketches.crend(); ++it) {
+      auto& sketch = *it;
+
+      CHECK_GE(sketch.second, 1);
+      if (sketch.second > 1) {
+        sketch.first->Prune(p_ctx, common::detail::RequiredSampleCutsPerColumn(
+                                       p.max_bin, ext_info.accumulated_rows));
+        sketch.first->ShrinkToFit();
+      }
+      final_sketch.Merge(p_ctx, sketch.first->ColumnsPtr(), sketch.first->Data());
+      final_sketch.FixError();
+    }
+
+    sketches.clear();
+    sketches.shrink_to_fit();
+
+    final_sketch.MakeCuts(ctx, cuts.get(), info.IsColumnSplit());
   } else {
     GetCutsFromRef(ctx, ref, ext_info.n_features, p, cuts.get());
   }
diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h
index e014dea0c2ff..2f37aa4130ca 100644
--- a/src/data/sparse_page_source.h
+++ b/src/data/sparse_page_source.h
@@ -289,11 +289,11 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S>, public FormatStreamPol
       auto page = std::make_shared<S>();
       this->exce_.Run([&] {
         std::unique_ptr<SparsePageFormat<S>> fmt{
-            this->CreatePageFormat(this->param_)};
+            self->CreatePageFormat(self->param_)};
         auto name = self->cache_info_->ShardName();
         auto [offset, length] = self->cache_info_->View(fetch_it);
         std::unique_ptr<common::AlignedResourceReadStream> fi{
-            this->CreateReader(name, offset, length)};
+            self->CreateReader(name, offset, length)};
         CHECK(fmt->Read(page.get(), fi.get()));
       });
       return page;

From 129f57f40d81706310aa74c97c9339ca25926e52 Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Mon, 9 Sep 2024 23:08:34 +0800
Subject: [PATCH 2/3] Cleanup.

---
 src/data/quantile_dmatrix.cu | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/data/quantile_dmatrix.cu b/src/data/quantile_dmatrix.cu
index 8b5e2d3eef4a..ed70664124ad 100644
--- a/src/data/quantile_dmatrix.cu
+++ b/src/data/quantile_dmatrix.cu
@@ -86,15 +86,13 @@ void MakeSketches(Context const* ctx,
       if (sketches.empty()) {
         lazy_init_sketch();
       }
-      if (sketches.back().second > std::pow(2, sketches.size() - 1)) {
+      if (sketches.back().second > (1ul << (sketches.size() - 1))) {
         auto n_cuts_per_feat =
             common::detail::RequiredSampleCutsPerColumn(p.max_bin, ext_info.accumulated_rows);
         // Prune to a single block
         sketches.back().first->Prune(p_ctx, n_cuts_per_feat);
         sketches.back().first->ShrinkToFit();
 
-        std::cout << "prune n_batches:" << sketches.back().second << std::endl;
-
         sketches.back().second = 1;
         lazy_init_sketch();  // Add a new level.
       }

From bf9b18069522c2c43838df39ef5ecd4443252484 Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Tue, 10 Sep 2024 00:20:25 +0800
Subject: [PATCH 3/3] Fix old error.

---
 demo/guide-python/quantile_data_iterator.py | 12 ++++++++----
 tests/cpp/common/test_hist_util.cu          |  6 +++---
 tests/cpp/data/test_iterative_dmatrix.cu    |  2 +-
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/demo/guide-python/quantile_data_iterator.py b/demo/guide-python/quantile_data_iterator.py
index ac7e4752f6fa..ac68bad119cc 100644
--- a/demo/guide-python/quantile_data_iterator.py
+++ b/demo/guide-python/quantile_data_iterator.py
@@ -105,17 +105,21 @@ def main():
     assert m_with_it.num_row() == m.num_row()
     # Tree method must be `hist`.
     reg_with_it = xgboost.train(
-        {"tree_method": "hist", "device": "cuda"}, m_with_it, num_boost_round=rounds
+        {"tree_method": "hist", "device": "cuda"},
+        m_with_it,
+        num_boost_round=rounds,
+        evals=[(m_with_it, "Train")],
     )
     predict_with_it = reg_with_it.predict(m_with_it)
 
     reg = xgboost.train(
-        {"tree_method": "hist", "device": "cuda"}, m, num_boost_round=rounds
+        {"tree_method": "hist", "device": "cuda"},
+        m,
+        num_boost_round=rounds,
+        evals=[(m, "Train")],
     )
     predict = reg.predict(m)
 
-    numpy.testing.assert_allclose(predict_with_it, predict, rtol=1e6)
-
 
 if __name__ == "__main__":
     main()
diff --git a/tests/cpp/common/test_hist_util.cu b/tests/cpp/common/test_hist_util.cu
index b3b77694c853..f981a181b89f 100644
--- a/tests/cpp/common/test_hist_util.cu
+++ b/tests/cpp/common/test_hist_util.cu
@@ -17,6 +17,7 @@
 
 #include "../../../include/xgboost/logging.h"
 #include "../../../src/common/cuda_context.cuh"
+#include "../../../src/common/cuda_rt_utils.h"  // for SetDevice
 #include "../../../src/common/device_helpers.cuh"
 #include "../../../src/common/hist_util.cuh"
 #include "../../../src/common/hist_util.h"
@@ -59,8 +60,7 @@ TEST(HistUtil, SketchBatchNumElements) {
   GTEST_SKIP_("Test not runnable with RMM enabled.");
 #endif  // defined(XGBOOST_USE_RMM) && XGBOOST_USE_RMM == 1
   size_t constexpr kCols = 10000;
-  int device;
-  dh::safe_cuda(cudaGetDevice(&device));
+  std::int32_t device = dh::CurrentDevice();
   auto avail = static_cast<size_t>(dh::AvailableMemory(device) * 0.8);
   auto per_elem = detail::BytesPerElement(false);
   auto avail_elem = avail / per_elem;
@@ -576,7 +576,7 @@ TEST(HistUtil, AdapterDeviceSketchBatches) {
 
 namespace {
 auto MakeData(Context const* ctx, std::size_t n_samples, bst_feature_t n_features) {
-  dh::safe_cuda(cudaSetDevice(ctx->Ordinal()));
+  common::SetDevice(ctx->Ordinal());
   auto n = n_samples * n_features;
   std::vector<float> x;
   x.resize(n);
diff --git a/tests/cpp/data/test_iterative_dmatrix.cu b/tests/cpp/data/test_iterative_dmatrix.cu
index c8eb1f015880..8797fc18d405 100644
--- a/tests/cpp/data/test_iterative_dmatrix.cu
+++ b/tests/cpp/data/test_iterative_dmatrix.cu
@@ -13,7 +13,7 @@
 
 namespace xgboost::data {
 
 void TestEquivalent(float sparsity) {
-  Context ctx{MakeCUDACtx(0)};
+  auto ctx = MakeCUDACtx(0);
   CudaArrayIterForTest iter{sparsity};
   IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
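
Reviewer note: the standalone program below gives the shape of the multi-level
scheme from PATCH 1/3 without the CUDA plumbing. It is a minimal sketch, not
the XGBoost API: `Summary`, its toy Push/Prune, and `kPruneTo` are hypothetical
stand-ins for common::SketchContainer, AdapterDeviceSketch, and
RequiredSampleCutsPerColumn; only the level bookkeeping mirrors the patched
MakeSketches loop.

// Standalone illustration of the multi-level quantile sketching in PATCH 1/3.
// `Summary` is a hypothetical stand-in for common::SketchContainer, not the
// real XGBoost type; prune/merge here are deliberately naive.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

struct Summary {
  std::vector<float> values;  // kept sorted

  // Absorb one batch, like AdapterDeviceSketch pushing into a container.
  void Push(std::vector<float> const& batch) {
    values.insert(values.end(), batch.begin(), batch.end());
    std::sort(values.begin(), values.end());
  }
  // Shrink to at most `to` evenly spaced samples, like SketchContainer::Prune.
  void Prune(std::size_t to) {
    if (values.size() <= to) { return; }
    std::vector<float> pruned(to);
    for (std::size_t i = 0; i < to; ++i) {
      pruned[i] = values[i * values.size() / to];
    }
    values = std::move(pruned);
  }
};

int main() {
  // (summary, number of absorbed batches) per level, like `sketches` above.
  std::vector<std::pair<std::unique_ptr<Summary>, std::uint64_t>> levels;
  std::size_t const kPruneTo = 64;  // stand-in for RequiredSampleCutsPerColumn

  std::vector<float> batch(256);
  for (int iter = 0; iter < 100; ++iter) {  // 100 incoming batches
    for (auto& v : batch) {
      v = static_cast<float>(std::rand()) / RAND_MAX;
    }
    if (levels.empty()) {
      levels.emplace_back(std::make_unique<Summary>(), 0);
    }
    // Level k holds up to 2^k batches; past that, prune it down to a single
    // block and open a new level, as in the patched MakeSketches loop.
    if (levels.back().second > (1ull << (levels.size() - 1))) {
      levels.back().first->Prune(kPruneTo);
      levels.back().second = 1;
      levels.emplace_back(std::make_unique<Summary>(), 0);  // new level
    }
    levels.back().first->Push(batch);
    levels.back().second++;
  }

  // Final pass in reverse: only the newest level may still be unpruned.
  Summary final_summary;
  for (auto it = levels.crbegin(); it != levels.crend(); ++it) {
    if (it->second > 1) {
      it->first->Prune(kPruneTo);
    }
    final_summary.Push(it->first->values);
  }
  final_summary.Prune(kPruneTo);

  std::cout << "levels: " << levels.size()
            << ", final summary size: " << final_summary.values.size() << "\n";
  return 0;
}

Because each level absorbs exponentially more batches than the one before it,
at any moment only the newest level holds an unpruned summary and the number of
live containers grows logarithmically in the number of batches. That bound is
where the memory reduction claimed in the PATCH 1/3 commit message comes from.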