From 197236980824049ce60cdb76d558a1082d3c0e30 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Sun, 22 Sep 2024 04:39:18 +0800 Subject: [PATCH] Remove constraint. --- demo/guide-python/external_memory.py | 29 ++++++++++++++++------ src/common/error_msg.h | 6 ----- src/data/data.cc | 3 ++- src/data/extmem_quantile_dmatrix.cc | 2 ++ src/data/sparse_page_dmatrix.cc | 16 ++++-------- src/data/sparse_page_dmatrix.h | 6 ++--- src/data/sparse_page_source.cc | 10 ++++++++ src/data/sparse_page_source.h | 2 ++ tests/cpp/c_api/test_c_api.cc | 2 +- tests/cpp/data/test_sparse_page_dmatrix.cc | 9 ++++--- 10 files changed, 52 insertions(+), 33 deletions(-) diff --git a/demo/guide-python/external_memory.py b/demo/guide-python/external_memory.py index d63785c9459e..e2e36422d860 100644 --- a/demo/guide-python/external_memory.py +++ b/demo/guide-python/external_memory.py @@ -58,6 +58,7 @@ def __init__(self, device: str, file_paths: List[Tuple[str, str]]) -> None: super().__init__(cache_prefix=os.path.join(".", "cache")) def load_file(self) -> Tuple[np.ndarray, np.ndarray]: + """Load a single batch of data.""" X_path, y_path = self._file_paths[self._it] # When the `ExtMemQuantileDMatrix` is used, the device must match. This # constraint will be relaxed in the future. @@ -65,8 +66,6 @@ def load_file(self) -> Tuple[np.ndarray, np.ndarray]: X = np.load(X_path) y = np.load(y_path) else: - import cupy as cp - X = cp.load(X_path) y = cp.load(y_path) @@ -74,8 +73,8 @@ def load_file(self) -> Tuple[np.ndarray, np.ndarray]: return X, y def next(self, input_data: Callable) -> int: - """Advance the iterator by 1 step and pass the data to XGBoost. This function is - called by XGBoost during the construction of ``DMatrix`` + """Advance the iterator by 1 step and pass the data to XGBoost. This function + is called by XGBoost during the construction of ``DMatrix`` """ if self._it == len(self._file_paths): @@ -131,6 +130,8 @@ def approx_train(it: Iterator) -> None: def main(tmpdir: str, args: argparse.Namespace) -> None: + """Entry point for training.""" + # generate some random data for demo files = make_batches( n_samples_per_batch=1024, n_features=17, n_batches=31, tmpdir=tmpdir @@ -145,6 +146,20 @@ def main(tmpdir: str, args: argparse.Namespace) -> None: parser = argparse.ArgumentParser() parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu") args = parser.parse_args() - - with tempfile.TemporaryDirectory() as tmpdir: - main(tmpdir, args) + if args.device == "cuda": + import cupy as cp + import rmm + from rmm.allocators.cupy import rmm_cupy_allocator + + # It's important to use RMM for GPU-based external memory for good performance. + mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource()) + rmm.mr.set_current_device_resource(mr) + # Set the allocator for cupy as well. + cp.cuda.set_allocator(rmm_cupy_allocator) + # Make sure XGBoost is using RMM for all allocations. + with xgboost.config_context(use_rmm=True): + with tempfile.TemporaryDirectory() as tmpdir: + main(tmpdir, args) + else: + with tempfile.TemporaryDirectory() as tmpdir: + main(tmpdir, args) diff --git a/src/common/error_msg.h b/src/common/error_msg.h index 4548aa98258c..8eeee06e58b0 100644 --- a/src/common/error_msg.h +++ b/src/common/error_msg.h @@ -106,12 +106,6 @@ inline auto NoCategorical(std::string name) { return name + " doesn't support categorical features."; } -inline void NoOnHost(bool on_host) { - if (on_host) { - LOG(FATAL) << "Caching on host memory is only available for GPU."; - } -} - inline void NoPageConcat(bool concat_pages) { if (concat_pages) { LOG(FATAL) << "`external_memory_concat_pages` must be false when there's no sampling or when " diff --git a/src/data/data.cc b/src/data/data.cc index b71820a962c2..b1b25f7078fd 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -920,7 +920,8 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s data::fileiter::Next, std::numeric_limits::quiet_NaN(), 1, - cache_file}; + cache_file, + false}; } return dmat; diff --git a/src/data/extmem_quantile_dmatrix.cc b/src/data/extmem_quantile_dmatrix.cc index 0bdab8f02dc5..e3659f205dd9 100644 --- a/src/data/extmem_quantile_dmatrix.cc +++ b/src/data/extmem_quantile_dmatrix.cc @@ -13,6 +13,7 @@ #include "proxy_dmatrix.h" // for DataIterProxy, HostAdapterDispatch #include "quantile_dmatrix.h" // for GetDataShape, MakeSketches #include "simple_batch_iterator.h" // for SimpleBatchIteratorImpl +#include "sparse_page_source.h" // for MakeCachePrefix #if !defined(XGBOOST_USE_CUDA) #include "../common/common.h" // for AssertGPUSupport @@ -26,6 +27,7 @@ ExtMemQuantileDMatrix::ExtMemQuantileDMatrix(DataIterHandle iter_handle, DMatrix std::int32_t n_threads, std::string cache, bst_bin_t max_bin, bool on_host) : cache_prefix_{std::move(cache)}, on_host_{on_host} { + cache_prefix_ = MakeCachePrefix(cache_prefix_); auto iter = std::make_shared>( iter_handle, reset, next); iter->Reset(); diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index 7cabfbd14cf4..3528105417bb 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -13,9 +13,9 @@ #include // for move #include // for visit -#include "../collective/communicator-inl.h" -#include "batch_utils.h" // for RegenGHist -#include "gradient_index.h" +#include "batch_utils.h" // for RegenGHist +#include "gradient_index.h" // for GHistIndexMatrix +#include "sparse_page_source.h" // for MakeCachePrefix namespace xgboost::data { MetaInfo &SparsePageDMatrix::Info() { return info_; } @@ -34,12 +34,9 @@ SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle p cache_prefix_{std::move(cache_prefix)}, on_host_{on_host} { Context ctx; - ctx.nthread = nthreads; + ctx.Init(Args{{"nthread", std::to_string(nthreads)}}); + cache_prefix_ = MakeCachePrefix(cache_prefix_); - cache_prefix_ = cache_prefix_.empty() ? "DMatrix" : cache_prefix_; - if (collective::IsDistributed()) { - cache_prefix_ += ("-r" + std::to_string(collective::GetRank())); - } DMatrixProxy *proxy = MakeProxy(proxy_); auto iter = DataIterProxy{ iter_, reset_, next_}; @@ -107,7 +104,6 @@ BatchSet SparsePageDMatrix::GetRowBatches() { BatchSet SparsePageDMatrix::GetColumnBatches(Context const *ctx) { auto id = MakeCache(this, ".col.page", on_host_, cache_prefix_, &cache_info_); CHECK_NE(this->Info().num_col_, 0); - error::NoOnHost(on_host_); this->InitializeSparsePage(ctx); if (!column_source_) { column_source_ = @@ -122,7 +118,6 @@ BatchSet SparsePageDMatrix::GetColumnBatches(Context const *ctx) { BatchSet SparsePageDMatrix::GetSortedColumnBatches(Context const *ctx) { auto id = MakeCache(this, ".sorted.col.page", on_host_, cache_prefix_, &cache_info_); CHECK_NE(this->Info().num_col_, 0); - error::NoOnHost(on_host_); this->InitializeSparsePage(ctx); if (!sorted_column_source_) { sorted_column_source_ = std::make_shared( @@ -140,7 +135,6 @@ BatchSet SparsePageDMatrix::GetGradientIndex(Context const *ct CHECK_GE(param.max_bin, 2); } detail::CheckEmpty(batch_param_, param); - error::NoOnHost(on_host_); auto id = MakeCache(this, ".gradient_index.page", on_host_, cache_prefix_, &cache_info_); if (!cache_info_.at(id)->written || detail::RegenGHist(batch_param_, param)) { this->InitializeSparsePage(ctx); diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h index f40c16f72488..9f2eed9187ff 100644 --- a/src/data/sparse_page_dmatrix.h +++ b/src/data/sparse_page_dmatrix.h @@ -70,10 +70,10 @@ class SparsePageDMatrix : public DMatrix { DataIterResetCallback *reset_; XGDMatrixCallbackNext *next_; - float missing_; + float const missing_; Context fmat_ctx_; std::string cache_prefix_; - bool on_host_{false}; + bool const on_host_; std::uint32_t n_batches_{0}; // sparse page is the source to other page types, we make a special member function. void InitializeSparsePage(Context const *ctx); @@ -83,7 +83,7 @@ class SparsePageDMatrix : public DMatrix { public: explicit SparsePageDMatrix(DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset, XGDMatrixCallbackNext *next, float missing, int32_t nthreads, - std::string cache_prefix, bool on_host = false); + std::string cache_prefix, bool on_host); ~SparsePageDMatrix() override; diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 724260512695..dd4050a713ec 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -8,6 +8,8 @@ #include // for partial_sum #include // for string +#include "../collective/communicator-inl.h" // for IsDistributed, GetRank + namespace xgboost::data { void Cache::Commit() { if (!this->written) { @@ -28,6 +30,14 @@ void TryDeleteCacheFile(const std::string& file) { } } +std::string MakeCachePrefix(std::string cache_prefix) { + cache_prefix = cache_prefix.empty() ? "DMatrix" : cache_prefix; + if (collective::IsDistributed()) { + cache_prefix += ("-r" + std::to_string(collective::GetRank())); + } + return cache_prefix; +} + #if !defined(XGBOOST_USE_CUDA) void InitNewThread::operator()() const { *GlobalConfigThreadLocalStore::Get() = config; } #endif diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h index 471a84d608a5..cefd13ad735c 100644 --- a/src/data/sparse_page_source.h +++ b/src/data/sparse_page_source.h @@ -33,6 +33,8 @@ namespace xgboost::data { void TryDeleteCacheFile(const std::string& file); +std::string MakeCachePrefix(std::string cache_prefix); + /** * @brief Information about the cache including path and page offsets. */ diff --git a/tests/cpp/c_api/test_c_api.cc b/tests/cpp/c_api/test_c_api.cc index 8729eba82fc3..0117cc8f2218 100644 --- a/tests/cpp/c_api/test_c_api.cc +++ b/tests/cpp/c_api/test_c_api.cc @@ -496,7 +496,7 @@ auto MakeExtMemForTest(bst_idx_t n_samples, bst_feature_t n_features, Json dconf NumpyArrayIterForTest iter_1{0.0f, n_samples, n_features, n_batches}; auto Xy = std::make_shared( - &iter_1, iter_1.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), 0, ""); + &iter_1, iter_1.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), 0, "", false); MakeLabelForTest(Xy, p_fmat); return std::pair{p_fmat, Xy}; } diff --git a/tests/cpp/data/test_sparse_page_dmatrix.cc b/tests/cpp/data/test_sparse_page_dmatrix.cc index f6991cfd508d..a7c1bb3afb90 100644 --- a/tests/cpp/data/test_sparse_page_dmatrix.cc +++ b/tests/cpp/data/test_sparse_page_dmatrix.cc @@ -37,7 +37,8 @@ void TestSparseDMatrixLoadFile(Context const* ctx) { data::fileiter::Next, std::numeric_limits::quiet_NaN(), n_threads, - tmpdir.path + "cache"}; + tmpdir.path + "cache", + false}; ASSERT_EQ(AllThreadsForTest(), m.Ctx()->Threads()); ASSERT_EQ(m.Info().num_col_, 5); ASSERT_EQ(m.Info().num_row_, 64); @@ -364,9 +365,9 @@ auto TestSparsePageDMatrixDeterminism(int32_t threads) { CreateBigTestData(filename, 1 << 16); data::FileIterator iter(filename + "?format=libsvm", 0, 1); - std::unique_ptr sparse{ - new data::SparsePageDMatrix{&iter, iter.Proxy(), data::fileiter::Reset, data::fileiter::Next, - std::numeric_limits::quiet_NaN(), threads, filename}}; + std::unique_ptr sparse{new data::SparsePageDMatrix{ + &iter, iter.Proxy(), data::fileiter::Reset, data::fileiter::Next, + std::numeric_limits::quiet_NaN(), threads, filename, false}}; CHECK(sparse->Ctx()->Threads() == threads || sparse->Ctx()->Threads() == AllThreadsForTest()); DMatrixToCSR(sparse.get(), &sparse_data, &sparse_rptr, &sparse_cids);