Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using multiple CellDb to concurrency read from celldb #1363

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,8 @@ td::Status ValidatorEngine::load_global_config() {
}
validator_options_.write().set_fast_state_serializer_enabled(fast_state_serializer_enabled_);

validator_options_.write().set_cpu_threads_count(cpu_threads_count_);

return td::Status::OK();
}

Expand Down Expand Up @@ -4235,6 +4237,7 @@ int main(int argc, char *argv[]) {
threads = v;
return td::Status::OK();
});
acts.push_back([&x, &threads]() { td::actor::send_closure(x, &ValidatorEngine::set_cpu_threads_count, threads); });
p.add_checked_option('u', "user", "change user", [&](td::Slice user) { return td::change_user(user.str()); });
p.add_checked_option('\0', "shutdown-at", "stop validator at the given time (unix timestamp)", [&](td::Slice arg) {
TRY_RESULT(at, td::to_integer_safe<td::uint32>(arg));
Expand Down
9 changes: 7 additions & 2 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class ValidatorEngine : public td::actor::Actor {
std::string session_logs_file_;
bool fast_state_serializer_enabled_ = false;

size_t cpu_threads_count_;

std::set<ton::CatchainSeqno> unsafe_catchains_;
std::map<ton::BlockSeqno, std::pair<ton::CatchainSeqno, td::uint32>> unsafe_catchain_rotations_;

Expand Down Expand Up @@ -310,6 +312,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_fast_state_serializer_enabled(bool value) {
fast_state_serializer_enabled_ = value;
}
void set_cpu_threads_count(size_t cpu_threads_count) {
cpu_threads_count_ = cpu_threads_count;
}
void start_up() override;
ValidatorEngine() {
}
Expand Down Expand Up @@ -401,8 +406,8 @@ class ValidatorEngine : public td::actor::Actor {

void load_custom_overlays_config();
td::Status write_custom_overlays_config();
void add_custom_overlay_to_config(
ton::tl_object_ptr<ton::ton_api::engine_validator_customOverlay> overlay, td::Promise<td::Unit> promise);
void add_custom_overlay_to_config(ton::tl_object_ptr<ton::ton_api::engine_validator_customOverlay> overlay,
td::Promise<td::Unit> promise);
void del_custom_overlay_from_config(std::string name, td::Promise<td::Unit> promise);
void load_collator_options();

Expand Down
91 changes: 47 additions & 44 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,35 @@ void CellDbBase::execute_sync(std::function<void()> f) {
f();
}

CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
InMemoryInfo InMemoryInfo::create_from_rocksdb(const std::string& path) {
td::RocksDbOptions read_db_options;
read_db_options.use_direct_reads = true;
read_db_options.no_block_cache = true;
read_db_options.block_cache = {};
LOG(WARNING) << "Loading all cells in memory (because of --celldb-in-memory)";
td::Timer timer;
auto read_cell_db = std::make_shared<td::RocksDb>(td::RocksDb::open(path, std::move(read_db_options)).move_as_ok());
auto boc = vm::DynamicBagOfCellsDb::create_in_memory(read_cell_db.get(), {});
const auto in_memory_load_time = timer.elapsed();

return InMemoryInfo{.boc_ = std::move(boc), .in_memory_load_time_ = in_memory_load_time};
}

CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path, int obj_id,
InMemoryInfo inmem_info, std::shared_ptr<td::RocksDb> rocks_db, td::RocksDbOptions rdb_opts,
td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) {
: root_db_(root_db)
, parent_(parent)
, path_(std::move(path))
, obj_id_(obj_id)
, boc_(inmem_info.boc_)
, cell_db_(rocks_db)
, rocks_db_(rocks_db->raw_db())
, in_memory_load_time_(inmem_info.in_memory_load_time_) {
statistics_ = rdb_opts.statistics;
statistics_flush_at_ = td::Timestamp::in(60.0);
snapshot_statistics_ = rdb_opts.snapshot_statistics;
opts_ = opts;
}

void CellDbIn::start_up() {
Expand All @@ -88,42 +114,13 @@ void CellDbIn::start_up() {
};

CellDbBase::start_up();
td::RocksDbOptions db_options;
if (!opts_->get_disable_rocksdb_stats()) {
statistics_ = td::RocksDb::create_statistics();
statistics_flush_at_ = td::Timestamp::in(60.0);
snapshot_statistics_ = std::make_shared<td::RocksDbSnapshotStatistics>();
db_options.snapshot_statistics = snapshot_statistics_;
}
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();

if (opts_->get_celldb_in_memory()) {
td::RocksDbOptions read_db_options;
read_db_options.use_direct_reads = true;
read_db_options.no_block_cache = true;
read_db_options.block_cache = {};
LOG(WARNING) << "Loading all cells in memory (because of --celldb-in-memory)";
td::Timer timer;
auto read_cell_db =
std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(read_db_options)).move_as_ok());
boc_ = vm::DynamicBagOfCellsDb::create_in_memory(read_cell_db.get(), {});
in_memory_load_time_ = timer.elapsed();
td::actor::send_closure(parent_, &CellDb::set_in_memory_boc, boc_);
}

auto rocks_db = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());
rocks_db_ = rocks_db->raw_db();
cell_db_ = std::move(rocks_db);
if (!opts_->get_celldb_in_memory()) {
CHECK(boc_ == nullptr);
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
td::actor::send_closure(root_db_, &RootDb::update_snapshot);
}

alarm_timestamp() = td::Timestamp::in(10.0);
Expand Down Expand Up @@ -238,7 +235,7 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi

if (!opts_->get_celldb_in_memory()) {
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
td::actor::send_closure(root_db_, &RootDb::update_snapshot);
}

promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
Expand All @@ -255,11 +252,10 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi

void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
if (db_busy_) {
action_queue_.push(
[self = this, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
R.ensure();
self->get_cell_db_reader(std::move(promise));
});
action_queue_.push([self = this, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
R.ensure();
self->get_cell_db_reader(std::move(promise));
});
return;
}
promise.set_result(boc_->get_cell_db_reader());
Expand Down Expand Up @@ -317,8 +313,8 @@ void CellDbIn::flush_db_stats() {

auto stats =
td::RocksDb::statistics_to_string(statistics_) + snapshot_statistics_->to_string() + ss.as_cslice().str();
auto to_file_r =
td::FileFd::open(path_ + "/db_stats.txt", td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644);
auto to_file_r = td::FileFd::open(path_ + "/" + std::to_string(obj_id_) + "_db_stats.txt",
td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644);
if (to_file_r.is_error()) {
LOG(ERROR) << "Failed to open db_stats.txt: " << to_file_r.move_as_error();
return;
Expand Down Expand Up @@ -468,7 +464,7 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
td::PerfWarningTimer timer_finish{"gccell_finish", 0.05};
if (!opts_->get_celldb_in_memory()) {
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
td::actor::send_closure(root_db_, &RootDb::update_snapshot);
}

DCHECK(get_block(key_hash).is_error());
Expand Down Expand Up @@ -583,7 +579,7 @@ void CellDbIn::migrate_cells() {
}
cell_db_->commit_write_batch().ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
td::actor::send_closure(root_db_, &RootDb::update_snapshot);

double time = timer.elapsed();
LOG(DEBUG) << "CellDb migration: migrated=" << migrated << " checked=" << checked << " time=" << time;
Expand Down Expand Up @@ -653,7 +649,9 @@ void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);

cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, obj_id_, inmem_info_,
rocks_db_, rdb_opts_, opts_);
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
Expand All @@ -666,6 +664,11 @@ void CellDb::start_up() {
td::Bits256{res.cell_->get_hash().bits()});
}
};

