Skip to content

Commit

Permalink
Squash merge PR nanocurrency#4662: Incremental backoff for `local_blo…
Browse files Browse the repository at this point in the history
…ck_broadcaster`
  • Loading branch information
gr0vity-dev committed Jul 2, 2024
1 parent 29fc179 commit b139035
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 77 deletions.
39 changes: 39 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3789,3 +3789,42 @@ TEST (node, process_local_overflow)
auto result = node.process_local (send1);
ASSERT_FALSE (result);
}

TEST (node, local_block_broadcast)
{
nano::test::system system;

// Disable active elections to prevent the block from being broadcasted by the election
auto node_config = system.default_config ();
node_config.active_elections.size = 0;
node_config.local_block_broadcaster.rebroadcast_interval = 1s;
auto & node1 = *system.add_node (node_config);
auto & node2 = *system.make_disconnected_node ();

nano::keypair key1;
nano::send_block_builder builder;
auto latest_hash = nano::dev::genesis->hash ();
auto send1 = builder.make_block ()
.previous (latest_hash)
.destination (key1.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest_hash))
.build ();

auto result = node1.process_local (send1);
ASSERT_TRUE (result);
ASSERT_NEVER (500ms, node1.active.active (send1->qualified_root ()));

// Wait until a broadcast is attempted
ASSERT_TIMELY_EQ (5s, node1.local_block_broadcaster.size (), 1);
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out) >= 1);

// The other node should not have received the block
ASSERT_NEVER (500ms, node2.block (send1->hash ()));

// Connect the nodes and check that the block is propagated
node1.network.merge_peer (node2.network.endpoint ());
ASSERT_TIMELY (5s, node1.network.find_node_id (node2.get_node_id ()));
ASSERT_TIMELY (10s, node2.block (send1->hash ()));
}
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ enum class type
peer_history,
message_processor,
online_reps,
local_block_broadcaster,

// bootstrap
bulk_pull_client,
Expand Down
2 changes: 2 additions & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
namespace nano
{
class active_elections;
class confirming_set;
class ledger;
class local_block_broadcaster;
class local_vote_history;
class logger;
class network;
Expand Down
146 changes: 101 additions & 45 deletions nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/secure/ledger.hpp>

nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) :
nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) :
config{ config_a },
node{ node_a },
block_processor{ block_processor_a },
network{ network_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
enabled{ enabled_a }
logger{ logger_a },
enabled{ enabled_a },
limiter{ config.broadcast_rate_limit, config.broadcast_rate_burst_ratio }
{
if (!enabled)
{
Expand All @@ -26,9 +31,20 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan
// Only rebroadcast local blocks that were successfully processed (no forks or gaps)
if (result == nano::block_status::progress && context.source == nano::block_source::local)
{
release_assert (context.block != nullptr);

nano::lock_guard<nano::mutex> guard{ mutex };

local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () });
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert);

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > config.max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}

should_notify = true;
}
}
Expand All @@ -41,7 +57,13 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan
block_processor.rolled_back.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased);
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
});

confirming_set.cemented_observers.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::cemented, erased);
});
}

Expand Down Expand Up @@ -76,44 +98,77 @@ void nano::local_block_broadcaster::stop ()
nano::join_or_pass (thread);
}

size_t nano::local_block_broadcaster::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return local_blocks.size ();
}

void nano::local_block_broadcaster::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

condition.wait_for (lock, check_interval);
condition.wait_for (lock, 1s);
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds

if (!stopped)
if (!stopped && !local_blocks.empty ())
{
cleanup ();
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

if (cleanup_interval.elapsed (config.cleanup_interval))
{
cleanup (lock);
debug_assert (lock.owns_lock ());
}

run_broadcasts (lock);
debug_assert (lock.owns_lock ());
debug_assert (!lock.owns_lock ());
lock.lock ();
}
}
}

std::chrono::milliseconds nano::local_block_broadcaster::rebroadcast_interval (unsigned rebroadcasts) const
{
return std::min (config.rebroadcast_interval * rebroadcasts, config.max_rebroadcast_interval);
}

void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());

std::vector<std::shared_ptr<nano::block>> to_broadcast;
std::deque<local_entry> to_broadcast;

auto const now = std::chrono::steady_clock::now ();
for (auto & entry : local_blocks)

// Iterate blocks with next_broadcast <= now
auto & by_broadcast = local_blocks.get<tag_broadcast> ();
for (auto const & entry : boost::make_iterator_range (by_broadcast.begin (), by_broadcast.upper_bound (now)))
{
if (elapsed (entry.last_broadcast, broadcast_interval, now))
{
debug_assert (entry.next_broadcast <= now);
release_assert (entry.block != nullptr);
to_broadcast.push_back (entry);
}

// Modify multi index container outside of the loop to avoid invalidating iterators
auto & by_hash = local_blocks.get<tag_hash> ();
for (auto const & entry : to_broadcast)
{
auto it = by_hash.find (entry.hash ());
release_assert (it != by_hash.end ());
bool success = by_hash.modify (it, [this, now] (auto & entry) {
entry.rebroadcasts += 1;
entry.last_broadcast = now;
to_broadcast.push_back (entry.block);
}
entry.next_broadcast = now + rebroadcast_interval (entry.rebroadcasts);
});
release_assert (success, "modify failed"); // Should never fail
}

lock.unlock ();

for (auto const & block : to_broadcast)
for (auto const & entry : to_broadcast)
{
while (!limiter.should_pass (1))
{
Expand All @@ -124,41 +179,47 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mute
}
}

stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);
logger.debug (nano::log::type::local_block_broadcaster, "Broadcasting block: {} (rebroadcasts so far: {})",
entry.block->hash ().to_string (),
entry.rebroadcasts);

network.flood_block_initial (block);
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);
network.flood_block_initial (entry.block);
}

lock.lock ();
}

void nano::local_block_broadcaster::cleanup ()
void nano::local_block_broadcaster::cleanup (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (!mutex.try_lock ());

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}
// Copy the local blocks to avoid holding the mutex during IO
auto local_blocks_copy = local_blocks;

// TODO: Mutex is held during IO, but it should be fine since it's not performance critical
auto transaction = node.ledger.tx_begin_read ();
erase_if (local_blocks, [this, &transaction] (auto const & entry) {
transaction.refresh_if_needed ();
lock.unlock ();

if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
std::set<nano::block_hash> already_confirmed;
{
auto transaction = node.ledger.tx_begin_read ();
for (auto const & entry : local_blocks_copy)
{
// This block has never been broadcasted, keep it so it's broadcasted at least once
return false;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed);
return true;
if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
{
continue;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::already_confirmed);
already_confirmed.insert (entry.block->hash ());
}
}
return false;
}

lock.lock ();

// Erase blocks that have been confirmed
erase_if (local_blocks, [&already_confirmed] (auto const & entry) {
return already_confirmed.contains (entry.block->hash ());
});
}

Expand All @@ -169,9 +230,4 @@ std::unique_ptr<nano::container_info_component> nano::local_block_broadcaster::c
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) }));
return composite;
}

nano::block_hash nano::local_block_broadcaster::local_entry::hash () const
{
return block->hash ();
}
}
Loading

0 comments on commit b139035

Please sign in to comment.