Skip to content

Commit

Permalink
Limit batch size for block processor (#4655)
Browse files Browse the repository at this point in the history
* Limited batch sizes for block processor

* Remove legacy test
  • Loading branch information
pwojcikdev authored Jul 3, 2024
1 parent 873db29 commit 339c819
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 78 deletions.
47 changes: 0 additions & 47 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2647,53 +2647,6 @@ TEST (node, block_processor_full)
ASSERT_TIMELY (5s, node.block_processor.full ());
}

TEST (node, block_processor_half_full)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 6;
node_flags.force_use_write_queue = true;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build ();
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (send1->hash ()))
.build ();
auto send3 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send2->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 3 * nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (send2->hash ()))
.build ();
// The write guard prevents block processor doing any writes
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send3);
// Block processor may be not half_full during state blocks signatures verification
ASSERT_TIMELY (2s, node.block_processor.half_full ());
ASSERT_FALSE (node.block_processor.full ());
}

TEST (node, confirm_back)
{
nano::test::system system (1);
Expand Down
67 changes: 38 additions & 29 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,13 @@ void nano::block_processor::run ()
{
if (!queue.empty ())
{
lock.unlock ();
// TODO: Cleaner periodical logging
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
Expand Down Expand Up @@ -291,40 +297,47 @@ auto nano::block_processor::next () -> context
release_assert (false, "next() called when no blocks are ready");
}

auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> processed_batch_t
auto nano::block_processor::next_batch (size_t max_count) -> std::deque<context>
{
processed_batch_t processed;
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);
nano::timer<std::chrono::milliseconds> timer_l;
queue.periodic_update ();

lock_a.lock ();
std::deque<context> results;
while (!queue.empty () && results.size () < max_count)
{
results.push_back (next ());
}
return results;
}

queue.periodic_update ();
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock) -> processed_batch_t
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto batch = next_batch (256);

lock.unlock ();

timer_l.start ();
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);

nano::timer<std::chrono::milliseconds> timer;
timer.start ();

// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
size_t number_of_blocks_processed = 0;
size_t number_of_forced_processed = 0;

while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
processed_batch_t processed;
for (auto & ctx : batch)
{
// TODO: Cleaner periodical logging
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

auto ctx = next ();
auto const hash = ctx.block->hash ();
bool const force = ctx.source == nano::block_source::forced;

lock_a.unlock ();
transaction.refresh_if_needed ();

if (force)
{
Expand All @@ -336,15 +349,11 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

auto result = process_one (transaction, ctx, force);
processed.emplace_back (result, std::move (ctx));

lock_a.lock ();
}

lock_a.unlock ();

if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100))
{
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer_l.value ().count (), timer_l.unit ());
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}

return processed;
Expand Down
7 changes: 5 additions & 2 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ class block_processor final

std::size_t size () const;
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();

// TODO: Remove, used by legacy bootstrap
bool full () const;
bool half_full () const;

std::unique_ptr<container_info_component> collect_container_info (std::string const & name);

std::atomic<bool> flushing{ false };
Expand All @@ -120,6 +122,7 @@ class block_processor final
nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false);
void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
std::deque<context> next_batch (size_t max_count);
context next ();
bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);

Expand Down

0 comments on commit 339c819

Please sign in to comment.