Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr/189 update #258

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH)
endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
add_compile_options(-fprofile-arcs -ftest-coverage)
add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3)
endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
shard
watchtower
locking_shard
raft
transaction
rpc
network
Expand All @@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
crypto
secp256k1
${LEVELDB_LIBRARY}
${NURAFT_LIBRARY}
${CMAKE_THREAD_LIBS_INIT})
119 changes: 81 additions & 38 deletions src/uhs/twophase/coordinator/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,21 @@ namespace cbdc::coordinator {
// Initialize NuRaft with the state machine we just created. Register
// our callback function to notify us when we become a leader or
// follower.
return m_raft_serv->init(m_raft_params);
if(!m_raft_serv->init(m_raft_params)) {
return false;
}

auto n_threads = std::thread::hardware_concurrency();

m_logger->info("Creating", n_threads, "attestation check threads");

for(size_t i = 0; i < n_threads; i++) {
m_attestation_check_threads.emplace_back([&]() {
attestation_check_worker();
});
}

return true;
}

auto controller::raft_callback(nuraft::cb_func::Type type,
Expand Down Expand Up @@ -665,6 +679,13 @@ namespace cbdc::coordinator {
if(m_start_thread.joinable()) {
m_start_thread.join();
}

m_attestation_check_queue.clear();
for(auto& t : m_attestation_check_threads) {
if(t.joinable()) {
t.join();
}
}
}

auto controller::sm_command_header::operator==(
Expand All @@ -681,6 +702,27 @@ namespace cbdc::coordinator {
rhs.m_discard_txs);
}

void controller::attestation_check_worker() {
while(!m_quit) {
auto v = queued_attestation_check();
if(m_attestation_check_queue.pop(v)) {
auto [tx, cb] = v;
auto valid = transaction::validation::check_attestations(
tx,
m_opts.m_sentinel_public_keys,
m_opts.m_attestation_threshold);
cb(tx, valid);
}
}
}

auto controller::check_tx_attestation(const transaction::compact_tx& tx,
attestation_check_callback cb)
-> bool {
m_attestation_check_queue.push({tx, std::move(cb)});
return true;
}

auto controller::execute_transaction(transaction::compact_tx tx,
callback_type result_callback)
-> bool {
Expand All @@ -689,44 +731,45 @@ namespace cbdc::coordinator {
return false;
}

if(!transaction::validation::check_attestations(
tx,
m_opts.m_sentinel_public_keys,
m_opts.m_attestation_threshold)) {
m_logger->warn("Received invalid compact transaction",
to_string(tx.m_id));
return false;
}
return check_tx_attestation(
tx,
[&,
res_cb = std::move(result_callback)](transaction::compact_tx tx2,
bool result) {
if(!result) {
m_logger->warn("Received invalid compact transaction",
to_string(tx.m_id));
res_cb(false);
return;
}
auto added = [&]() {
// Wait until there's space in the current batch
std::unique_lock<std::mutex> l(m_batch_mut);
m_batch_cv.wait(l, [&]() {
return m_current_txs->size() < m_batch_size
|| !m_running;
});
if(!m_running) {
return false;
}

auto added = [&]() {
// Wait until there's space in the current batch
std::unique_lock<std::mutex> l(m_batch_mut);
m_batch_cv.wait(l, [&]() {
return m_current_txs->size() < m_batch_size || !m_running;
// Make sure the TX is not already in the current batch
if(m_current_txs->find(tx2.m_id) != m_current_txs->end()) {
return false;
}
// Add the tx to the current dtx batch and record its index
auto idx = m_current_batch->add_tx(tx2);
// Map the index of the tx to the transaction ID and
// sentinel ID
m_current_txs->emplace(tx2.m_id,
std::make_pair(res_cb, idx));
return true;
}();
if(added) {
// If this was a new TX, notify the executor thread there's
// work to do
m_batch_cv.notify_one();
}
});
if(!m_running) {
return false;
}

// Make sure the TX is not already in the current batch
if(m_current_txs->find(tx.m_id) != m_current_txs->end()) {
return false;
}
// Add the tx to the current dtx batch and record its index
auto idx = m_current_batch->add_tx(tx);
// Map the index of the tx to the transaction ID and sentinel
// ID
m_current_txs->emplace(
tx.m_id,
std::make_pair(std::move(result_callback), idx));
return true;
}();
if(added) {
// If this was a new TX, notify the executor thread there's work to
// do
m_batch_cv.notify_one();
}

return added;
}
}
12 changes: 12 additions & 0 deletions src/uhs/twophase/coordinator/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "server.hpp"
#include "state_machine.hpp"
#include "uhs/twophase/locking_shard/locking_shard.hpp"
#include "util/common/blocking_queue.hpp"
#include "util/common/buffer.hpp"
#include "util/common/random_source.hpp"
#include "util/network/connection_manager.hpp"
Expand Down Expand Up @@ -136,6 +137,11 @@ namespace cbdc::coordinator {
-> bool override;

private:
using attestation_check_callback
= std::function<void(const transaction::compact_tx&, bool)>;
using queued_attestation_check
= std::pair<transaction::compact_tx, attestation_check_callback>;

size_t m_node_id;
size_t m_coordinator_id;
cbdc::config::options m_opts;
Expand Down Expand Up @@ -164,6 +170,8 @@ namespace cbdc::coordinator {
std::vector<std::pair<std::shared_ptr<std::thread>, std::atomic_bool>>
m_exec_threads;
std::shared_mutex m_exec_mut;
blocking_queue<queued_attestation_check> m_attestation_check_queue{};
std::vector<std::thread> m_attestation_check_threads{};

std::thread m_start_thread;
bool m_start_flag{false};
Expand Down Expand Up @@ -206,6 +214,10 @@ namespace cbdc::coordinator {
void schedule_exec(std::function<void(size_t)>&& f);

void join_execs();

auto check_tx_attestation(const transaction::compact_tx& tx,
attestation_check_callback cb) -> bool;
void attestation_check_worker();
};
}

Expand Down
97 changes: 95 additions & 2 deletions src/uhs/twophase/locking_shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "format.hpp"
#include "state_machine.hpp"
#include "status_client.hpp"
#include "uhs/transaction/validation.hpp"
#include "util/rpc/tcp_server.hpp"
#include "util/serialization/format.hpp"

Expand All @@ -16,7 +17,7 @@
namespace cbdc::locking_shard {
controller::controller(size_t shard_id,
size_t node_id,
config::options opts,
cbdc::config::options opts,
std::shared_ptr<logging::log> logger)
: m_opts(std::move(opts)),
m_logger(std::move(logger)),
Expand Down Expand Up @@ -111,23 +112,115 @@ namespace cbdc::locking_shard {
return true;
}

controller::~controller() {
m_running = false;
m_validation_queue.clear();
for(auto& t : m_validation_threads) {
if(t.joinable()) {
t.join();
}
}
m_validation_threads.clear();
m_server.reset();
}

auto controller::raft_callback(nuraft::cb_func::Type type,
nuraft::cb_func::Param* /* param */)
-> nuraft::cb_func::ReturnCode {
if(type == nuraft::cb_func::Type::BecomeFollower) {
m_logger->warn("Became follower, stopping listener");

m_running = false;
for(auto& t : m_validation_threads) {
if(t.joinable()) {
t.join();
}
}
m_validation_threads.clear();
m_server.reset();
return nuraft::cb_func::ReturnCode::Ok;
}
if(type == nuraft::cb_func::Type::BecomeLeader) {
m_logger->warn("Became leader, starting listener");
m_server = std::make_unique<decltype(m_server)::element_type>(
m_opts.m_locking_shard_endpoints[m_shard_id][m_node_id]);
m_server->register_raft_node(m_raft_serv);
m_server->register_raft_node(
m_raft_serv,
[&, this](cbdc::buffer buf,
cbdc::raft::rpc::validation_callback cb) {
return enqueue_validation(std::move(buf), std::move(cb));
});

auto n_threads = std::thread::hardware_concurrency();
for(size_t i = 0; i < n_threads; i++) {
m_validation_threads.emplace_back([&, this]() {
validation_worker();
});
}

if(!m_server->init()) {
m_logger->fatal("Couldn't start message handler server");
}
}
return nuraft::cb_func::ReturnCode::Ok;
}

void controller::validation_worker() {
while(m_running) {
auto v = validation_request();
if(m_validation_queue.pop(v)) {
auto [req, cb] = v;
validate_request(std::move(req), cb);
}
}
}

auto
controller::enqueue_validation(cbdc::buffer request,
cbdc::raft::rpc::validation_callback cb)
-> bool {
m_validation_queue.push({std::move(request), std::move(cb)});
return true;
}

auto controller::validate_request(
cbdc::buffer request,
const cbdc::raft::rpc::validation_callback& cb) -> bool {
auto maybe_req
= cbdc::from_buffer<cbdc::rpc::request<rpc::request>>(request);
auto valid = true;
if(maybe_req) {
valid = std::visit(
overloaded{
[&](rpc::lock_params&& params) -> bool {
auto result = true;
for(auto&& t : params) {
if(!transaction::validation::check_attestations(
t.m_tx,
m_opts.m_sentinel_public_keys,
m_opts.m_attestation_threshold)) {
m_logger->warn(
"Received invalid compact transaction",
to_string(t.m_tx.m_id));
result = false;
break;
}
}

return result;
},
[&](rpc::apply_params&&) -> bool {
return true;
},
[&](rpc::discard_params&&) -> bool {
return true;
}},
std::move(maybe_req->m_payload.m_params));
} else {
valid = false;
}

cb(std::move(request), valid);
return true;
}
}
19 changes: 17 additions & 2 deletions src/uhs/twophase/locking_shard/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "locking_shard.hpp"
#include "state_machine.hpp"
#include "status_server.hpp"
#include "util/common/blocking_queue.hpp"
#include "util/raft/node.hpp"
#include "util/raft/rpc_server.hpp"
#include "util/rpc/tcp_server.hpp"
Expand All @@ -25,9 +26,10 @@ namespace cbdc::locking_shard {
/// \param logger log to use for output.
controller(size_t shard_id,
size_t node_id,
config::options opts,
cbdc::config::options opts,
std::shared_ptr<logging::log> logger);
~controller() = default;

~controller();

controller() = delete;
controller(const controller&) = delete;
Expand All @@ -46,12 +48,25 @@ namespace cbdc::locking_shard {
auto raft_callback(nuraft::cb_func::Type type,
nuraft::cb_func::Param* param)
-> nuraft::cb_func::ReturnCode;
auto validate_request(cbdc::buffer request,
const cbdc::raft::rpc::validation_callback& cb)
-> bool;

auto enqueue_validation(cbdc::buffer request,
cbdc::raft::rpc::validation_callback cb)
-> bool;
void validation_worker();

config::options m_opts;
std::shared_ptr<logging::log> m_logger;
size_t m_shard_id;
size_t m_node_id;
std::string m_preseed_dir;
std::vector<std::thread> m_validation_threads;
using validation_request
= std::pair<cbdc::buffer, cbdc::raft::rpc::validation_callback>;
blocking_queue<validation_request> m_validation_queue;
std::atomic<bool> m_running{true};

std::shared_ptr<state_machine> m_state_machine;
std::shared_ptr<locking_shard> m_shard;
Expand Down
Loading
Loading