diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 6bff227689..f16c8571e0 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3789,3 +3789,42 @@ TEST (node, process_local_overflow) auto result = node.process_local (send1); ASSERT_FALSE (result); } + +TEST (node, local_block_broadcast) +{ + nano::test::system system; + + // Disable active elections to prevent the block from being broadcasted by the election + auto node_config = system.default_config (); + node_config.active_elections.size = 0; + node_config.local_block_broadcaster.rebroadcast_interval = 1s; + auto & node1 = *system.add_node (node_config); + auto & node2 = *system.make_disconnected_node (); + + nano::keypair key1; + nano::send_block_builder builder; + auto latest_hash = nano::dev::genesis->hash (); + auto send1 = builder.make_block () + .previous (latest_hash) + .destination (key1.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (latest_hash)) + .build (); + + auto result = node1.process_local (send1); + ASSERT_TRUE (result); + ASSERT_NEVER (500ms, node1.active.active (send1->qualified_root ())); + + // Wait until a broadcast is attempted + ASSERT_TIMELY_EQ (5s, node1.local_block_broadcaster.size (), 1); + ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out) >= 1); + + // The other node should not have received the block + ASSERT_NEVER (500ms, node2.block (send1->hash ())); + + // Connect the nodes and check that the block is propagated + node1.network.merge_peer (node2.network.endpoint ()); + ASSERT_TIMELY (5s, node1.network.find_node_id (node2.get_node_id ())); + ASSERT_TIMELY (10s, node2.block (send1->hash ())); +} \ No newline at end of file diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 2609248c9c..52b409b746 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -80,6 +80,7 @@ enum class type peer_history, message_processor, online_reps, + local_block_broadcaster, // bootstrap bulk_pull_client, diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 0da465ae22..4a0f903a8a 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -7,7 +7,9 @@ namespace nano { class active_elections; +class confirming_set; class ledger; +class local_block_broadcaster; class local_vote_history; class logger; class network; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 4e97d1eb50..2a530712d4 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -2,17 +2,22 @@ #include #include #include +#include #include #include #include #include -nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) : +nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : + config{ config_a }, node{ node_a }, block_processor{ block_processor_a }, network{ network_a }, + confirming_set{ confirming_set_a }, stats{ stats_a }, - enabled{ enabled_a } + logger{ logger_a }, + enabled{ enabled_a }, + limiter{ config.broadcast_rate_limit, config.broadcast_rate_burst_ratio } { if (!enabled) { @@ -26,9 +31,20 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan // Only rebroadcast local blocks that were successfully processed (no forks or gaps) if (result == nano::block_status::progress && context.source == nano::block_source::local) { + release_assert (context.block != nullptr); + nano::lock_guard guard{ mutex }; + local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () }); stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert); + + // Erase oldest blocks if the queue gets too big + while (local_blocks.size () > config.max_size) + { + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest); + local_blocks.pop_front (); + } + should_notify = true; } } @@ -41,7 +57,13 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan block_processor.rolled_back.add ([this] (auto const & block) { nano::lock_guard guard{ mutex }; auto erased = local_blocks.get ().erase (block->hash ()); - stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased); + stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); + }); + + confirming_set.cemented_observers.add ([this] (auto const & block) { + nano::lock_guard guard{ mutex }; + auto erased = local_blocks.get ().erase (block->hash ()); + stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::cemented, erased); }); } @@ -76,44 +98,77 @@ void nano::local_block_broadcaster::stop () nano::join_or_pass (thread); } +size_t nano::local_block_broadcaster::size () const +{ + nano::lock_guard lock{ mutex }; + return local_blocks.size (); +} + void nano::local_block_broadcaster::run () { nano::unique_lock lock{ mutex }; while (!stopped) { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); - - condition.wait_for (lock, check_interval); + condition.wait_for (lock, 1s); debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds - if (!stopped) + if (!stopped && !local_blocks.empty ()) { - cleanup (); + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); + + if (cleanup_interval.elapsed (config.cleanup_interval)) + { + cleanup (lock); + debug_assert (lock.owns_lock ()); + } + run_broadcasts (lock); - debug_assert (lock.owns_lock ()); + debug_assert (!lock.owns_lock ()); + lock.lock (); } } } +std::chrono::milliseconds nano::local_block_broadcaster::rebroadcast_interval (unsigned rebroadcasts) const +{ + return std::min (config.rebroadcast_interval * rebroadcasts, config.max_rebroadcast_interval); +} + void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock & lock) { debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); - std::vector> to_broadcast; + std::deque to_broadcast; auto const now = std::chrono::steady_clock::now (); - for (auto & entry : local_blocks) + + // Iterate blocks with next_broadcast <= now + auto & by_broadcast = local_blocks.get (); + for (auto const & entry : boost::make_iterator_range (by_broadcast.begin (), by_broadcast.upper_bound (now))) { - if (elapsed (entry.last_broadcast, broadcast_interval, now)) - { + debug_assert (entry.next_broadcast <= now); + release_assert (entry.block != nullptr); + to_broadcast.push_back (entry); + } + + // Modify multi index container outside of the loop to avoid invalidating iterators + auto & by_hash = local_blocks.get (); + for (auto const & entry : to_broadcast) + { + auto it = by_hash.find (entry.hash ()); + release_assert (it != by_hash.end ()); + bool success = by_hash.modify (it, [this, now] (auto & entry) { + entry.rebroadcasts += 1; entry.last_broadcast = now; - to_broadcast.push_back (entry.block); - } + entry.next_broadcast = now + rebroadcast_interval (entry.rebroadcasts); + }); + release_assert (success, "modify failed"); // Should never fail } lock.unlock (); - for (auto const & block : to_broadcast) + for (auto const & entry : to_broadcast) { while (!limiter.should_pass (1)) { @@ -124,41 +179,47 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lockhash ().to_string (), + entry.rebroadcasts); - network.flood_block_initial (block); + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out); + network.flood_block_initial (entry.block); } - - lock.lock (); } -void nano::local_block_broadcaster::cleanup () +void nano::local_block_broadcaster::cleanup (nano::unique_lock & lock) { debug_assert (!mutex.try_lock ()); - // Erase oldest blocks if the queue gets too big - while (local_blocks.size () > max_size) - { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest); - local_blocks.pop_front (); - } + // Copy the local blocks to avoid holding the mutex during IO + auto local_blocks_copy = local_blocks; - // TODO: Mutex is held during IO, but it should be fine since it's not performance critical - auto transaction = node.ledger.tx_begin_read (); - erase_if (local_blocks, [this, &transaction] (auto const & entry) { - transaction.refresh_if_needed (); + lock.unlock (); - if (entry.last_broadcast == std::chrono::steady_clock::time_point{}) + std::set already_confirmed; + { + auto transaction = node.ledger.tx_begin_read (); + for (auto const & entry : local_blocks_copy) { // This block has never been broadcasted, keep it so it's broadcasted at least once - return false; - } - if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ())) - { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed); - return true; + if (entry.last_broadcast == std::chrono::steady_clock::time_point{}) + { + continue; + } + if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ())) + { + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::already_confirmed); + already_confirmed.insert (entry.block->hash ()); + } } - return false; + } + + lock.lock (); + + // Erase blocks that have been confirmed + erase_if (local_blocks, [&already_confirmed] (auto const & entry) { + return already_confirmed.contains (entry.block->hash ()); }); } @@ -169,9 +230,4 @@ std::unique_ptr nano::local_block_broadcaster::c auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) })); return composite; -} - -nano::block_hash nano::local_block_broadcaster::local_entry::hash () const -{ - return block->hash (); -} +} \ No newline at end of file diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index b4e7093dd6..e86af05a9e 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include #include +#include #include #include @@ -20,63 +22,89 @@ namespace mi = boost::multi_index; namespace nano { -class node; -class network; -} - -namespace nano +class local_block_broadcaster_config final { +public: + explicit local_block_broadcaster_config (nano::network_constants const & network) + { + if (network.is_dev_network ()) + { + rebroadcast_interval = 1s; + cleanup_interval = 1s; + } + } + + // TODO: Serialization & deserialization + +public: + std::size_t max_size{ 1024 * 8 }; + std::chrono::seconds rebroadcast_interval{ 3 }; + std::chrono::seconds max_rebroadcast_interval{ 60 }; + std::size_t broadcast_rate_limit{ 32 }; + double broadcast_rate_burst_ratio{ 3 }; + std::chrono::seconds cleanup_interval{ 60 }; +}; + /** * Broadcasts blocks to the network * Tracks local blocks for more aggressive propagation */ -class local_block_broadcaster +class local_block_broadcaster final { - enum class broadcast_strategy - { - normal, - aggressive, - }; - public: - local_block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::stats &, bool enabled = false); + local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); ~local_block_broadcaster (); void start (); void stop (); + size_t size () const; + std::unique_ptr collect_container_info (std::string const & name) const; private: void run (); void run_broadcasts (nano::unique_lock &); - void cleanup (); + void cleanup (nano::unique_lock &); + std::chrono::milliseconds rebroadcast_interval (unsigned rebroadcasts) const; private: // Dependencies + local_block_broadcaster_config const & config; nano::node & node; nano::block_processor & block_processor; nano::network & network; + nano::confirming_set & confirming_set; nano::stats & stats; + nano::logger & logger; private: struct local_entry { - std::shared_ptr const block; - std::chrono::steady_clock::time_point const arrival; - mutable std::chrono::steady_clock::time_point last_broadcast{}; // Not part of any index + std::shared_ptr block; + std::chrono::steady_clock::time_point arrival; + + std::chrono::steady_clock::time_point last_broadcast{}; + std::chrono::steady_clock::time_point next_broadcast{}; + unsigned rebroadcasts{ 0 }; - nano::block_hash hash () const; + nano::block_hash hash () const + { + return block->hash (); + } }; // clang-format off class tag_sequenced {}; class tag_hash {}; + class tag_broadcast {}; using ordered_locals = boost::multi_index_container>, mi::hashed_unique, - mi::const_mem_fun> + mi::const_mem_fun>, + mi::ordered_non_unique, + mi::member> >>; // clang-format on @@ -84,19 +112,12 @@ class local_block_broadcaster private: bool enabled{ false }; - - nano::bandwidth_limiter limiter{ broadcast_rate_limit, broadcast_rate_burst_ratio }; + nano::bandwidth_limiter limiter; + nano::interval cleanup_interval; std::atomic stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; std::thread thread; - - // TODO: Make these configurable - static std::size_t constexpr max_size{ 1024 * 8 }; - static std::chrono::seconds constexpr check_interval{ 30 }; - static std::chrono::seconds constexpr broadcast_interval{ 60 }; - static std::size_t constexpr broadcast_rate_limit{ 32 }; - static double constexpr broadcast_rate_burst_ratio{ 3 }; }; } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 84b2838333..4e897c3a97 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -219,7 +220,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy ascendboot{ config, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, - local_block_broadcaster{ *this, block_processor, network, stats, !flags.disable_block_processor_republishing }, + local_block_broadcaster_impl{ std::make_unique (config.local_block_broadcaster, *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, + local_block_broadcaster{ *local_block_broadcaster_impl }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }, peer_history_impl{ std::make_unique (config.peer_history, store, network, logger, stats) }, peer_history{ *peer_history_impl }, diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 5dd18903fd..164b6260ea 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -211,7 +210,8 @@ class node final : public std::enable_shared_from_this nano::bootstrap_ascending::service ascendboot; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; - nano::local_block_broadcaster local_block_broadcaster; + std::unique_ptr local_block_broadcaster_impl; + nano::local_block_broadcaster & local_block_broadcaster; nano::process_live_dispatcher process_live_dispatcher; std::unique_ptr peer_history_impl; nano::peer_history & peer_history; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 683902c75d..0eef9a5674 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -39,7 +39,8 @@ nano::node_config::node_config (const std::optional & peering_port_a, block_processor{ network_params.network }, peer_history{ network_params.network }, tcp{ network_params.network }, - network{ network_params.network } + network{ network_params.network }, + local_block_broadcaster{ network_params.network } { if (peering_port == 0) { diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 1b9fd403bc..a71953b690 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -152,6 +153,7 @@ class node_config nano::request_aggregator_config request_aggregator; nano::message_processor_config message_processor; nano::network_config network; + nano::local_block_broadcaster_config local_block_broadcaster; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;