if (opts_->get_celldb_in_memory()) {
CHECK(inmem_info_.boc_ != nullptr);
td::actor::send_closure(actor_id(this), &CellDb::set_in_memory_boc, inmem_info_.boc_);
}
}

CellDbIn::DbEntry::DbEntry(tl_object_ptr<ton_api::db_celldb_value> entry)
Expand Down
34 changes: 28 additions & 6 deletions validator/db/celldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ class CellDbBase : public td::actor::Actor {
friend CellDbAsyncExecutor;
};

struct InMemoryInfo {
static InMemoryInfo create_from_rocksdb(const std::string& path);

std::shared_ptr<vm::DynamicBagOfCellsDb> boc_;
std::optional<double> in_memory_load_time_;
};

class CellDbIn : public CellDbBase {
public:
using KeyHash = td::Bits256;
Expand All @@ -71,7 +78,8 @@ class CellDbIn : public CellDbBase {

void flush_db_stats();

CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path, int obj_id,
InMemoryInfo inmem_info, std::shared_ptr<td::RocksDb> rocks_db, td::RocksDbOptions rdb_opts,
td::Ref<ValidatorManagerOptions> opts);

void start_up() override;
Expand Down Expand Up @@ -113,6 +121,7 @@ class CellDbIn : public CellDbBase {
td::actor::ActorId<CellDb> parent_;

std::string path_;
int obj_id_;
td::Ref<ValidatorManagerOptions> opts_;

std::shared_ptr<vm::DynamicBagOfCellsDb> boc_;
Expand Down Expand Up @@ -187,13 +196,13 @@ class CellDb : public CellDbBase {
void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise);
void load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise);
void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
void update_snapshot(std::unique_ptr<td::KeyValueReader> snapshot) {
void update_snapshot() {
CHECK(!opts_->get_celldb_in_memory());
if (!started_) {
alarm();
}
started_ = true;
boc_->set_loader(std::make_unique<vm::CellLoader>(std::move(snapshot), on_load_callback_)).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(rocks_db_->snapshot(), on_load_callback_)).ensure();
}
void set_in_memory_boc(std::shared_ptr<const vm::DynamicBagOfCellsDb> in_memory_boc) {
CHECK(opts_->get_celldb_in_memory());
Expand All @@ -205,15 +214,28 @@ class CellDb : public CellDbBase {
}
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);

CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), path_(path), opts_(opts) {
CellDb(td::actor::ActorId<RootDb> root_db, std::string path, int obj_id, InMemoryInfo inmem_info,
std::shared_ptr<td::RocksDb> rocks_db, td::RocksDbOptions rdb_opts, td::Ref<ValidatorManagerOptions> opts)
: obj_id_(obj_id)
, inmem_info_(inmem_info)
, rocks_db_(rocks_db)
, rdb_opts_(rdb_opts)
, path_(path)
, root_db_(root_db)
, opts_(opts) {
}

void start_up() override;

private:
td::actor::ActorId<RootDb> root_db_;
// just used to store temporary and then pass to CellDbIn
int obj_id_;
InMemoryInfo inmem_info_;
std::shared_ptr<td::RocksDb> rocks_db_;
td::RocksDbOptions rdb_opts_;

std::string path_;
td::actor::ActorId<RootDb> root_db_;
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDbIn> cell_db_;
Expand Down
39 changes: 34 additions & 5 deletions validator/db/rootdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ void RootDb::store_block_state(BlockHandle handle, td::Ref<ShardState> state,
td::actor::send_closure(b, &ArchiveManager::update_handle, std::move(handle), std::move(P));
}
});
td::actor::send_closure(cell_db_, &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P));
td::actor::send_closure(cell_db_writer_, &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P));
} else {
get_block_state(handle, std::move(promise));
}
Expand All @@ -265,6 +265,9 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardS
promise.set_error(td::Status::Error(ErrorCode::error, "state already gc'd"));
return;
}

