From 1c13e11be58c8dda5c1e41333604957df4c92a8a Mon Sep 17 00:00:00 2001 From: Gert-Jaap Glasbergen Date: Wed, 7 Sep 2022 08:16:41 +0200 Subject: [PATCH 1/6] Multi-threaded validation and attestation in sentinel Signed-off-by: Gert-Jaap Glasbergen --- src/uhs/twophase/sentinel_2pc/controller.cpp | 164 ++++++++++++++---- src/uhs/twophase/sentinel_2pc/controller.hpp | 32 ++++ .../twophase/sentinel_2pc/sentineld_2pc.cpp | 2 + 3 files changed, 161 insertions(+), 37 deletions(-) diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 4c6fc797f..24614d969 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -88,6 +88,22 @@ namespace cbdc::sentinel_2pc { return false; } + auto n_threads = std::thread::hardware_concurrency() / 2; + if(n_threads < 1) { + n_threads = 1; + } + for(size_t i = 0; i < n_threads; i++) { + m_validation_threads.emplace_back([&]() { + validation_worker(); + }); + } + + for(size_t i = 0; i < n_threads; i++) { + m_attestation_threads.emplace_back([&]() { + attestation_worker(); + }); + } + m_rpc_server = std::make_unique( this, std::move(rpc_server)); @@ -95,35 +111,69 @@ namespace cbdc::sentinel_2pc { return true; } - auto controller::execute_transaction( - transaction::full_tx tx, - execute_result_callback_type result_callback) -> bool { - const auto validation_err = transaction::validation::check_tx(tx); - if(validation_err.has_value()) { - auto tx_id = transaction::tx_id(tx); - m_logger->debug( - "Rejected (", - transaction::validation::to_string(validation_err.value()), - ")", - to_string(tx_id)); - result_callback(cbdc::sentinel::execute_response{ - cbdc::sentinel::tx_status::static_invalid, - validation_err}); - return true; + void controller::validation_worker() { + while(m_running) { + auto v = queued_validation(); + if(m_validation_queue.pop(v)) { + auto [tx, cb] = v; + cb(std::move(tx), transaction::validation::check_tx(tx)); + } } + } - auto compact_tx = cbdc::transaction::compact_tx(tx); + auto controller::validate_tx(const transaction::full_tx& tx, + validation_callback cb) -> bool { + m_validation_queue.push({std::move(tx), std::move(cb)}); + return true; + } - if(m_opts.m_attestation_threshold > 0) { - auto attestation = compact_tx.sign(m_secp.get(), m_privkey); - compact_tx.m_attestations.insert(attestation); + void controller::attestation_worker() { + while(m_running) { + auto v = queued_attestation(); + if(m_attestation_queue.pop(v)) { + auto [tx, cb] = v; + auto compact_tx = cbdc::transaction::compact_tx(tx); + cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey)); + } } + } - gather_attestations(tx, std::move(result_callback), compact_tx, {}); - + auto controller::attest_tx(const transaction::full_tx& tx, + attestation_callback cb) -> bool { + m_attestation_queue.push({std::move(tx), std::move(cb)}); return true; } + auto controller::execute_transaction( + transaction::full_tx tx, + execute_result_callback_type result_callback) -> bool { + return controller::validate_tx( + std::move(tx), + [&, result_callback]( + const transaction::full_tx& tx2, + std::optional err) { + auto tx_id = cbdc::transaction::tx_id(tx2); + if(err.has_value()) { + m_logger->debug( + "Rejected (", + transaction::validation::to_string(err.value()), + ")", + to_string(tx_id)); + result_callback(cbdc::sentinel::execute_response{ + cbdc::sentinel::tx_status::static_invalid, + err}); + return; + } + + auto compact_tx = cbdc::transaction::compact_tx(tx2); + gather_attestations(std::move(tx2), + std::move(result_callback), + compact_tx, + {}); + return; + }); + } + void controller::result_handler(std::optional res, const execute_result_callback_type& res_cb) { @@ -143,15 +193,23 @@ namespace cbdc::sentinel_2pc { auto controller::validate_transaction( transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { - const auto validation_err = transaction::validation::check_tx(tx); - if(validation_err.has_value()) { - result_callback(std::nullopt); - return true; - } - auto compact_tx = cbdc::transaction::compact_tx(tx); - auto attestation = compact_tx.sign(m_secp.get(), m_privkey); - result_callback(std::move(attestation)); - return true; + return controller::validate_tx( + std::move(tx), + [&, result_callback]( + const transaction::full_tx& tx2, + std::optional err) { + if(err.has_value()) { + result_callback(std::nullopt); + return; + } + controller::attest_tx( + std::move(tx2), + [&, result_callback]( + const transaction::full_tx& /* tx3 */, + std::optional res) { + result_callback(std::move(res)); + }); + }); } void controller::validate_result_handler( @@ -173,14 +231,44 @@ namespace cbdc::sentinel_2pc { std::move(requested)); } + void controller::stop() { + m_running = false; + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + + for(auto& t : m_attestation_threads) { + if(t.joinable()) { + t.join(); + } + } + } + void controller::gather_attestations( const transaction::full_tx& tx, execute_result_callback_type result_callback, const transaction::compact_tx& ctx, std::unordered_set requested) { if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) { + if(ctx.m_attestations.size() == 0) { + // Self-attest first + controller::attest_tx( + std::move(tx), + [&, ctx, result_callback](const transaction::full_tx& tx2, + validate_result res) { + validate_result_handler(res, + tx2, + result_callback, + ctx, + {}); + }); + + return; + } auto success = false; - while(!success) { + while(!success && m_running) { auto sentinel_id = m_dist(m_rand); if(requested.find(sentinel_id) != requested.end()) { continue; @@ -209,14 +297,16 @@ namespace cbdc::sentinel_2pc { void controller::send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback) { - auto cb = - [&, res_cb = std::move(result_callback)](std::optional res) { - result_handler(res, res_cb); - }; + auto cb = [&, this, ctx, res_cb = std::move(result_callback)]( + std::optional res) { + result_handler(res, res_cb); + }; - // TODO: add a "retry" error response to offload sentinels from this + // TODO: add a "retry" error response to offload sentinels from + // this // infinite retry responsibility. - while(!m_coordinator_client.execute_transaction(ctx, cb)) { + while(!m_coordinator_client.execute_transaction(ctx, cb) + && m_running) { // TODO: the network currently doesn't provide a callback for // reconnection events so we have to sleep here to // prevent a needless spin. Instead, add such a callback diff --git a/src/uhs/twophase/sentinel_2pc/controller.hpp b/src/uhs/twophase/sentinel_2pc/controller.hpp index b5485ebf2..7d059a703 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.hpp +++ b/src/uhs/twophase/sentinel_2pc/controller.hpp @@ -13,6 +13,7 @@ #include "uhs/sentinel/format.hpp" #include "uhs/transaction/messages.hpp" #include "uhs/twophase/coordinator/client.hpp" +#include "util/common/blocking_queue.hpp" #include "util/common/config.hpp" #include "util/common/hashmap.hpp" #include "util/network/connection_manager.hpp" @@ -66,7 +67,22 @@ namespace cbdc::sentinel_2pc { validate_result_callback_type result_callback) -> bool override; + /// Cleanly stop the controller + void stop(); + private: + using validation_callback = std::function)>; + using queued_validation + = std::pair; + + using attestation_callback = std::function)>; + using queued_attestation + = std::pair; + static void result_handler(std::optional res, const execute_result_callback_type& res_cb); @@ -85,10 +101,24 @@ namespace cbdc::sentinel_2pc { void send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback); + auto validate_tx(const transaction::full_tx& tx, + validation_callback cb) -> bool; + void validation_worker(); + + auto attest_tx(const transaction::full_tx& tx, attestation_callback cb) + -> bool; + void attestation_worker(); + uint32_t m_sentinel_id; cbdc::config::options m_opts; std::shared_ptr m_logger; + blocking_queue m_validation_queue{}; + std::vector m_validation_threads{}; + + blocking_queue m_attestation_queue{}; + std::vector m_attestation_threads{}; + std::unique_ptr m_rpc_server; std::unique_ptr m_dist{}; privkey_t m_privkey{}; + + std::atomic m_running{true}; }; } diff --git a/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp b/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp index d562d9326..c5248914e 100644 --- a/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp +++ b/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp @@ -63,6 +63,8 @@ auto main(int argc, char** argv) -> int { logger->info("Shutting down..."); + ctl.stop(); + return 0; } // LCOV_EXCL_STOP From af63120671a565b14ddcd90583b5897e207444f4 Mon Sep 17 00:00:00 2001 From: Gert-Jaap Glasbergen Date: Wed, 7 Sep 2022 08:17:14 +0200 Subject: [PATCH 2/6] Prioritize cross-sentinel attestation requests Signed-off-by: Gert-Jaap Glasbergen --- src/uhs/twophase/sentinel_2pc/server.cpp | 48 +++++++++++++++++------- src/uhs/twophase/sentinel_2pc/server.hpp | 16 ++++++++ 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/uhs/twophase/sentinel_2pc/server.cpp b/src/uhs/twophase/sentinel_2pc/server.cpp index d00276b21..f8f8abd99 100644 --- a/src/uhs/twophase/sentinel_2pc/server.cpp +++ b/src/uhs/twophase/sentinel_2pc/server.cpp @@ -11,22 +11,44 @@ namespace cbdc::sentinel::rpc { std::unique_ptr> srv) : m_impl(impl), m_srv(std::move(srv)) { + m_processing_thread = std::thread([this]() { + process(); + }); m_srv->register_handler_callback( [&](const request& req, async_interface::result_callback_type callback) { - auto res = std::visit( - overloaded{[&](execute_request e_req) { - return m_impl->execute_transaction( - std::move(e_req), - callback); - }, - [&](validate_request v_req) { - return m_impl->validate_transaction( - std::move(v_req), - callback); - }}, - req); - return res; + auto req_item = request_queue_t{req, callback}; + m_request_queue.push(req_item); + return true; }); } + bool operator<(const request_queue_t& a, const request_queue_t& b) { + // Prioritize validate requests over execute requests + return (std::holds_alternative(a.m_req) + && std::holds_alternative(b.m_req)); + } + async_server::~async_server() { + m_running = false; + if(m_processing_thread.joinable()) { + m_processing_thread.join(); + } + } + void async_server::process() { + auto q = request_queue_t(); + while(m_running) { + if(m_request_queue.pop(q)) { + std::visit(overloaded{[&](execute_request e_req) { + return m_impl->execute_transaction( + std::move(e_req), + q.m_cb); + }, + [&](validate_request v_req) { + return m_impl->validate_transaction( + std::move(v_req), + q.m_cb); + }}, + q.m_req); + } + } + } } diff --git a/src/uhs/twophase/sentinel_2pc/server.hpp b/src/uhs/twophase/sentinel_2pc/server.hpp index 618aa24f0..ffd676f63 100644 --- a/src/uhs/twophase/sentinel_2pc/server.hpp +++ b/src/uhs/twophase/sentinel_2pc/server.hpp @@ -8,10 +8,17 @@ #include "uhs/sentinel/async_interface.hpp" #include "uhs/transaction/messages.hpp" +#include "util/common/blocking_queue.hpp" #include "util/rpc/async_server.hpp" #include "util/rpc/format.hpp" namespace cbdc::sentinel::rpc { + struct request_queue_t { + request m_req; + async_interface::result_callback_type m_cb; + }; + + bool operator<(const request_queue_t& a, const request_queue_t& b); /// Asynchronous RPC server for a sentinel. class async_server { public: @@ -25,9 +32,18 @@ namespace cbdc::sentinel::rpc { // implementation std::unique_ptr> srv); + ~async_server(); + private: + void process(); + async_interface* m_impl; std::unique_ptr> m_srv; + + blocking_priority_queue> + m_request_queue{}; + std::thread m_processing_thread; + std::atomic m_running = true; }; } From 7299535c2680db108a58c50fb6cd9c8ade2c9d1f Mon Sep 17 00:00:00 2001 From: Gert-Jaap Glasbergen Date: Wed, 7 Sep 2022 08:18:09 +0200 Subject: [PATCH 3/6] Multi-threaded attestation check in coordinator Signed-off-by: Gert-Jaap Glasbergen --- src/uhs/twophase/coordinator/controller.cpp | 119 +++++++++++++------- src/uhs/twophase/coordinator/controller.hpp | 12 ++ 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index 5ddfc5d51..aba46669e 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -106,7 +106,21 @@ namespace cbdc::coordinator { // Initialize NuRaft with the state machine we just created. Register // our callback function to notify us when we become a leader or // follower. - return m_raft_serv->init(m_raft_params); + if(!m_raft_serv->init(m_raft_params)) { + return false; + } + + auto n_threads = std::thread::hardware_concurrency(); + + m_logger->info("Creating", n_threads, "attestation check threads"); + + for(size_t i = 0; i < n_threads; i++) { + m_attestation_check_threads.emplace_back([&]() { + attestation_check_worker(); + }); + } + + return true; } auto controller::raft_callback(nuraft::cb_func::Type type, @@ -665,6 +679,12 @@ namespace cbdc::coordinator { if(m_start_thread.joinable()) { m_start_thread.join(); } + + for(auto& t : m_attestation_check_threads) { + if(t.joinable()) { + t.join(); + } + } } auto controller::sm_command_header::operator==( @@ -681,6 +701,27 @@ namespace cbdc::coordinator { rhs.m_discard_txs); } + void controller::attestation_check_worker() { + while(!m_quit) { + auto v = queued_attestation_check(); + if(m_attestation_check_queue.pop(v)) { + auto [tx, cb] = v; + auto valid = transaction::validation::check_attestations( + tx, + m_opts.m_sentinel_public_keys, + m_opts.m_attestation_threshold); + cb(std::move(tx), valid); + } + } + } + + auto controller::check_tx_attestation(const transaction::compact_tx& tx, + attestation_check_callback cb) + -> bool { + m_attestation_check_queue.push({std::move(tx), std::move(cb)}); + return true; + } + auto controller::execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool { @@ -689,44 +730,46 @@ namespace cbdc::coordinator { return false; } - if(!transaction::validation::check_attestations( - tx, - m_opts.m_sentinel_public_keys, - m_opts.m_attestation_threshold)) { - m_logger->warn("Received invalid compact transaction", - to_string(tx.m_id)); - return false; - } + return check_tx_attestation( + std::move(tx), + [&, + res_cb = std::move(result_callback)](transaction::compact_tx tx2, + bool result) { + if(!result) { + m_logger->warn("Received invalid compact transaction", + to_string(tx.m_id)); + res_cb(false); + return; + } + auto added = [&]() { + // Wait until there's space in the current batch + std::unique_lock l(m_batch_mut); + m_batch_cv.wait(l, [&]() { + return m_current_txs->size() < m_batch_size + || !m_running; + }); + if(!m_running) { + return false; + } - auto added = [&]() { - // Wait until there's space in the current batch - std::unique_lock l(m_batch_mut); - m_batch_cv.wait(l, [&]() { - return m_current_txs->size() < m_batch_size || !m_running; + // Make sure the TX is not already in the current batch + if(m_current_txs->find(tx2.m_id) != m_current_txs->end()) { + return false; + } + // Add the tx to the current dtx batch and record its index + auto idx = m_current_batch->add_tx(tx2); + // Map the index of the tx to the transaction ID and + // sentinel ID + m_current_txs->emplace( + tx2.m_id, + std::make_pair(std::move(res_cb), idx)); + return true; + }(); + if(added) { + // If this was a new TX, notify the executor thread there's + // work to do + m_batch_cv.notify_one(); + } }); - if(!m_running) { - return false; - } - - // Make sure the TX is not already in the current batch - if(m_current_txs->find(tx.m_id) != m_current_txs->end()) { - return false; - } - // Add the tx to the current dtx batch and record its index - auto idx = m_current_batch->add_tx(tx); - // Map the index of the tx to the transaction ID and sentinel - // ID - m_current_txs->emplace( - tx.m_id, - std::make_pair(std::move(result_callback), idx)); - return true; - }(); - if(added) { - // If this was a new TX, notify the executor thread there's work to - // do - m_batch_cv.notify_one(); - } - - return added; } } diff --git a/src/uhs/twophase/coordinator/controller.hpp b/src/uhs/twophase/coordinator/controller.hpp index b9ca62009..f7279a283 100644 --- a/src/uhs/twophase/coordinator/controller.hpp +++ b/src/uhs/twophase/coordinator/controller.hpp @@ -11,6 +11,7 @@ #include "server.hpp" #include "state_machine.hpp" #include "uhs/twophase/locking_shard/locking_shard.hpp" +#include "util/common/blocking_queue.hpp" #include "util/common/buffer.hpp" #include "util/common/random_source.hpp" #include "util/network/connection_manager.hpp" @@ -136,6 +137,11 @@ namespace cbdc::coordinator { -> bool override; private: + using attestation_check_callback + = std::function; + using queued_attestation_check + = std::pair; + size_t m_node_id; size_t m_coordinator_id; cbdc::config::options m_opts; @@ -164,6 +170,8 @@ namespace cbdc::coordinator { std::vector, std::atomic_bool>> m_exec_threads; std::shared_mutex m_exec_mut; + blocking_queue m_attestation_check_queue{}; + std::vector m_attestation_check_threads{}; std::thread m_start_thread; bool m_start_flag{false}; @@ -206,6 +214,10 @@ namespace cbdc::coordinator { void schedule_exec(std::function&& f); void join_execs(); + + auto check_tx_attestation(const transaction::compact_tx& tx, + attestation_check_callback cb) -> bool; + void attestation_check_worker(); }; } From a7b3606c2184078606ebd04ec50fdf60adbbdbe4 Mon Sep 17 00:00:00 2001 From: Gert-Jaap Glasbergen Date: Wed, 7 Sep 2022 08:39:09 +0200 Subject: [PATCH 4/6] Support pre-replication validation for RAFT requests Signed-off-by: Gert-Jaap Glasbergen --- src/util/raft/rpc_server.hpp | 81 +++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/src/util/raft/rpc_server.hpp b/src/util/raft/rpc_server.hpp index 8a5bc8c12..879ec0f1e 100644 --- a/src/util/raft/rpc_server.hpp +++ b/src/util/raft/rpc_server.hpp @@ -10,6 +10,10 @@ #include "util/rpc/async_server.hpp" namespace cbdc::raft::rpc { + using validation_callback = std::function; + using validate_function_type + = std::function; + /// Generic RPC server for raft nodes for which the replicated state /// machine handles the request processing logic. Replicates /// requests to the cluster which executes them via its state machine. Once @@ -22,7 +26,27 @@ namespace cbdc::raft::rpc { /// \param impl pointer to the raft node. /// \see cbdc::rpc::server void register_raft_node(std::shared_ptr impl) { + register_raft_node(impl, std::nullopt); + } + + /// Registers the raft node whose state machine handles RPC requests + /// for this server. + /// \param impl pointer to the raft node. + /// \param validate optional method to validate requests before + /// replicating them to the raft cluster + /// \see cbdc::rpc::server + void + register_raft_node(std::shared_ptr impl, + std::optional validate) { m_impl = std::move(impl); + if(validate.has_value()) { + m_validate_func = std::move(validate.value()); + } else { + m_validate_func = [&](buffer b, validation_callback cb) { + cb(std::move(b), true); + return true; + }; + } cbdc::rpc::raw_async_server::register_handler_callback( [&](buffer req, response_callback_type resp_cb) { return request_handler(std::move(req), std::move(resp_cb)); @@ -33,6 +57,7 @@ namespace cbdc::raft::rpc { private: std::shared_ptr m_impl; + validate_function_type m_validate_func; using response_callback_type = typename cbdc::rpc::raw_async_server::response_callback_type; @@ -44,35 +69,43 @@ namespace cbdc::raft::rpc { return false; } - // TODO: make network and sockets generic over the buffer type so - // these copy operations for the request and response to get - // over the nuraft/cbdc boundary are not needed. - auto new_log = nuraft::buffer::alloc(request_buf.size()); - nuraft::buffer_serializer bs(new_log); - bs.put_raw(request_buf.data(), request_buf.size()); - - auto success = m_impl->replicate( - new_log, - [&, resp_cb = std::move(response_callback), req_buf = new_log]( - result_type& r, - nuraft::ptr& err) { - if(err) { - resp_cb(std::nullopt); + return m_validate_func( + std::move(request_buf), + [&, res_cb = std::move(response_callback)](buffer buf2, + bool valid) { + if(!valid) { + res_cb(std::nullopt); return; } - const auto res = r.get(); - if(!res) { - resp_cb(std::nullopt); - return; - } + auto new_log = nuraft::buffer::alloc(buf2.size()); + nuraft::buffer_serializer bs(new_log); + bs.put_raw(buf2.data(), buf2.size()); - auto resp_pkt = cbdc::buffer(); - resp_pkt.append(res->data_begin(), res->size()); - resp_cb(std::move(resp_pkt)); - }); + auto success = m_impl->replicate( + new_log, + [&, resp_cb = std::move(res_cb), req_buf = new_log]( + result_type& r, + nuraft::ptr& err) { + if(err) { + resp_cb(std::nullopt); + return; + } - return success; + const auto res = r.get(); + if(!res) { + resp_cb(std::nullopt); + return; + } + + auto resp_pkt = cbdc::buffer(); + resp_pkt.append(res->data_begin(), res->size()); + resp_cb(std::move(resp_pkt)); + }); + if(!success) { + res_cb(std::nullopt); + } + }); } }; } From e31a309b0fc9a974009feaf52137066481902010 Mon Sep 17 00:00:00 2001 From: Gert-Jaap Glasbergen Date: Wed, 7 Sep 2022 08:39:32 +0200 Subject: [PATCH 5/6] Validate attestations in shard outside of state machine Signed-off-by: Gert-Jaap Glasbergen --- src/uhs/twophase/locking_shard/controller.cpp | 85 ++++++++++++++++++- src/uhs/twophase/locking_shard/controller.hpp | 15 +++- .../twophase/locking_shard/locking_shard.cpp | 20 ++--- 3 files changed, 102 insertions(+), 18 deletions(-) diff --git a/src/uhs/twophase/locking_shard/controller.cpp b/src/uhs/twophase/locking_shard/controller.cpp index 8b961f78d..97f16a540 100644 --- a/src/uhs/twophase/locking_shard/controller.cpp +++ b/src/uhs/twophase/locking_shard/controller.cpp @@ -8,6 +8,7 @@ #include "format.hpp" #include "state_machine.hpp" #include "status_client.hpp" +#include "uhs/transaction/validation.hpp" #include "util/rpc/tcp_server.hpp" #include "util/serialization/format.hpp" @@ -16,7 +17,7 @@ namespace cbdc::locking_shard { controller::controller(size_t shard_id, size_t node_id, - config::options opts, + const cbdc::config::options& opts, std::shared_ptr logger) : m_opts(std::move(opts)), m_logger(std::move(logger)), @@ -116,6 +117,14 @@ namespace cbdc::locking_shard { -> nuraft::cb_func::ReturnCode { if(type == nuraft::cb_func::Type::BecomeFollower) { m_logger->warn("Became follower, stopping listener"); + + m_running = false; + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); m_server.reset(); return nuraft::cb_func::ReturnCode::Ok; } @@ -123,11 +132,83 @@ namespace cbdc::locking_shard { m_logger->warn("Became leader, starting listener"); m_server = std::make_unique( m_opts.m_locking_shard_endpoints[m_shard_id][m_node_id]); - m_server->register_raft_node(m_raft_serv); + m_server->register_raft_node( + m_raft_serv, + [&, this](cbdc::buffer buf, + cbdc::raft::rpc::validation_callback cb) { + return enqueue_validation(std::move(buf), std::move(cb)); + }); + + auto n_threads = std::thread::hardware_concurrency(); + for(size_t i = 0; i < n_threads; i++) { + m_validation_threads.emplace_back([&, this]() { + validation_worker(); + }); + } + if(!m_server->init()) { m_logger->fatal("Couldn't start message handler server"); } } return nuraft::cb_func::ReturnCode::Ok; } + + void controller::validation_worker() { + while(m_running) { + auto v = validation_request(); + if(m_validation_queue.pop(v)) { + auto [req, cb] = v; + validate_request(std::move(req), std::move(cb)); + } + } + } + + auto + controller::enqueue_validation(cbdc::buffer buf, + cbdc::raft::rpc::validation_callback cb) + -> bool { + m_validation_queue.push({std::move(buf), std::move(cb)}); + return true; + } + + auto controller::validate_request(cbdc::buffer buf, + cbdc::raft::rpc::validation_callback cb) + -> bool { + auto maybe_req + = cbdc::from_buffer>(buf); + auto valid = true; + if(maybe_req) { + valid = std::visit( + overloaded{ + [&](rpc::lock_params&& params) -> bool { + auto result = true; + for(auto&& t : params) { + if(!transaction::validation::check_attestations( + t.m_tx, + m_opts.m_sentinel_public_keys, + m_opts.m_attestation_threshold)) { + m_logger->warn( + "Received invalid compact transaction", + to_string(t.m_tx.m_id)); + result = false; + break; + } + } + + return result; + }, + [&](rpc::apply_params&&) -> bool { + return true; + }, + [&](rpc::discard_params&&) -> bool { + return true; + }}, + std::move(maybe_req->m_payload.m_params)); + } else { + valid = false; + } + + cb(std::move(buf), valid); + return true; + } } diff --git a/src/uhs/twophase/locking_shard/controller.hpp b/src/uhs/twophase/locking_shard/controller.hpp index c23850a30..50e0c9bbe 100644 --- a/src/uhs/twophase/locking_shard/controller.hpp +++ b/src/uhs/twophase/locking_shard/controller.hpp @@ -10,6 +10,7 @@ #include "locking_shard.hpp" #include "state_machine.hpp" #include "status_server.hpp" +#include "util/common/blocking_queue.hpp" #include "util/raft/node.hpp" #include "util/raft/rpc_server.hpp" #include "util/rpc/tcp_server.hpp" @@ -25,7 +26,7 @@ namespace cbdc::locking_shard { /// \param logger log to use for output. controller(size_t shard_id, size_t node_id, - config::options opts, + const cbdc::config::options& opts, std::shared_ptr logger); ~controller() = default; @@ -46,12 +47,24 @@ namespace cbdc::locking_shard { auto raft_callback(nuraft::cb_func::Type type, nuraft::cb_func::Param* param) -> nuraft::cb_func::ReturnCode; + auto validate_request(cbdc::buffer request, + cbdc::raft::rpc::validation_callback cb) -> bool; + + auto enqueue_validation(cbdc::buffer request, + cbdc::raft::rpc::validation_callback cb) + -> bool; + void validation_worker(); config::options m_opts; std::shared_ptr m_logger; size_t m_shard_id; size_t m_node_id; std::string m_preseed_dir; + std::vector m_validation_threads; + using validation_request + = std::pair; + blocking_queue m_validation_queue; + std::atomic m_running{true}; std::shared_ptr m_state_machine; std::shared_ptr m_shard; diff --git a/src/uhs/twophase/locking_shard/locking_shard.cpp b/src/uhs/twophase/locking_shard/locking_shard.cpp index 92a325f9c..8612e48e1 100644 --- a/src/uhs/twophase/locking_shard/locking_shard.cpp +++ b/src/uhs/twophase/locking_shard/locking_shard.cpp @@ -103,21 +103,11 @@ namespace cbdc::locking_shard { auto locking_shard::check_and_lock_tx(const tx& t) -> bool { bool success{true}; - if(!transaction::validation::check_attestations( - t.m_tx, - m_opts.m_sentinel_public_keys, - m_opts.m_attestation_threshold)) { - m_logger->warn("Received invalid compact transaction", - to_string(t.m_tx.m_id)); - success = false; - } - if(success) { - for(const auto& uhs_id : t.m_tx.m_inputs) { - if(hash_in_shard_range(uhs_id) - && m_uhs.find(uhs_id) == m_uhs.end()) { - success = false; - break; - } + for(const auto& uhs_id : t.m_tx.m_inputs) { + if(hash_in_shard_range(uhs_id) + && m_uhs.find(uhs_id) == m_uhs.end()) { + success = false; + break; } } if(success) { From f86ef09f05107113d7d94e0c66756461d6e7beb1 Mon Sep 17 00:00:00 2001 From: Sam Stuewe Date: Tue, 30 Apr 2024 12:05:31 -0500 Subject: [PATCH 6/6] [WIP] fix: ensure tests pass In particular, several of the commits in this set assume their `blocking_queue`s will be empty by the time the destructor is called. However, this is not guaranteeable, and causes segfaults and/or indefinite hangs when encountered. This commit predominantly ensures that the queues are all `clear()`d appropriately. Signed-off-by: Sam Stuewe --- CMakeLists.txt | 2 +- benchmarks/CMakeLists.txt | 2 ++ src/uhs/twophase/coordinator/controller.cpp | 12 +++---- src/uhs/twophase/locking_shard/controller.cpp | 30 +++++++++++----- src/uhs/twophase/locking_shard/controller.hpp | 8 +++-- src/uhs/twophase/sentinel_2pc/controller.cpp | 36 +++++++++++-------- src/uhs/twophase/sentinel_2pc/controller.hpp | 2 +- src/uhs/twophase/sentinel_2pc/server.cpp | 6 ++-- src/uhs/twophase/sentinel_2pc/server.hpp | 6 +++- src/util/raft/rpc_server.hpp | 13 +++---- tests/unit/sentinel_2pc/controller_test.cpp | 13 +++---- 11 files changed, 81 insertions(+), 49 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e23aa462..1d838b2ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") - add_compile_options(-fprofile-arcs -ftest-coverage) + add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index d616364f0..782cbb270 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} shard watchtower locking_shard + raft transaction rpc network @@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} crypto secp256k1 ${LEVELDB_LIBRARY} + ${NURAFT_LIBRARY} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index aba46669e..9f2ddad8f 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -680,6 +680,7 @@ namespace cbdc::coordinator { m_start_thread.join(); } + m_attestation_check_queue.clear(); for(auto& t : m_attestation_check_threads) { if(t.joinable()) { t.join(); @@ -710,7 +711,7 @@ namespace cbdc::coordinator { tx, m_opts.m_sentinel_public_keys, m_opts.m_attestation_threshold); - cb(std::move(tx), valid); + cb(tx, valid); } } } @@ -718,7 +719,7 @@ namespace cbdc::coordinator { auto controller::check_tx_attestation(const transaction::compact_tx& tx, attestation_check_callback cb) -> bool { - m_attestation_check_queue.push({std::move(tx), std::move(cb)}); + m_attestation_check_queue.push({tx, std::move(cb)}); return true; } @@ -731,7 +732,7 @@ namespace cbdc::coordinator { } return check_tx_attestation( - std::move(tx), + tx, [&, res_cb = std::move(result_callback)](transaction::compact_tx tx2, bool result) { @@ -760,9 +761,8 @@ namespace cbdc::coordinator { auto idx = m_current_batch->add_tx(tx2); // Map the index of the tx to the transaction ID and // sentinel ID - m_current_txs->emplace( - tx2.m_id, - std::make_pair(std::move(res_cb), idx)); + m_current_txs->emplace(tx2.m_id, + std::make_pair(res_cb, idx)); return true; }(); if(added) { diff --git a/src/uhs/twophase/locking_shard/controller.cpp b/src/uhs/twophase/locking_shard/controller.cpp index 97f16a540..4f6eaa37a 100644 --- a/src/uhs/twophase/locking_shard/controller.cpp +++ b/src/uhs/twophase/locking_shard/controller.cpp @@ -17,7 +17,7 @@ namespace cbdc::locking_shard { controller::controller(size_t shard_id, size_t node_id, - const cbdc::config::options& opts, + cbdc::config::options opts, std::shared_ptr logger) : m_opts(std::move(opts)), m_logger(std::move(logger)), @@ -112,6 +112,18 @@ namespace cbdc::locking_shard { return true; } + controller::~controller() { + m_running = false; + m_validation_queue.clear(); + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); + m_server.reset(); + } + auto controller::raft_callback(nuraft::cb_func::Type type, nuraft::cb_func::Param* /* param */) -> nuraft::cb_func::ReturnCode { @@ -158,24 +170,24 @@ namespace cbdc::locking_shard { auto v = validation_request(); if(m_validation_queue.pop(v)) { auto [req, cb] = v; - validate_request(std::move(req), std::move(cb)); + validate_request(std::move(req), cb); } } } auto - controller::enqueue_validation(cbdc::buffer buf, + controller::enqueue_validation(cbdc::buffer request, cbdc::raft::rpc::validation_callback cb) -> bool { - m_validation_queue.push({std::move(buf), std::move(cb)}); + m_validation_queue.push({std::move(request), std::move(cb)}); return true; } - auto controller::validate_request(cbdc::buffer buf, - cbdc::raft::rpc::validation_callback cb) - -> bool { + auto controller::validate_request( + cbdc::buffer request, + const cbdc::raft::rpc::validation_callback& cb) -> bool { auto maybe_req - = cbdc::from_buffer>(buf); + = cbdc::from_buffer>(request); auto valid = true; if(maybe_req) { valid = std::visit( @@ -208,7 +220,7 @@ namespace cbdc::locking_shard { valid = false; } - cb(std::move(buf), valid); + cb(std::move(request), valid); return true; } } diff --git a/src/uhs/twophase/locking_shard/controller.hpp b/src/uhs/twophase/locking_shard/controller.hpp index 50e0c9bbe..28a5faef3 100644 --- a/src/uhs/twophase/locking_shard/controller.hpp +++ b/src/uhs/twophase/locking_shard/controller.hpp @@ -26,9 +26,10 @@ namespace cbdc::locking_shard { /// \param logger log to use for output. controller(size_t shard_id, size_t node_id, - const cbdc::config::options& opts, + cbdc::config::options opts, std::shared_ptr logger); - ~controller() = default; + + ~controller(); controller() = delete; controller(const controller&) = delete; @@ -48,7 +49,8 @@ namespace cbdc::locking_shard { nuraft::cb_func::Param* param) -> nuraft::cb_func::ReturnCode; auto validate_request(cbdc::buffer request, - cbdc::raft::rpc::validation_callback cb) -> bool; + const cbdc::raft::rpc::validation_callback& cb) + -> bool; auto enqueue_validation(cbdc::buffer request, cbdc::raft::rpc::validation_callback cb) diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 24614d969..51360b164 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -111,19 +111,23 @@ namespace cbdc::sentinel_2pc { return true; } + controller::~controller() { + stop(); + } + void controller::validation_worker() { while(m_running) { auto v = queued_validation(); if(m_validation_queue.pop(v)) { auto [tx, cb] = v; - cb(std::move(tx), transaction::validation::check_tx(tx)); + cb(tx, transaction::validation::check_tx(tx)); } } } auto controller::validate_tx(const transaction::full_tx& tx, validation_callback cb) -> bool { - m_validation_queue.push({std::move(tx), std::move(cb)}); + m_validation_queue.push({tx, std::move(cb)}); return true; } @@ -133,14 +137,14 @@ namespace cbdc::sentinel_2pc { if(m_attestation_queue.pop(v)) { auto [tx, cb] = v; auto compact_tx = cbdc::transaction::compact_tx(tx); - cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey)); + cb(tx, compact_tx.sign(m_secp.get(), m_privkey)); } } } auto controller::attest_tx(const transaction::full_tx& tx, attestation_callback cb) -> bool { - m_attestation_queue.push({std::move(tx), std::move(cb)}); + m_attestation_queue.push({tx, std::move(cb)}); return true; } @@ -148,7 +152,7 @@ namespace cbdc::sentinel_2pc { transaction::full_tx tx, execute_result_callback_type result_callback) -> bool { return controller::validate_tx( - std::move(tx), + tx, [&, result_callback]( const transaction::full_tx& tx2, std::optional err) { @@ -166,10 +170,7 @@ namespace cbdc::sentinel_2pc { } auto compact_tx = cbdc::transaction::compact_tx(tx2); - gather_attestations(std::move(tx2), - std::move(result_callback), - compact_tx, - {}); + gather_attestations(tx2, result_callback, compact_tx, {}); return; }); } @@ -194,7 +195,7 @@ namespace cbdc::sentinel_2pc { transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { return controller::validate_tx( - std::move(tx), + tx, [&, result_callback]( const transaction::full_tx& tx2, std::optional err) { @@ -203,7 +204,7 @@ namespace cbdc::sentinel_2pc { return; } controller::attest_tx( - std::move(tx2), + tx2, [&, result_callback]( const transaction::full_tx& /* tx3 */, std::optional res) { @@ -233,17 +234,24 @@ namespace cbdc::sentinel_2pc { void controller::stop() { m_running = false; + m_rpc_server.reset(); + + m_validation_queue.clear(); + m_attestation_queue.clear(); + for(auto& t : m_validation_threads) { if(t.joinable()) { t.join(); } } + m_validation_threads.clear(); for(auto& t : m_attestation_threads) { if(t.joinable()) { t.join(); } } + m_attestation_threads.clear(); } void controller::gather_attestations( @@ -252,10 +260,10 @@ namespace cbdc::sentinel_2pc { const transaction::compact_tx& ctx, std::unordered_set requested) { if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) { - if(ctx.m_attestations.size() == 0) { + if(ctx.m_attestations.empty()) { // Self-attest first controller::attest_tx( - std::move(tx), + tx, [&, ctx, result_callback](const transaction::full_tx& tx2, validate_result res) { validate_result_handler(res, @@ -297,7 +305,7 @@ namespace cbdc::sentinel_2pc { void controller::send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback) { - auto cb = [&, this, ctx, res_cb = std::move(result_callback)]( + auto cb = [&, ctx, res_cb = std::move(result_callback)]( std::optional res) { result_handler(res, res_cb); }; diff --git a/src/uhs/twophase/sentinel_2pc/controller.hpp b/src/uhs/twophase/sentinel_2pc/controller.hpp index 7d059a703..890158a61 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.hpp +++ b/src/uhs/twophase/sentinel_2pc/controller.hpp @@ -38,7 +38,7 @@ namespace cbdc::sentinel_2pc { const config::options& opts, std::shared_ptr logger); - ~controller() override = default; + ~controller() override; /// Initializes the controller. Connects to the shard coordinator /// network and launches a server thread for external clients. diff --git a/src/uhs/twophase/sentinel_2pc/server.cpp b/src/uhs/twophase/sentinel_2pc/server.cpp index f8f8abd99..ad2736508 100644 --- a/src/uhs/twophase/sentinel_2pc/server.cpp +++ b/src/uhs/twophase/sentinel_2pc/server.cpp @@ -17,18 +17,20 @@ namespace cbdc::sentinel::rpc { m_srv->register_handler_callback( [&](const request& req, async_interface::result_callback_type callback) { - auto req_item = request_queue_t{req, callback}; + auto req_item = request_queue_t{req, std::move(callback)}; m_request_queue.push(req_item); return true; }); } - bool operator<(const request_queue_t& a, const request_queue_t& b) { + auto operator<(const request_queue_t& a, const request_queue_t& b) + -> bool { // Prioritize validate requests over execute requests return (std::holds_alternative(a.m_req) && std::holds_alternative(b.m_req)); } async_server::~async_server() { m_running = false; + m_request_queue.clear(); if(m_processing_thread.joinable()) { m_processing_thread.join(); } diff --git a/src/uhs/twophase/sentinel_2pc/server.hpp b/src/uhs/twophase/sentinel_2pc/server.hpp index ffd676f63..5efad9ca9 100644 --- a/src/uhs/twophase/sentinel_2pc/server.hpp +++ b/src/uhs/twophase/sentinel_2pc/server.hpp @@ -18,7 +18,7 @@ namespace cbdc::sentinel::rpc { async_interface::result_callback_type m_cb; }; - bool operator<(const request_queue_t& a, const request_queue_t& b); + auto operator<(const request_queue_t& a, const request_queue_t& b) -> bool; /// Asynchronous RPC server for a sentinel. class async_server { public: @@ -33,6 +33,10 @@ namespace cbdc::sentinel::rpc { std::unique_ptr> srv); ~async_server(); + async_server(async_server&&) noexcept = delete; + auto operator=(async_server&&) noexcept -> async_server& = delete; + async_server(const async_server&) = delete; + auto operator=(const async_server&) -> async_server& = delete; private: void process(); diff --git a/src/util/raft/rpc_server.hpp b/src/util/raft/rpc_server.hpp index 879ec0f1e..70420168f 100644 --- a/src/util/raft/rpc_server.hpp +++ b/src/util/raft/rpc_server.hpp @@ -26,7 +26,7 @@ namespace cbdc::raft::rpc { /// \param impl pointer to the raft node. /// \see cbdc::rpc::server void register_raft_node(std::shared_ptr impl) { - register_raft_node(impl, std::nullopt); + register_raft_node(std::move(impl), std::nullopt); } /// Registers the raft node whose state machine handles RPC requests @@ -42,10 +42,11 @@ namespace cbdc::raft::rpc { if(validate.has_value()) { m_validate_func = std::move(validate.value()); } else { - m_validate_func = [&](buffer b, validation_callback cb) { - cb(std::move(b), true); - return true; - }; + m_validate_func + = [&](buffer b, const validation_callback& cb) { + cb(std::move(b), true); + return true; + }; } cbdc::rpc::raw_async_server::register_handler_callback( [&](buffer req, response_callback_type resp_cb) { @@ -84,7 +85,7 @@ namespace cbdc::raft::rpc { auto success = m_impl->replicate( new_log, - [&, resp_cb = std::move(res_cb), req_buf = new_log]( + [&, resp_cb = res_cb, req_buf = new_log]( result_type& r, nuraft::ptr& err) { if(err) { diff --git a/tests/unit/sentinel_2pc/controller_test.cpp b/tests/unit/sentinel_2pc/controller_test.cpp index 4a35b6bf5..8513cfeeb 100644 --- a/tests/unit/sentinel_2pc/controller_test.cpp +++ b/tests/unit/sentinel_2pc/controller_test.cpp @@ -99,6 +99,10 @@ class sentinel_2pc_test : public ::testing::Test { std::unique_ptr m_ctl; cbdc::transaction::full_tx m_valid_tx{}; std::shared_ptr m_logger; + std::unique_ptr + m_secp{secp256k1_context_create(SECP256K1_CONTEXT_SIGN + | SECP256K1_CONTEXT_VERIFY), + &secp256k1_context_destroy}; }; TEST_F(sentinel_2pc_test, test_init) { @@ -170,17 +174,14 @@ TEST_F(sentinel_2pc_test, digest_valid_transaction_network) { TEST_F(sentinel_2pc_test, tx_validation_test) { ASSERT_TRUE(m_ctl->init()); auto ctx = cbdc::transaction::compact_tx(m_valid_tx); - auto secp = std::unique_ptr{ - secp256k1_context_create(SECP256K1_CONTEXT_SIGN - | SECP256K1_CONTEXT_VERIFY), - &secp256k1_context_destroy}; auto res = m_ctl->validate_transaction(m_valid_tx, [&](auto validation_res) { ASSERT_TRUE(validation_res.has_value()); - ASSERT_TRUE(ctx.verify(secp.get(), validation_res.value())); + ASSERT_TRUE(ctx.verify(m_secp.get(), validation_res.value())); }); ASSERT_TRUE(res); + // ensures the validation callback has completed before we go out-of-scope + m_ctl->stop(); } TEST_F(sentinel_2pc_test, bad_coordinator_endpoint) {