diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 8b35b2b1f..56cacdc16 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1487,6 +1487,8 @@ td::Status ValidatorEngine::load_global_config() { } validator_options_.write().set_fast_state_serializer_enabled(fast_state_serializer_enabled_); + validator_options_.write().set_cpu_threads_count(cpu_threads_count_); + return td::Status::OK(); } @@ -4235,6 +4237,7 @@ int main(int argc, char *argv[]) { threads = v; return td::Status::OK(); }); + acts.push_back([&x, &threads]() { td::actor::send_closure(x, &ValidatorEngine::set_cpu_threads_count, threads); }); p.add_checked_option('u', "user", "change user", [&](td::Slice user) { return td::change_user(user.str()); }); p.add_checked_option('\0', "shutdown-at", "stop validator at the given time (unix timestamp)", [&](td::Slice arg) { TRY_RESULT(at, td::to_integer_safe(arg)); diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 50cd5a323..61fe7394a 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -223,6 +223,8 @@ class ValidatorEngine : public td::actor::Actor { std::string session_logs_file_; bool fast_state_serializer_enabled_ = false; + size_t cpu_threads_count_; + std::set unsafe_catchains_; std::map> unsafe_catchain_rotations_; @@ -310,6 +312,9 @@ class ValidatorEngine : public td::actor::Actor { void set_fast_state_serializer_enabled(bool value) { fast_state_serializer_enabled_ = value; } + void set_cpu_threads_count(size_t cpu_threads_count) { + cpu_threads_count_ = cpu_threads_count; + } void start_up() override; ValidatorEngine() { } @@ -401,8 +406,8 @@ class ValidatorEngine : public td::actor::Actor { void load_custom_overlays_config(); td::Status write_custom_overlays_config(); - void add_custom_overlay_to_config( - ton::tl_object_ptr overlay, td::Promise promise); + void add_custom_overlay_to_config(ton::tl_object_ptr overlay, + td::Promise promise); void del_custom_overlay_from_config(std::string name, td::Promise promise); void load_collator_options(); diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index 9dcecdb35..05471c62d 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -68,9 +68,35 @@ void CellDbBase::execute_sync(std::function f) { f(); } -CellDbIn::CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, +InMemoryInfo InMemoryInfo::create_from_rocksdb(const std::string& path) { + td::RocksDbOptions read_db_options; + read_db_options.use_direct_reads = true; + read_db_options.no_block_cache = true; + read_db_options.block_cache = {}; + LOG(WARNING) << "Loading all cells in memory (because of --celldb-in-memory)"; + td::Timer timer; + auto read_cell_db = std::make_shared(td::RocksDb::open(path, std::move(read_db_options)).move_as_ok()); + auto boc = vm::DynamicBagOfCellsDb::create_in_memory(read_cell_db.get(), {}); + const auto in_memory_load_time = timer.elapsed(); + + return InMemoryInfo{.boc_ = std::move(boc), .in_memory_load_time_ = in_memory_load_time}; +} + +CellDbIn::CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, int obj_id, + InMemoryInfo inmem_info, std::shared_ptr rocks_db, td::RocksDbOptions rdb_opts, td::Ref opts) - : root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) { + : root_db_(root_db) + , parent_(parent) + , path_(std::move(path)) + , obj_id_(obj_id) + , boc_(inmem_info.boc_) + , cell_db_(rocks_db) + , rocks_db_(rocks_db->raw_db()) + , in_memory_load_time_(inmem_info.in_memory_load_time_) { + statistics_ = rdb_opts.statistics; + statistics_flush_at_ = td::Timestamp::in(60.0); + snapshot_statistics_ = rdb_opts.snapshot_statistics; + opts_ = opts; } void CellDbIn::start_up() { @@ -88,42 +114,13 @@ void CellDbIn::start_up() { }; CellDbBase::start_up(); - td::RocksDbOptions db_options; - if (!opts_->get_disable_rocksdb_stats()) { - statistics_ = td::RocksDb::create_statistics(); - statistics_flush_at_ = td::Timestamp::in(60.0); - snapshot_statistics_ = std::make_shared(); - db_options.snapshot_statistics = snapshot_statistics_; - } - db_options.statistics = statistics_; - if (opts_->get_celldb_cache_size()) { - db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value()); - LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value()); - } - db_options.use_direct_reads = opts_->get_celldb_direct_io(); - if (opts_->get_celldb_in_memory()) { - td::RocksDbOptions read_db_options; - read_db_options.use_direct_reads = true; - read_db_options.no_block_cache = true; - read_db_options.block_cache = {}; - LOG(WARNING) << "Loading all cells in memory (because of --celldb-in-memory)"; - td::Timer timer; - auto read_cell_db = - std::make_shared(td::RocksDb::open(path_, std::move(read_db_options)).move_as_ok()); - boc_ = vm::DynamicBagOfCellsDb::create_in_memory(read_cell_db.get(), {}); - in_memory_load_time_ = timer.elapsed(); - td::actor::send_closure(parent_, &CellDb::set_in_memory_boc, boc_); - } - - auto rocks_db = std::make_shared(td::RocksDb::open(path_, std::move(db_options)).move_as_ok()); - rocks_db_ = rocks_db->raw_db(); - cell_db_ = std::move(rocks_db); if (!opts_->get_celldb_in_memory()) { + CHECK(boc_ == nullptr); boc_ = vm::DynamicBagOfCellsDb::create(); boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth()); boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + td::actor::send_closure(root_db_, &RootDb::update_snapshot); } alarm_timestamp() = td::Timestamp::in(10.0); @@ -238,7 +235,7 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref cell, td::Promi if (!opts_->get_celldb_in_memory()) { boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + td::actor::send_closure(root_db_, &RootDb::update_snapshot); } promise.set_result(boc_->load_cell(cell->get_hash().as_slice())); @@ -255,11 +252,10 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref cell, td::Promi void CellDbIn::get_cell_db_reader(td::Promise> promise) { if (db_busy_) { - action_queue_.push( - [self = this, promise = std::move(promise)](td::Result R) mutable { - R.ensure(); - self->get_cell_db_reader(std::move(promise)); - }); + action_queue_.push([self = this, promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + self->get_cell_db_reader(std::move(promise)); + }); return; } promise.set_result(boc_->get_cell_db_reader()); @@ -317,8 +313,8 @@ void CellDbIn::flush_db_stats() { auto stats = td::RocksDb::statistics_to_string(statistics_) + snapshot_statistics_->to_string() + ss.as_cslice().str(); - auto to_file_r = - td::FileFd::open(path_ + "/db_stats.txt", td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644); + auto to_file_r = td::FileFd::open(path_ + "/" + std::to_string(obj_id_) + "_db_stats.txt", + td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644); if (to_file_r.is_error()) { LOG(ERROR) << "Failed to open db_stats.txt: " << to_file_r.move_as_error(); return; @@ -468,7 +464,7 @@ void CellDbIn::gc_cont2(BlockHandle handle) { td::PerfWarningTimer timer_finish{"gccell_finish", 0.05}; if (!opts_->get_celldb_in_memory()) { boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + td::actor::send_closure(root_db_, &RootDb::update_snapshot); } DCHECK(get_block(key_hash).is_error()); @@ -583,7 +579,7 @@ void CellDbIn::migrate_cells() { } cell_db_->commit_write_batch().ensure(); boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + td::actor::send_closure(root_db_, &RootDb::update_snapshot); double time = timer.elapsed(); LOG(DEBUG) << "CellDb migration: migrated=" << migrated << " checked=" << checked << " time=" << time; @@ -653,7 +649,9 @@ void CellDb::start_up() { CellDbBase::start_up(); boc_ = vm::DynamicBagOfCellsDb::create(); boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth()); - cell_db_ = td::actor::create_actor("celldbin", root_db_, actor_id(this), path_, opts_); + + cell_db_ = td::actor::create_actor("celldbin", root_db_, actor_id(this), path_, obj_id_, inmem_info_, + rocks_db_, rdb_opts_, opts_); on_load_callback_ = [actor = std::make_shared>( td::actor::create_actor("celldbmigration", cell_db_.get())), compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) { @@ -666,6 +664,11 @@ void CellDb::start_up() { td::Bits256{res.cell_->get_hash().bits()}); } }; + + if (opts_->get_celldb_in_memory()) { + CHECK(inmem_info_.boc_ != nullptr); + td::actor::send_closure(actor_id(this), &CellDb::set_in_memory_boc, inmem_info_.boc_); + } } CellDbIn::DbEntry::DbEntry(tl_object_ptr entry) diff --git a/validator/db/celldb.hpp b/validator/db/celldb.hpp index 5639b9748..bb49236ae 100644 --- a/validator/db/celldb.hpp +++ b/validator/db/celldb.hpp @@ -58,6 +58,13 @@ class CellDbBase : public td::actor::Actor { friend CellDbAsyncExecutor; }; +struct InMemoryInfo { + static InMemoryInfo create_from_rocksdb(const std::string& path); + + std::shared_ptr boc_; + std::optional in_memory_load_time_; +}; + class CellDbIn : public CellDbBase { public: using KeyHash = td::Bits256; @@ -71,7 +78,8 @@ class CellDbIn : public CellDbBase { void flush_db_stats(); - CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, + CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, int obj_id, + InMemoryInfo inmem_info, std::shared_ptr rocks_db, td::RocksDbOptions rdb_opts, td::Ref opts); void start_up() override; @@ -113,6 +121,7 @@ class CellDbIn : public CellDbBase { td::actor::ActorId parent_; std::string path_; + int obj_id_; td::Ref opts_; std::shared_ptr boc_; @@ -187,13 +196,13 @@ class CellDb : public CellDbBase { void prepare_stats(td::Promise>> promise); void load_cell(RootHash hash, td::Promise> promise); void store_cell(BlockIdExt block_id, td::Ref cell, td::Promise> promise); - void update_snapshot(std::unique_ptr snapshot) { + void update_snapshot() { CHECK(!opts_->get_celldb_in_memory()); if (!started_) { alarm(); } started_ = true; - boc_->set_loader(std::make_unique(std::move(snapshot), on_load_callback_)).ensure(); + boc_->set_loader(std::make_unique(rocks_db_->snapshot(), on_load_callback_)).ensure(); } void set_in_memory_boc(std::shared_ptr in_memory_boc) { CHECK(opts_->get_celldb_in_memory()); @@ -205,15 +214,28 @@ class CellDb : public CellDbBase { } void get_cell_db_reader(td::Promise> promise); - CellDb(td::actor::ActorId root_db, std::string path, td::Ref opts) - : root_db_(root_db), path_(path), opts_(opts) { + CellDb(td::actor::ActorId root_db, std::string path, int obj_id, InMemoryInfo inmem_info, + std::shared_ptr rocks_db, td::RocksDbOptions rdb_opts, td::Ref opts) + : obj_id_(obj_id) + , inmem_info_(inmem_info) + , rocks_db_(rocks_db) + , rdb_opts_(rdb_opts) + , path_(path) + , root_db_(root_db) + , opts_(opts) { } void start_up() override; private: - td::actor::ActorId root_db_; + // just used to store temporary and then pass to CellDbIn + int obj_id_; + InMemoryInfo inmem_info_; + std::shared_ptr rocks_db_; + td::RocksDbOptions rdb_opts_; + std::string path_; + td::actor::ActorId root_db_; td::Ref opts_; td::actor::ActorOwn cell_db_; diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index bb5d767fd..eaeaed0d7 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -253,7 +253,7 @@ void RootDb::store_block_state(BlockHandle handle, td::Ref state, td::actor::send_closure(b, &ArchiveManager::update_handle, std::move(handle), std::move(P)); } }); - td::actor::send_closure(cell_db_, &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P)); + td::actor::send_closure(cell_db_writer_, &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P)); } else { get_block_state(handle, std::move(promise)); } @@ -265,6 +265,9 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promise> R) mutable { if (R.is_error()) { @@ -275,14 +278,14 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promisestate(), std::move(P)); + td::actor::send_closure(cell_db_readers_[index], &CellDb::load_cell, handle->state(), std::move(P)); } else { promise.set_error(td::Status::Error(ErrorCode::notready, "state not in db")); } } void RootDb::get_cell_db_reader(td::Promise> promise) { - td::actor::send_closure(cell_db_, &CellDb::get_cell_db_reader, std::move(promise)); + td::actor::send_closure(cell_db_writer_, &CellDb::get_cell_db_reader, std::move(promise)); } void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, @@ -414,7 +417,33 @@ void RootDb::get_hardforks(td::Promise> promise) { } void RootDb::start_up() { - cell_db_ = td::actor::create_actor("celldb", actor_id(this), root_path_ + "/celldb/", opts_); + const auto celldb_path = root_path_ + "/celldb/"; + + InMemoryInfo inmem_info; + if (opts_->get_celldb_in_memory()) { + inmem_info = InMemoryInfo::create_from_rocksdb(celldb_path); + } + + td::RocksDbOptions db_options; + if (!opts_->get_disable_rocksdb_stats()) { + db_options.statistics = td::RocksDb::create_statistics(); + db_options.snapshot_statistics = std::make_shared(); + } + if (opts_->get_celldb_cache_size()) { + db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value()); + LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value()); + } + db_options.use_direct_reads = opts_->get_celldb_direct_io(); + auto rocks_db = std::make_shared(td::RocksDb::open(celldb_path, db_options).move_as_ok()); + + cell_db_writer_ = td::actor::create_actor("celldbwriter", actor_id(this), celldb_path, 0, inmem_info, + rocks_db, db_options, opts_); + const auto reader_count = 0 == opts_->get_cpu_threads_count() ? 1 : opts_->get_cpu_threads_count(); + for (size_t i = 0; i < reader_count; i++) { + cell_db_readers_.push_back(td::actor::create_actor("celldbreader", actor_id(this), celldb_path, i + 1, + inmem_info, rocks_db, db_options, opts_)); + } + state_db_ = td::actor::create_actor("statedb", actor_id(this), root_path_ + "/state/"); static_files_db_ = td::actor::create_actor("staticfilesdb", actor_id(this), root_path_ + "/static/"); archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_, opts_); @@ -435,7 +464,7 @@ void RootDb::allow_block_gc(BlockIdExt block_id, td::Promise promise) { void RootDb::prepare_stats(td::Promise>> promise) { auto merger = StatsMerger::create(std::move(promise)); - td::actor::send_closure(cell_db_, &CellDb::prepare_stats, merger.make_promise("celldb.")); + td::actor::send_closure(cell_db_writer_, &CellDb::prepare_stats, merger.make_promise("celldb.")); } void RootDb::truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise promise) { diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 755ff2578..9fde8e456 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -139,12 +139,22 @@ class RootDb : public Db { void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override; + public: + void update_snapshot() { + // we don't update snapshot of writer because the update action is triggered by writer. + for (const auto& reader : cell_db_readers_) { + td::actor::send_closure(reader, &CellDb::update_snapshot); + } + } + private: td::actor::ActorId validator_manager_; std::string root_path_; td::Ref opts_; - td::actor::ActorOwn cell_db_; + td::actor::ActorOwn cell_db_writer_; + std::vector> cell_db_readers_; + std::size_t reader_index_ = 0; td::actor::ActorOwn state_db_; td::actor::ActorOwn static_files_db_; td::actor::ActorOwn archive_db_; diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 203aa5ebc..cc1164b33 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -156,6 +156,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { bool get_fast_state_serializer_enabled() const override { return fast_state_serializer_enabled_; } + size_t get_cpu_threads_count() const override { + return cpu_threads_count_; + } void set_zero_block_id(BlockIdExt block_id) override { zero_block_id_ = block_id; @@ -197,7 +200,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { unsafe_catchains_.insert(seqno); } void add_unsafe_catchain_rotate(BlockSeqno seqno, CatchainSeqno cc_seqno, td::uint32 value) override { - VLOG(INFO) << "Add unsafe catchain rotation: Master block seqno " << seqno<<" Catchain seqno " << cc_seqno << " New value "<< value; + VLOG(INFO) << "Add unsafe catchain rotation: Master block seqno " << seqno << " Catchain seqno " << cc_seqno + << " New value " << value; unsafe_catchain_rotates_[cc_seqno] = std::make_pair(seqno, value); } void truncate_db(BlockSeqno seqno) override { @@ -251,6 +255,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_fast_state_serializer_enabled(bool value) override { fast_state_serializer_enabled_ = value; } + void set_cpu_threads_count(size_t cpu_threads_count) override { + cpu_threads_count_ = cpu_threads_count; + } ValidatorManagerOptionsImpl *make_copy() const override { return new ValidatorManagerOptionsImpl(*this); @@ -258,9 +265,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { ValidatorManagerOptionsImpl(BlockIdExt zero_block_id, BlockIdExt init_block_id, std::function check_shard, - bool allow_blockchain_init, double sync_blocks_before, - double block_ttl, double state_ttl, double max_mempool_num, - double archive_ttl, double key_proof_ttl, + bool allow_blockchain_init, double sync_blocks_before, double block_ttl, double state_ttl, + double max_mempool_num, double archive_ttl, double key_proof_ttl, bool initial_sync_disabled) : zero_block_id_(zero_block_id) , init_block_id_(init_block_id) @@ -306,6 +312,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { bool state_serializer_enabled_ = true; td::Ref collator_options_{true}; bool fast_state_serializer_enabled_ = false; + size_t cpu_threads_count_; }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index 9dbaa185f..b0d36e478 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -48,7 +48,7 @@ class DownloadToken { struct PerfTimerStats { std::string name; - std::deque> stats; // + std::deque> stats; // }; struct CollatorOptions : public td::CntObject { @@ -115,6 +115,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual bool get_state_serializer_enabled() const = 0; virtual td::Ref get_collator_options() const = 0; virtual bool get_fast_state_serializer_enabled() const = 0; + virtual size_t get_cpu_threads_count() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; virtual void set_init_block_id(BlockIdExt block_id) = 0; @@ -148,6 +149,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_state_serializer_enabled(bool value) = 0; virtual void set_collator_options(td::Ref value) = 0; virtual void set_fast_state_serializer_enabled(bool value) = 0; + virtual void set_cpu_threads_count(size_t cpu_threads_count) = 0; static td::Ref create( BlockIdExt zero_block_id, BlockIdExt init_block_id, @@ -155,8 +157,7 @@ struct ValidatorManagerOptions : public td::CntObject { ShardCheckMode) { return true; }, bool allow_blockchain_init = false, double sync_blocks_before = 3600, double block_ttl = 86400, double state_ttl = 86400, double archive_ttl = 86400 * 7, double key_proof_ttl = 86400 * 3650, - double max_mempool_num = 999999, - bool initial_sync_disabled = false); + double max_mempool_num = 999999, bool initial_sync_disabled = false); }; class ValidatorManagerInterface : public td::actor::Actor {