const auto index = reader_index_;
reader_index_ = (reader_index_ + 1) % cell_db_readers_.size();
auto P =
td::PromiseCreator::lambda([handle, promise = std::move(promise)](td::Result<td::Ref<vm::DataCell>> R) mutable {
if (R.is_error()) {
Expand All @@ -275,14 +278,14 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardS
promise.set_value(S.move_as_ok());
}
});
td::actor::send_closure(cell_db_, &CellDb::load_cell, handle->state(), std::move(P));
td::actor::send_closure(cell_db_readers_[index], &CellDb::load_cell, handle->state(), std::move(P));
} else {
promise.set_error(td::Status::Error(ErrorCode::notready, "state not in db"));
}
}

void RootDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
td::actor::send_closure(cell_db_, &CellDb::get_cell_db_reader, std::move(promise));
td::actor::send_closure(cell_db_writer_, &CellDb::get_cell_db_reader, std::move(promise));
}

void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
Expand Down Expand Up @@ -414,7 +417,33 @@ void RootDb::get_hardforks(td::Promise<std::vector<BlockIdExt>> promise) {
}

void RootDb::start_up() {
cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/", opts_);
const auto celldb_path = root_path_ + "/celldb/";

InMemoryInfo inmem_info;
if (opts_->get_celldb_in_memory()) {
inmem_info = InMemoryInfo::create_from_rocksdb(celldb_path);
}

td::RocksDbOptions db_options;
if (!opts_->get_disable_rocksdb_stats()) {
db_options.statistics = td::RocksDb::create_statistics();
db_options.snapshot_statistics = std::make_shared<td::RocksDbSnapshotStatistics>();
}
if (opts_->get_celldb_cache_size()) {
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
auto rocks_db = std::make_shared<td::RocksDb>(td::RocksDb::open(celldb_path, db_options).move_as_ok());

cell_db_writer_ = td::actor::create_actor<CellDb>("celldbwriter", actor_id(this), celldb_path, 0, inmem_info,
rocks_db, db_options, opts_);
const auto reader_count = 0 == opts_->get_cpu_threads_count() ? 1 : opts_->get_cpu_threads_count();
for (size_t i = 0; i < reader_count; i++) {
cell_db_readers_.push_back(td::actor::create_actor<CellDb>("celldbreader", actor_id(this), celldb_path, i + 1,
inmem_info, rocks_db, db_options, opts_));
}

state_db_ = td::actor::create_actor<StateDb>("statedb", actor_id(this), root_path_ + "/state/");
static_files_db_ = td::actor::create_actor<StaticFilesDb>("staticfilesdb", actor_id(this), root_path_ + "/static/");
archive_db_ = td::actor::create_actor<ArchiveManager>("archive", actor_id(this), root_path_, opts_);
Expand All @@ -435,7 +464,7 @@ void RootDb::allow_block_gc(BlockIdExt block_id, td::Promise<bool> promise) {

void RootDb::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
auto merger = StatsMerger::create(std::move(promise));
td::actor::send_closure(cell_db_, &CellDb::prepare_stats, merger.make_promise("celldb."));
td::actor::send_closure(cell_db_writer_, &CellDb::prepare_stats, merger.make_promise("celldb."));
}

void RootDb::truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
Expand Down
12 changes: 11 additions & 1 deletion validator/db/rootdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,22 @@ class RootDb : public Db {

void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override;

public:
void update_snapshot() {
// we don't update snapshot of writer because the update action is triggered by writer.
for (const auto& reader : cell_db_readers_) {
td::actor::send_closure(reader, &CellDb::update_snapshot);
}
}

private:
td::actor::ActorId<ValidatorManager> validator_manager_;
std::string root_path_;
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDb> cell_db_;
td::actor::ActorOwn<CellDb> cell_db_writer_;
std::vector<td::actor::ActorOwn<CellDb>> cell_db_readers_;
std::size_t reader_index_ = 0;
td::actor::ActorOwn<StateDb> state_db_;
td::actor::ActorOwn<StaticFilesDb> static_files_db_;
td::actor::ActorOwn<ArchiveManager> archive_db_;
Expand Down
Loading