diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e23aa462..1d838b2ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") - add_compile_options(-fprofile-arcs -ftest-coverage) + add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index d616364f0..782cbb270 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} shard watchtower locking_shard + raft transaction rpc network @@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} crypto secp256k1 ${LEVELDB_LIBRARY} + ${NURAFT_LIBRARY} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index 5ddfc5d51..9f2ddad8f 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -106,7 +106,21 @@ namespace cbdc::coordinator { // Initialize NuRaft with the state machine we just created. Register // our callback function to notify us when we become a leader or // follower. - return m_raft_serv->init(m_raft_params); + if(!m_raft_serv->init(m_raft_params)) { + return false; + } + + auto n_threads = std::thread::hardware_concurrency(); + + m_logger->info("Creating", n_threads, "attestation check threads"); + + for(size_t i = 0; i < n_threads; i++) { + m_attestation_check_threads.emplace_back([&]() { + attestation_check_worker(); + }); + } + + return true; } auto controller::raft_callback(nuraft::cb_func::Type type, @@ -665,6 +679,13 @@ namespace cbdc::coordinator { if(m_start_thread.joinable()) { m_start_thread.join(); } + + m_attestation_check_queue.clear(); + for(auto& t : m_attestation_check_threads) { + if(t.joinable()) { + t.join(); + } + } } auto controller::sm_command_header::operator==( @@ -681,6 +702,27 @@ namespace cbdc::coordinator { rhs.m_discard_txs); } + void controller::attestation_check_worker() { + while(!m_quit) { + auto v = queued_attestation_check(); + if(m_attestation_check_queue.pop(v)) { + auto [tx, cb] = v; + auto valid = transaction::validation::check_attestations( + tx, + m_opts.m_sentinel_public_keys, + m_opts.m_attestation_threshold); + cb(tx, valid); + } + } + } + + auto controller::check_tx_attestation(const transaction::compact_tx& tx, + attestation_check_callback cb) + -> bool { + m_attestation_check_queue.push({tx, std::move(cb)}); + return true; + } + auto controller::execute_transaction(transaction::compact_tx tx, callback_type result_callback) -> bool { @@ -689,44 +731,45 @@ namespace cbdc::coordinator { return false; } - if(!transaction::validation::check_attestations( - tx, - m_opts.m_sentinel_public_keys, - m_opts.m_attestation_threshold)) { - m_logger->warn("Received invalid compact transaction", - to_string(tx.m_id)); - return false; - } + return check_tx_attestation( + tx, + [&, + res_cb = std::move(result_callback)](transaction::compact_tx tx2, + bool result) { + if(!result) { + m_logger->warn("Received invalid compact transaction", + to_string(tx.m_id)); + res_cb(false); + return; + } + auto added = [&]() { + // Wait until there's space in the current batch + std::unique_lock l(m_batch_mut); + m_batch_cv.wait(l, [&]() { + return m_current_txs->size() < m_batch_size + || !m_running; + }); + if(!m_running) { + return false; + } - auto added = [&]() { - // Wait until there's space in the current batch - std::unique_lock l(m_batch_mut); - m_batch_cv.wait(l, [&]() { - return m_current_txs->size() < m_batch_size || !m_running; + // Make sure the TX is not already in the current batch + if(m_current_txs->find(tx2.m_id) != m_current_txs->end()) { + return false; + } + // Add the tx to the current dtx batch and record its index + auto idx = m_current_batch->add_tx(tx2); + // Map the index of the tx to the transaction ID and + // sentinel ID + m_current_txs->emplace(tx2.m_id, + std::make_pair(res_cb, idx)); + return true; + }(); + if(added) { + // If this was a new TX, notify the executor thread there's + // work to do + m_batch_cv.notify_one(); + } }); - if(!m_running) { - return false; - } - - // Make sure the TX is not already in the current batch - if(m_current_txs->find(tx.m_id) != m_current_txs->end()) { - return false; - } - // Add the tx to the current dtx batch and record its index - auto idx = m_current_batch->add_tx(tx); - // Map the index of the tx to the transaction ID and sentinel - // ID - m_current_txs->emplace( - tx.m_id, - std::make_pair(std::move(result_callback), idx)); - return true; - }(); - if(added) { - // If this was a new TX, notify the executor thread there's work to - // do - m_batch_cv.notify_one(); - } - - return added; } } diff --git a/src/uhs/twophase/coordinator/controller.hpp b/src/uhs/twophase/coordinator/controller.hpp index b9ca62009..f7279a283 100644 --- a/src/uhs/twophase/coordinator/controller.hpp +++ b/src/uhs/twophase/coordinator/controller.hpp @@ -11,6 +11,7 @@ #include "server.hpp" #include "state_machine.hpp" #include "uhs/twophase/locking_shard/locking_shard.hpp" +#include "util/common/blocking_queue.hpp" #include "util/common/buffer.hpp" #include "util/common/random_source.hpp" #include "util/network/connection_manager.hpp" @@ -136,6 +137,11 @@ namespace cbdc::coordinator { -> bool override; private: + using attestation_check_callback + = std::function; + using queued_attestation_check + = std::pair; + size_t m_node_id; size_t m_coordinator_id; cbdc::config::options m_opts; @@ -164,6 +170,8 @@ namespace cbdc::coordinator { std::vector, std::atomic_bool>> m_exec_threads; std::shared_mutex m_exec_mut; + blocking_queue m_attestation_check_queue{}; + std::vector m_attestation_check_threads{}; std::thread m_start_thread; bool m_start_flag{false}; @@ -206,6 +214,10 @@ namespace cbdc::coordinator { void schedule_exec(std::function&& f); void join_execs(); + + auto check_tx_attestation(const transaction::compact_tx& tx, + attestation_check_callback cb) -> bool; + void attestation_check_worker(); }; } diff --git a/src/uhs/twophase/locking_shard/controller.cpp b/src/uhs/twophase/locking_shard/controller.cpp index 8b961f78d..4f6eaa37a 100644 --- a/src/uhs/twophase/locking_shard/controller.cpp +++ b/src/uhs/twophase/locking_shard/controller.cpp @@ -8,6 +8,7 @@ #include "format.hpp" #include "state_machine.hpp" #include "status_client.hpp" +#include "uhs/transaction/validation.hpp" #include "util/rpc/tcp_server.hpp" #include "util/serialization/format.hpp" @@ -16,7 +17,7 @@ namespace cbdc::locking_shard { controller::controller(size_t shard_id, size_t node_id, - config::options opts, + cbdc::config::options opts, std::shared_ptr logger) : m_opts(std::move(opts)), m_logger(std::move(logger)), @@ -111,11 +112,31 @@ namespace cbdc::locking_shard { return true; } + controller::~controller() { + m_running = false; + m_validation_queue.clear(); + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); + m_server.reset(); + } + auto controller::raft_callback(nuraft::cb_func::Type type, nuraft::cb_func::Param* /* param */) -> nuraft::cb_func::ReturnCode { if(type == nuraft::cb_func::Type::BecomeFollower) { m_logger->warn("Became follower, stopping listener"); + + m_running = false; + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); m_server.reset(); return nuraft::cb_func::ReturnCode::Ok; } @@ -123,11 +144,83 @@ namespace cbdc::locking_shard { m_logger->warn("Became leader, starting listener"); m_server = std::make_unique( m_opts.m_locking_shard_endpoints[m_shard_id][m_node_id]); - m_server->register_raft_node(m_raft_serv); + m_server->register_raft_node( + m_raft_serv, + [&, this](cbdc::buffer buf, + cbdc::raft::rpc::validation_callback cb) { + return enqueue_validation(std::move(buf), std::move(cb)); + }); + + auto n_threads = std::thread::hardware_concurrency(); + for(size_t i = 0; i < n_threads; i++) { + m_validation_threads.emplace_back([&, this]() { + validation_worker(); + }); + } + if(!m_server->init()) { m_logger->fatal("Couldn't start message handler server"); } } return nuraft::cb_func::ReturnCode::Ok; } + + void controller::validation_worker() { + while(m_running) { + auto v = validation_request(); + if(m_validation_queue.pop(v)) { + auto [req, cb] = v; + validate_request(std::move(req), cb); + } + } + } + + auto + controller::enqueue_validation(cbdc::buffer request, + cbdc::raft::rpc::validation_callback cb) + -> bool { + m_validation_queue.push({std::move(request), std::move(cb)}); + return true; + } + + auto controller::validate_request( + cbdc::buffer request, + const cbdc::raft::rpc::validation_callback& cb) -> bool { + auto maybe_req + = cbdc::from_buffer>(request); + auto valid = true; + if(maybe_req) { + valid = std::visit( + overloaded{ + [&](rpc::lock_params&& params) -> bool { + auto result = true; + for(auto&& t : params) { + if(!transaction::validation::check_attestations( + t.m_tx, + m_opts.m_sentinel_public_keys, + m_opts.m_attestation_threshold)) { + m_logger->warn( + "Received invalid compact transaction", + to_string(t.m_tx.m_id)); + result = false; + break; + } + } + + return result; + }, + [&](rpc::apply_params&&) -> bool { + return true; + }, + [&](rpc::discard_params&&) -> bool { + return true; + }}, + std::move(maybe_req->m_payload.m_params)); + } else { + valid = false; + } + + cb(std::move(request), valid); + return true; + } } diff --git a/src/uhs/twophase/locking_shard/controller.hpp b/src/uhs/twophase/locking_shard/controller.hpp index c23850a30..28a5faef3 100644 --- a/src/uhs/twophase/locking_shard/controller.hpp +++ b/src/uhs/twophase/locking_shard/controller.hpp @@ -10,6 +10,7 @@ #include "locking_shard.hpp" #include "state_machine.hpp" #include "status_server.hpp" +#include "util/common/blocking_queue.hpp" #include "util/raft/node.hpp" #include "util/raft/rpc_server.hpp" #include "util/rpc/tcp_server.hpp" @@ -25,9 +26,10 @@ namespace cbdc::locking_shard { /// \param logger log to use for output. controller(size_t shard_id, size_t node_id, - config::options opts, + cbdc::config::options opts, std::shared_ptr logger); - ~controller() = default; + + ~controller(); controller() = delete; controller(const controller&) = delete; @@ -46,12 +48,25 @@ namespace cbdc::locking_shard { auto raft_callback(nuraft::cb_func::Type type, nuraft::cb_func::Param* param) -> nuraft::cb_func::ReturnCode; + auto validate_request(cbdc::buffer request, + const cbdc::raft::rpc::validation_callback& cb) + -> bool; + + auto enqueue_validation(cbdc::buffer request, + cbdc::raft::rpc::validation_callback cb) + -> bool; + void validation_worker(); config::options m_opts; std::shared_ptr m_logger; size_t m_shard_id; size_t m_node_id; std::string m_preseed_dir; + std::vector m_validation_threads; + using validation_request + = std::pair; + blocking_queue m_validation_queue; + std::atomic m_running{true}; std::shared_ptr m_state_machine; std::shared_ptr m_shard; diff --git a/src/uhs/twophase/locking_shard/locking_shard.cpp b/src/uhs/twophase/locking_shard/locking_shard.cpp index 92a325f9c..8612e48e1 100644 --- a/src/uhs/twophase/locking_shard/locking_shard.cpp +++ b/src/uhs/twophase/locking_shard/locking_shard.cpp @@ -103,21 +103,11 @@ namespace cbdc::locking_shard { auto locking_shard::check_and_lock_tx(const tx& t) -> bool { bool success{true}; - if(!transaction::validation::check_attestations( - t.m_tx, - m_opts.m_sentinel_public_keys, - m_opts.m_attestation_threshold)) { - m_logger->warn("Received invalid compact transaction", - to_string(t.m_tx.m_id)); - success = false; - } - if(success) { - for(const auto& uhs_id : t.m_tx.m_inputs) { - if(hash_in_shard_range(uhs_id) - && m_uhs.find(uhs_id) == m_uhs.end()) { - success = false; - break; - } + for(const auto& uhs_id : t.m_tx.m_inputs) { + if(hash_in_shard_range(uhs_id) + && m_uhs.find(uhs_id) == m_uhs.end()) { + success = false; + break; } } if(success) { diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 4c6fc797f..51360b164 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -88,6 +88,22 @@ namespace cbdc::sentinel_2pc { return false; } + auto n_threads = std::thread::hardware_concurrency() / 2; + if(n_threads < 1) { + n_threads = 1; + } + for(size_t i = 0; i < n_threads; i++) { + m_validation_threads.emplace_back([&]() { + validation_worker(); + }); + } + + for(size_t i = 0; i < n_threads; i++) { + m_attestation_threads.emplace_back([&]() { + attestation_worker(); + }); + } + m_rpc_server = std::make_unique( this, std::move(rpc_server)); @@ -95,35 +111,70 @@ namespace cbdc::sentinel_2pc { return true; } - auto controller::execute_transaction( - transaction::full_tx tx, - execute_result_callback_type result_callback) -> bool { - const auto validation_err = transaction::validation::check_tx(tx); - if(validation_err.has_value()) { - auto tx_id = transaction::tx_id(tx); - m_logger->debug( - "Rejected (", - transaction::validation::to_string(validation_err.value()), - ")", - to_string(tx_id)); - result_callback(cbdc::sentinel::execute_response{ - cbdc::sentinel::tx_status::static_invalid, - validation_err}); - return true; + controller::~controller() { + stop(); + } + + void controller::validation_worker() { + while(m_running) { + auto v = queued_validation(); + if(m_validation_queue.pop(v)) { + auto [tx, cb] = v; + cb(tx, transaction::validation::check_tx(tx)); + } } + } - auto compact_tx = cbdc::transaction::compact_tx(tx); + auto controller::validate_tx(const transaction::full_tx& tx, + validation_callback cb) -> bool { + m_validation_queue.push({tx, std::move(cb)}); + return true; + } - if(m_opts.m_attestation_threshold > 0) { - auto attestation = compact_tx.sign(m_secp.get(), m_privkey); - compact_tx.m_attestations.insert(attestation); + void controller::attestation_worker() { + while(m_running) { + auto v = queued_attestation(); + if(m_attestation_queue.pop(v)) { + auto [tx, cb] = v; + auto compact_tx = cbdc::transaction::compact_tx(tx); + cb(tx, compact_tx.sign(m_secp.get(), m_privkey)); + } } + } - gather_attestations(tx, std::move(result_callback), compact_tx, {}); - + auto controller::attest_tx(const transaction::full_tx& tx, + attestation_callback cb) -> bool { + m_attestation_queue.push({tx, std::move(cb)}); return true; } + auto controller::execute_transaction( + transaction::full_tx tx, + execute_result_callback_type result_callback) -> bool { + return controller::validate_tx( + tx, + [&, result_callback]( + const transaction::full_tx& tx2, + std::optional err) { + auto tx_id = cbdc::transaction::tx_id(tx2); + if(err.has_value()) { + m_logger->debug( + "Rejected (", + transaction::validation::to_string(err.value()), + ")", + to_string(tx_id)); + result_callback(cbdc::sentinel::execute_response{ + cbdc::sentinel::tx_status::static_invalid, + err}); + return; + } + + auto compact_tx = cbdc::transaction::compact_tx(tx2); + gather_attestations(tx2, result_callback, compact_tx, {}); + return; + }); + } + void controller::result_handler(std::optional res, const execute_result_callback_type& res_cb) { @@ -143,15 +194,23 @@ namespace cbdc::sentinel_2pc { auto controller::validate_transaction( transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { - const auto validation_err = transaction::validation::check_tx(tx); - if(validation_err.has_value()) { - result_callback(std::nullopt); - return true; - } - auto compact_tx = cbdc::transaction::compact_tx(tx); - auto attestation = compact_tx.sign(m_secp.get(), m_privkey); - result_callback(std::move(attestation)); - return true; + return controller::validate_tx( + tx, + [&, result_callback]( + const transaction::full_tx& tx2, + std::optional err) { + if(err.has_value()) { + result_callback(std::nullopt); + return; + } + controller::attest_tx( + tx2, + [&, result_callback]( + const transaction::full_tx& /* tx3 */, + std::optional res) { + result_callback(std::move(res)); + }); + }); } void controller::validate_result_handler( @@ -173,14 +232,51 @@ namespace cbdc::sentinel_2pc { std::move(requested)); } + void controller::stop() { + m_running = false; + m_rpc_server.reset(); + + m_validation_queue.clear(); + m_attestation_queue.clear(); + + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); + + for(auto& t : m_attestation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_attestation_threads.clear(); + } + void controller::gather_attestations( const transaction::full_tx& tx, execute_result_callback_type result_callback, const transaction::compact_tx& ctx, std::unordered_set requested) { if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) { + if(ctx.m_attestations.empty()) { + // Self-attest first + controller::attest_tx( + tx, + [&, ctx, result_callback](const transaction::full_tx& tx2, + validate_result res) { + validate_result_handler(res, + tx2, + result_callback, + ctx, + {}); + }); + + return; + } auto success = false; - while(!success) { + while(!success && m_running) { auto sentinel_id = m_dist(m_rand); if(requested.find(sentinel_id) != requested.end()) { continue; @@ -209,14 +305,16 @@ namespace cbdc::sentinel_2pc { void controller::send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback) { - auto cb = - [&, res_cb = std::move(result_callback)](std::optional res) { - result_handler(res, res_cb); - }; + auto cb = [&, ctx, res_cb = std::move(result_callback)]( + std::optional res) { + result_handler(res, res_cb); + }; - // TODO: add a "retry" error response to offload sentinels from this + // TODO: add a "retry" error response to offload sentinels from + // this // infinite retry responsibility. - while(!m_coordinator_client.execute_transaction(ctx, cb)) { + while(!m_coordinator_client.execute_transaction(ctx, cb) + && m_running) { // TODO: the network currently doesn't provide a callback for // reconnection events so we have to sleep here to // prevent a needless spin. Instead, add such a callback diff --git a/src/uhs/twophase/sentinel_2pc/controller.hpp b/src/uhs/twophase/sentinel_2pc/controller.hpp index b5485ebf2..890158a61 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.hpp +++ b/src/uhs/twophase/sentinel_2pc/controller.hpp @@ -13,6 +13,7 @@ #include "uhs/sentinel/format.hpp" #include "uhs/transaction/messages.hpp" #include "uhs/twophase/coordinator/client.hpp" +#include "util/common/blocking_queue.hpp" #include "util/common/config.hpp" #include "util/common/hashmap.hpp" #include "util/network/connection_manager.hpp" @@ -37,7 +38,7 @@ namespace cbdc::sentinel_2pc { const config::options& opts, std::shared_ptr logger); - ~controller() override = default; + ~controller() override; /// Initializes the controller. Connects to the shard coordinator /// network and launches a server thread for external clients. @@ -66,7 +67,22 @@ namespace cbdc::sentinel_2pc { validate_result_callback_type result_callback) -> bool override; + /// Cleanly stop the controller + void stop(); + private: + using validation_callback = std::function)>; + using queued_validation + = std::pair; + + using attestation_callback = std::function)>; + using queued_attestation + = std::pair; + static void result_handler(std::optional res, const execute_result_callback_type& res_cb); @@ -85,10 +101,24 @@ namespace cbdc::sentinel_2pc { void send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback); + auto validate_tx(const transaction::full_tx& tx, + validation_callback cb) -> bool; + void validation_worker(); + + auto attest_tx(const transaction::full_tx& tx, attestation_callback cb) + -> bool; + void attestation_worker(); + uint32_t m_sentinel_id; cbdc::config::options m_opts; std::shared_ptr m_logger; + blocking_queue m_validation_queue{}; + std::vector m_validation_threads{}; + + blocking_queue m_attestation_queue{}; + std::vector m_attestation_threads{}; + std::unique_ptr m_rpc_server; std::unique_ptr m_dist{}; privkey_t m_privkey{}; + + std::atomic m_running{true}; }; } diff --git a/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp b/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp index d562d9326..c5248914e 100644 --- a/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp +++ b/src/uhs/twophase/sentinel_2pc/sentineld_2pc.cpp @@ -63,6 +63,8 @@ auto main(int argc, char** argv) -> int { logger->info("Shutting down..."); + ctl.stop(); + return 0; } // LCOV_EXCL_STOP diff --git a/src/uhs/twophase/sentinel_2pc/server.cpp b/src/uhs/twophase/sentinel_2pc/server.cpp index d00276b21..ad2736508 100644 --- a/src/uhs/twophase/sentinel_2pc/server.cpp +++ b/src/uhs/twophase/sentinel_2pc/server.cpp @@ -11,22 +11,46 @@ namespace cbdc::sentinel::rpc { std::unique_ptr> srv) : m_impl(impl), m_srv(std::move(srv)) { + m_processing_thread = std::thread([this]() { + process(); + }); m_srv->register_handler_callback( [&](const request& req, async_interface::result_callback_type callback) { - auto res = std::visit( - overloaded{[&](execute_request e_req) { - return m_impl->execute_transaction( - std::move(e_req), - callback); - }, - [&](validate_request v_req) { - return m_impl->validate_transaction( - std::move(v_req), - callback); - }}, - req); - return res; + auto req_item = request_queue_t{req, std::move(callback)}; + m_request_queue.push(req_item); + return true; }); } + auto operator<(const request_queue_t& a, const request_queue_t& b) + -> bool { + // Prioritize validate requests over execute requests + return (std::holds_alternative(a.m_req) + && std::holds_alternative(b.m_req)); + } + async_server::~async_server() { + m_running = false; + m_request_queue.clear(); + if(m_processing_thread.joinable()) { + m_processing_thread.join(); + } + } + void async_server::process() { + auto q = request_queue_t(); + while(m_running) { + if(m_request_queue.pop(q)) { + std::visit(overloaded{[&](execute_request e_req) { + return m_impl->execute_transaction( + std::move(e_req), + q.m_cb); + }, + [&](validate_request v_req) { + return m_impl->validate_transaction( + std::move(v_req), + q.m_cb); + }}, + q.m_req); + } + } + } } diff --git a/src/uhs/twophase/sentinel_2pc/server.hpp b/src/uhs/twophase/sentinel_2pc/server.hpp index 618aa24f0..5efad9ca9 100644 --- a/src/uhs/twophase/sentinel_2pc/server.hpp +++ b/src/uhs/twophase/sentinel_2pc/server.hpp @@ -8,10 +8,17 @@ #include "uhs/sentinel/async_interface.hpp" #include "uhs/transaction/messages.hpp" +#include "util/common/blocking_queue.hpp" #include "util/rpc/async_server.hpp" #include "util/rpc/format.hpp" namespace cbdc::sentinel::rpc { + struct request_queue_t { + request m_req; + async_interface::result_callback_type m_cb; + }; + + auto operator<(const request_queue_t& a, const request_queue_t& b) -> bool; /// Asynchronous RPC server for a sentinel. class async_server { public: @@ -25,9 +32,22 @@ namespace cbdc::sentinel::rpc { // implementation std::unique_ptr> srv); + ~async_server(); + async_server(async_server&&) noexcept = delete; + auto operator=(async_server&&) noexcept -> async_server& = delete; + async_server(const async_server&) = delete; + auto operator=(const async_server&) -> async_server& = delete; + private: + void process(); + async_interface* m_impl; std::unique_ptr> m_srv; + + blocking_priority_queue> + m_request_queue{}; + std::thread m_processing_thread; + std::atomic m_running = true; }; } diff --git a/src/util/raft/rpc_server.hpp b/src/util/raft/rpc_server.hpp index 8a5bc8c12..70420168f 100644 --- a/src/util/raft/rpc_server.hpp +++ b/src/util/raft/rpc_server.hpp @@ -10,6 +10,10 @@ #include "util/rpc/async_server.hpp" namespace cbdc::raft::rpc { + using validation_callback = std::function; + using validate_function_type + = std::function; + /// Generic RPC server for raft nodes for which the replicated state /// machine handles the request processing logic. Replicates /// requests to the cluster which executes them via its state machine. Once @@ -22,7 +26,28 @@ namespace cbdc::raft::rpc { /// \param impl pointer to the raft node. /// \see cbdc::rpc::server void register_raft_node(std::shared_ptr impl) { + register_raft_node(std::move(impl), std::nullopt); + } + + /// Registers the raft node whose state machine handles RPC requests + /// for this server. + /// \param impl pointer to the raft node. + /// \param validate optional method to validate requests before + /// replicating them to the raft cluster + /// \see cbdc::rpc::server + void + register_raft_node(std::shared_ptr impl, + std::optional validate) { m_impl = std::move(impl); + if(validate.has_value()) { + m_validate_func = std::move(validate.value()); + } else { + m_validate_func + = [&](buffer b, const validation_callback& cb) { + cb(std::move(b), true); + return true; + }; + } cbdc::rpc::raw_async_server::register_handler_callback( [&](buffer req, response_callback_type resp_cb) { return request_handler(std::move(req), std::move(resp_cb)); @@ -33,6 +58,7 @@ namespace cbdc::raft::rpc { private: std::shared_ptr m_impl; + validate_function_type m_validate_func; using response_callback_type = typename cbdc::rpc::raw_async_server::response_callback_type; @@ -44,35 +70,43 @@ namespace cbdc::raft::rpc { return false; } - // TODO: make network and sockets generic over the buffer type so - // these copy operations for the request and response to get - // over the nuraft/cbdc boundary are not needed. - auto new_log = nuraft::buffer::alloc(request_buf.size()); - nuraft::buffer_serializer bs(new_log); - bs.put_raw(request_buf.data(), request_buf.size()); - - auto success = m_impl->replicate( - new_log, - [&, resp_cb = std::move(response_callback), req_buf = new_log]( - result_type& r, - nuraft::ptr& err) { - if(err) { - resp_cb(std::nullopt); + return m_validate_func( + std::move(request_buf), + [&, res_cb = std::move(response_callback)](buffer buf2, + bool valid) { + if(!valid) { + res_cb(std::nullopt); return; } - const auto res = r.get(); - if(!res) { - resp_cb(std::nullopt); - return; - } + auto new_log = nuraft::buffer::alloc(buf2.size()); + nuraft::buffer_serializer bs(new_log); + bs.put_raw(buf2.data(), buf2.size()); - auto resp_pkt = cbdc::buffer(); - resp_pkt.append(res->data_begin(), res->size()); - resp_cb(std::move(resp_pkt)); - }); + auto success = m_impl->replicate( + new_log, + [&, resp_cb = res_cb, req_buf = new_log]( + result_type& r, + nuraft::ptr& err) { + if(err) { + resp_cb(std::nullopt); + return; + } - return success; + const auto res = r.get(); + if(!res) { + resp_cb(std::nullopt); + return; + } + + auto resp_pkt = cbdc::buffer(); + resp_pkt.append(res->data_begin(), res->size()); + resp_cb(std::move(resp_pkt)); + }); + if(!success) { + res_cb(std::nullopt); + } + }); } }; } diff --git a/tests/unit/sentinel_2pc/controller_test.cpp b/tests/unit/sentinel_2pc/controller_test.cpp index 4a35b6bf5..8513cfeeb 100644 --- a/tests/unit/sentinel_2pc/controller_test.cpp +++ b/tests/unit/sentinel_2pc/controller_test.cpp @@ -99,6 +99,10 @@ class sentinel_2pc_test : public ::testing::Test { std::unique_ptr m_ctl; cbdc::transaction::full_tx m_valid_tx{}; std::shared_ptr m_logger; + std::unique_ptr + m_secp{secp256k1_context_create(SECP256K1_CONTEXT_SIGN + | SECP256K1_CONTEXT_VERIFY), + &secp256k1_context_destroy}; }; TEST_F(sentinel_2pc_test, test_init) { @@ -170,17 +174,14 @@ TEST_F(sentinel_2pc_test, digest_valid_transaction_network) { TEST_F(sentinel_2pc_test, tx_validation_test) { ASSERT_TRUE(m_ctl->init()); auto ctx = cbdc::transaction::compact_tx(m_valid_tx); - auto secp = std::unique_ptr{ - secp256k1_context_create(SECP256K1_CONTEXT_SIGN - | SECP256K1_CONTEXT_VERIFY), - &secp256k1_context_destroy}; auto res = m_ctl->validate_transaction(m_valid_tx, [&](auto validation_res) { ASSERT_TRUE(validation_res.has_value()); - ASSERT_TRUE(ctx.verify(secp.get(), validation_res.value())); + ASSERT_TRUE(ctx.verify(m_secp.get(), validation_res.value())); }); ASSERT_TRUE(res); + // ensures the validation callback has completed before we go out-of-scope + m_ctl->stop(); } TEST_F(sentinel_2pc_test, bad_coordinator_endpoint) {