
Commit

Remove constraint.
trivialfis committed Sep 21, 2024
1 parent 274d9ea commit 1972369
Showing 10 changed files with 52 additions and 33 deletions.
29 changes: 22 additions & 7 deletions demo/guide-python/external_memory.py
@@ -58,24 +58,23 @@ def __init__(self, device: str, file_paths: List[Tuple[str, str]]) -> None:
super().__init__(cache_prefix=os.path.join(".", "cache"))

def load_file(self) -> Tuple[np.ndarray, np.ndarray]:
"""Load a single batch of data."""
X_path, y_path = self._file_paths[self._it]
# When the `ExtMemQuantileDMatrix` is used, the device must match. This
# constraint will be relaxed in the future.
if self.device == "cpu":
X = np.load(X_path)
y = np.load(y_path)
else:
import cupy as cp

X = cp.load(X_path)
y = cp.load(y_path)

assert X.shape[0] == y.shape[0]
return X, y

def next(self, input_data: Callable) -> int:
"""Advance the iterator by 1 step and pass the data to XGBoost. This function is
called by XGBoost during the construction of ``DMatrix``
"""Advance the iterator by 1 step and pass the data to XGBoost. This function
is called by XGBoost during the construction of ``DMatrix``
"""
if self._it == len(self._file_paths):
@@ -131,6 +130,8 @@ def approx_train(it: Iterator) -> None:


def main(tmpdir: str, args: argparse.Namespace) -> None:
"""Entry point for training."""

# generate some random data for demo
files = make_batches(
n_samples_per_batch=1024, n_features=17, n_batches=31, tmpdir=tmpdir
@@ -145,6 +146,20 @@ def main(tmpdir: str, args: argparse.Namespace) -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu")
args = parser.parse_args()

- with tempfile.TemporaryDirectory() as tmpdir:
- main(tmpdir, args)
+ if args.device == "cuda":
+ import cupy as cp
+ import rmm
+ from rmm.allocators.cupy import rmm_cupy_allocator
+
+ # It's important to use RMM for GPU-based external memory for good performance.
+ mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
+ rmm.mr.set_current_device_resource(mr)
+ # Set the allocator for cupy as well.
+ cp.cuda.set_allocator(rmm_cupy_allocator)
+ # Make sure XGBoost is using RMM for all allocations.
+ with xgboost.config_context(use_rmm=True):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ main(tmpdir, args)
+ else:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ main(tmpdir, args)
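
For reference, the iterator protocol this demo builds on can be sketched in a few lines. The example below is illustrative only — the class name `InMemoryIter`, the batch shapes, and the training parameters are invented for the sketch, not part of this commit — but the `next`/`reset` callback contract and the `cache_prefix` argument are the same ones the demo's file-backed iterator uses:

```python
# A minimal sketch (not from this commit) of xgboost.DataIter feeding
# batches to an external-memory DMatrix through the input_data callback.
import os

import numpy as np
import xgboost


class InMemoryIter(xgboost.DataIter):  # hypothetical name
    def __init__(self, batches) -> None:
        self._batches = batches
        self._it = 0
        # Pages are spilled to disk under this prefix.
        super().__init__(cache_prefix=os.path.join(".", "cache"))

    def next(self, input_data) -> int:
        if self._it == len(self._batches):
            return 0  # signal that iteration is finished
        X, y = self._batches[self._it]
        input_data(data=X, label=y)  # hand the current batch to XGBoost
        self._it += 1
        return 1  # more batches remain

    def reset(self) -> None:
        self._it = 0  # rewind for the next epoch


rng = np.random.default_rng(2024)
batches = [(rng.normal(size=(256, 8)), rng.normal(size=256)) for _ in range(4)]
Xy = xgboost.DMatrix(InMemoryIter(batches))  # iterates once to build pages
booster = xgboost.train({"tree_method": "hist"}, Xy, num_boost_round=8)
```

The RMM block added to the `__main__` section follows the demo's own comments: a pool built over `CudaAsyncMemoryResource`, shared with CuPy, and `use_rmm=True` so XGBoost routes its allocations through the same pool.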
6 changes: 0 additions & 6 deletions src/common/error_msg.h
@@ -106,12 +106,6 @@ inline auto NoCategorical(std::string name) {
return name + " doesn't support categorical features.";
}

- inline void NoOnHost(bool on_host) {
- if (on_host) {
- LOG(FATAL) << "Caching on host memory is only available for GPU.";
- }
- }

inline void NoPageConcat(bool concat_pages) {
if (concat_pages) {
LOG(FATAL) << "`external_memory_concat_pages` must be false when there's no sampling or when "
3 changes: 2 additions & 1 deletion src/data/data.cc
@@ -920,7 +920,8 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
data::fileiter::Next,
std::numeric_limits<float>::quiet_NaN(),
1,
- cache_file};
+ cache_file,
+ false};
}

return dmat;
2 changes: 2 additions & 0 deletions src/data/extmem_quantile_dmatrix.cc
@@ -13,6 +13,7 @@
#include "proxy_dmatrix.h" // for DataIterProxy, HostAdapterDispatch
#include "quantile_dmatrix.h" // for GetDataShape, MakeSketches
#include "simple_batch_iterator.h" // for SimpleBatchIteratorImpl
#include "sparse_page_source.h" // for MakeCachePrefix

#if !defined(XGBOOST_USE_CUDA)
#include "../common/common.h" // for AssertGPUSupport
@@ -26,6 +27,7 @@ ExtMemQuantileDMatrix::ExtMemQuantileDMatrix(DataIterHandle iter_handle, DMatrix
std::int32_t n_threads, std::string cache,
bst_bin_t max_bin, bool on_host)
: cache_prefix_{std::move(cache)}, on_host_{on_host} {
+ cache_prefix_ = MakeCachePrefix(cache_prefix_);
auto iter = std::make_shared<DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>>(
iter_handle, reset, next);
iter->Reset();
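
For orientation, the constructor above backs the Python-level `ExtMemQuantileDMatrix` referenced in the demo's comments. A hedged usage sketch — assuming the `InMemoryIter` from the earlier example, and that the Python class accepts `max_bin` mirroring the `bst_bin_t max_bin` parameter in the C++ signature:

```python
# Illustrative only: quantile-based external-memory construction.
it = InMemoryIter(batches)
Xy = xgboost.ExtMemQuantileDMatrix(it, max_bin=256)
booster = xgboost.train({"tree_method": "hist"}, Xy, num_boost_round=8)
```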
16 changes: 5 additions & 11 deletions src/data/sparse_page_dmatrix.cc
@@ -13,9 +13,9 @@
#include <utility> // for move
#include <variant> // for visit

#include "../collective/communicator-inl.h"
#include "batch_utils.h" // for RegenGHist
#include "gradient_index.h"
#include "batch_utils.h" // for RegenGHist
#include "gradient_index.h" // for GHistIndexMatrix
#include "sparse_page_source.h" // for MakeCachePrefix

namespace xgboost::data {
MetaInfo &SparsePageDMatrix::Info() { return info_; }
@@ -34,12 +34,9 @@ SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle p
cache_prefix_{std::move(cache_prefix)},
on_host_{on_host} {
Context ctx;
- ctx.nthread = nthreads;
+ ctx.Init(Args{{"nthread", std::to_string(nthreads)}});
+ cache_prefix_ = MakeCachePrefix(cache_prefix_);

- cache_prefix_ = cache_prefix_.empty() ? "DMatrix" : cache_prefix_;
- if (collective::IsDistributed()) {
- cache_prefix_ += ("-r" + std::to_string(collective::GetRank()));
- }
DMatrixProxy *proxy = MakeProxy(proxy_);
auto iter = DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{
iter_, reset_, next_};
@@ -107,7 +104,6 @@ BatchSet<SparsePage> SparsePageDMatrix::GetRowBatches() {
BatchSet<CSCPage> SparsePageDMatrix::GetColumnBatches(Context const *ctx) {
auto id = MakeCache(this, ".col.page", on_host_, cache_prefix_, &cache_info_);
CHECK_NE(this->Info().num_col_, 0);
- error::NoOnHost(on_host_);
this->InitializeSparsePage(ctx);
if (!column_source_) {
column_source_ =
@@ -122,7 +118,6 @@ BatchSet<CSCPage> SparsePageDMatrix::GetColumnBatches(Context const *ctx) {
BatchSet<SortedCSCPage> SparsePageDMatrix::GetSortedColumnBatches(Context const *ctx) {
auto id = MakeCache(this, ".sorted.col.page", on_host_, cache_prefix_, &cache_info_);
CHECK_NE(this->Info().num_col_, 0);
- error::NoOnHost(on_host_);
this->InitializeSparsePage(ctx);
if (!sorted_column_source_) {
sorted_column_source_ = std::make_shared<SortedCSCPageSource>(
@@ -140,7 +135,6 @@ BatchSet<GHistIndexMatrix> SparsePageDMatrix::GetGradientIndex(Context const *ctx) {
CHECK_GE(param.max_bin, 2);
}
detail::CheckEmpty(batch_param_, param);
- error::NoOnHost(on_host_);
auto id = MakeCache(this, ".gradient_index.page", on_host_, cache_prefix_, &cache_info_);
if (!cache_info_.at(id)->written || detail::RegenGHist(batch_param_, param)) {
this->InitializeSparsePage(ctx);
6 changes: 3 additions & 3 deletions src/data/sparse_page_dmatrix.h
@@ -70,10 +70,10 @@ class SparsePageDMatrix : public DMatrix {
DataIterResetCallback *reset_;
XGDMatrixCallbackNext *next_;

- float missing_;
+ float const missing_;
Context fmat_ctx_;
std::string cache_prefix_;
- bool on_host_{false};
+ bool const on_host_;
std::uint32_t n_batches_{0};
// sparse page is the source to other page types, we make a special member function.
void InitializeSparsePage(Context const *ctx);
@@ -83,7 +83,7 @@
public:
explicit SparsePageDMatrix(DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing, int32_t nthreads,
- std::string cache_prefix, bool on_host = false);
+ std::string cache_prefix, bool on_host);

~SparsePageDMatrix() override;

10 changes: 10 additions & 0 deletions src/data/sparse_page_source.cc
@@ -8,6 +8,8 @@
#include <numeric> // for partial_sum
#include <string> // for string

#include "../collective/communicator-inl.h" // for IsDistributed, GetRank

namespace xgboost::data {
void Cache::Commit() {
if (!this->written) {
@@ -28,6 +30,14 @@ void TryDeleteCacheFile(const std::string& file) {
}
}

+ std::string MakeCachePrefix(std::string cache_prefix) {
+ cache_prefix = cache_prefix.empty() ? "DMatrix" : cache_prefix;
+ if (collective::IsDistributed()) {
+ cache_prefix += ("-r" + std::to_string(collective::GetRank()));
+ }
+ return cache_prefix;
+ }

#if !defined(XGBOOST_USE_CUDA)
void InitNewThread::operator()() const { *GlobalConfigThreadLocalStore::Get() = config; }
#endif
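
The new `MakeCachePrefix` helper centralizes the naming rule that `SparsePageDMatrix` previously inlined (see the lines deleted from sparse_page_dmatrix.cc above): fall back to `DMatrix` when no prefix is given, and append the worker rank under distributed training so workers don't overwrite each other's cache files. A Python rendition of the same rule, for illustration only:

```python
# Mirrors the C++ MakeCachePrefix above; illustrative, not part of the commit.
def make_cache_prefix(cache_prefix: str, is_distributed: bool, rank: int) -> str:
    if not cache_prefix:
        cache_prefix = "DMatrix"  # default prefix
    if is_distributed:
        cache_prefix += f"-r{rank}"  # one cache file set per worker
    return cache_prefix


assert make_cache_prefix("", False, 0) == "DMatrix"
assert make_cache_prefix("cache", True, 2) == "cache-r2"
```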
2 changes: 2 additions & 0 deletions src/data/sparse_page_source.h
@@ -33,6 +33,8 @@
namespace xgboost::data {
void TryDeleteCacheFile(const std::string& file);

+ std::string MakeCachePrefix(std::string cache_prefix);

/**
* @brief Information about the cache including path and page offsets.
*/
2 changes: 1 addition & 1 deletion tests/cpp/c_api/test_c_api.cc
@@ -496,7 +496,7 @@ auto MakeExtMemForTest(bst_idx_t n_samples, bst_feature_t n_features, Json dconf

NumpyArrayIterForTest iter_1{0.0f, n_samples, n_features, n_batches};
auto Xy = std::make_shared<data::SparsePageDMatrix>(
&iter_1, iter_1.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, "");
&iter_1, iter_1.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, "", false);
MakeLabelForTest(Xy, p_fmat);
return std::pair{p_fmat, Xy};
}
9 changes: 5 additions & 4 deletions tests/cpp/data/test_sparse_page_dmatrix.cc
@@ -37,7 +37,8 @@ void TestSparseDMatrixLoadFile(Context const* ctx) {
data::fileiter::Next,
std::numeric_limits<float>::quiet_NaN(),
n_threads,
tmpdir.path + "cache"};
tmpdir.path + "cache",
false};
ASSERT_EQ(AllThreadsForTest(), m.Ctx()->Threads());
ASSERT_EQ(m.Info().num_col_, 5);
ASSERT_EQ(m.Info().num_row_, 64);
@@ -364,9 +365,9 @@ auto TestSparsePageDMatrixDeterminism(int32_t threads) {
CreateBigTestData(filename, 1 << 16);

data::FileIterator iter(filename + "?format=libsvm", 0, 1);
- std::unique_ptr<DMatrix> sparse{
- new data::SparsePageDMatrix{&iter, iter.Proxy(), data::fileiter::Reset, data::fileiter::Next,
- std::numeric_limits<float>::quiet_NaN(), threads, filename}};
+ std::unique_ptr<DMatrix> sparse{new data::SparsePageDMatrix{
+ &iter, iter.Proxy(), data::fileiter::Reset, data::fileiter::Next,
+ std::numeric_limits<float>::quiet_NaN(), threads, filename, false}};
CHECK(sparse->Ctx()->Threads() == threads || sparse->Ctx()->Threads() == AllThreadsForTest());

DMatrixToCSR(sparse.get(), &sparse_data, &sparse_rptr, &sparse_cids);
