Skip to content

Commit

Permalink
feat: enable periodic shard auditing
Browse files Browse the repository at this point in the history
This commit includes the reconciliation of the supporting tools with
the proof integration and represents the point at which you can start
running the system with periodic auditing enabled.

It was largely written by James Lovejoy for the storing-values version
of this code and was ported by the author.

Co-authored-by: James Lovejoy <[email protected]>
Signed-off-by: Sam Stuewe <[email protected]>
  • Loading branch information
HalosGhost and metalicjames committed May 30, 2024
1 parent 825d271 commit 9ecd34a
Show file tree
Hide file tree
Showing 12 changed files with 421 additions and 9 deletions.
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ find_library(NURAFT_LIBRARY nuraft REQUIRED)
find_library(GTEST_LIBRARY gtest REQUIRED)
find_library(GTEST_MAIN_LIBRARY gtest_main REQUIRED)
find_package(benchmark REQUIRED)

find_library(LUA_LIBRARY lua REQUIRED)
find_library(KECCAK_LIBRARY keccak REQUIRED)
find_library(EVMC_INSTRUCTIONS_LIBRARY evmc-instructions REQUIRED)
Expand All @@ -59,6 +60,7 @@ endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
add_link_options(--coverage)
add_compile_options(-Og -g -ggdb)
endif()

if(CMAKE_BUILD_TYPE STREQUAL "Profiling")
Expand Down Expand Up @@ -91,5 +93,4 @@ endif()
add_subdirectory(src)
add_subdirectory(tests)
add_subdirectory(benchmarks)
add_subdirectory(tools/bench)
add_subdirectory(tools/shard-seeder)
add_subdirectory(tools)
38 changes: 38 additions & 0 deletions src/uhs/atomizer/shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ namespace cbdc::shard {
t.join();
}
}

if(m_audit_thread.joinable()) {
m_audit_thread.join();
}
}

auto controller::init() -> bool {
Expand All @@ -49,6 +53,13 @@ namespace cbdc::shard {
return false;
}

m_audit_log.open(m_opts.m_shard_audit_logs[m_shard_id],
std::ios::app | std::ios::out);
if(!m_audit_log.good()) {
m_logger->error("Failed to open audit log");
return false;
}

if(!m_archiver_client.init()) {
m_logger->warn("Failed to connect to archiver");
}
Expand Down Expand Up @@ -139,6 +150,7 @@ namespace cbdc::shard {
const auto past_blk = m_archiver_client.get_block(i);
if(past_blk) {
m_shard.digest_block(past_blk.value());
audit();
} else {
m_logger->info("Waiting for archiver sync");
const auto wait_time = std::chrono::milliseconds(10);
Expand All @@ -148,6 +160,7 @@ namespace cbdc::shard {
}
}
}
audit();

m_logger->info("Digested block", blk.m_height);
return std::nullopt;
Expand Down Expand Up @@ -206,4 +219,29 @@ namespace cbdc::shard {
std::visit(res_handler, res);
}
}

void controller::audit() {
auto height = m_shard.best_block_height();
if(m_opts.m_shard_audit_interval > 0
&& height % m_opts.m_shard_audit_interval != 0) {
return;
}

auto snp = m_shard.get_snapshot();
if(m_audit_thread.joinable()) {
m_audit_thread.join();
}
m_audit_thread = std::thread([this, s = std::move(snp), height]() {
auto range_summaries = m_shard.audit(s);
auto buf = cbdc::buffer();
buf.extend(sizeof(commitment_t));
for(const auto& [bucket, summary] : range_summaries) {
buf.clear();
buf.append(summary.data(), summary.size());
m_audit_log << height << " " << static_cast<int>(bucket) << " "
<< buf.to_hex() << std::endl;
}
m_logger->info("Audit completed for", height);
});
}
}
4 changes: 4 additions & 0 deletions src/uhs/atomizer/shard/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ namespace cbdc::shard {
blocking_queue<network::message_t> m_request_queue;
std::vector<std::thread> m_handler_threads;

std::ofstream m_audit_log;
std::thread m_audit_thread;

auto server_handler(cbdc::network::message_t&& pkt)
-> std::optional<cbdc::buffer>;
auto atomizer_handler(cbdc::network::message_t&& pkt)
-> std::optional<cbdc::buffer>;
void request_consumer();
void audit();
};
}

Expand Down
55 changes: 55 additions & 0 deletions src/uhs/atomizer/shard/shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,61 @@ namespace cbdc::shard {
m_snp = std::move(snp);
}

auto shard::audit(const std::shared_ptr<const leveldb::Snapshot>& snp)
-> std::unordered_map<unsigned char, commitment_t> {
std::unordered_map<unsigned char, std::vector<commitment_t>> comms{};
auto opts = leveldb::ReadOptions();
opts.snapshot = snp.get();
auto it = std::shared_ptr<leveldb::Iterator>(m_db->NewIterator(opts));
it->SeekToFirst();
// Skip best block height key
it->Next();
for(; it->Valid(); it->Next()) {
auto key = it->key();
auto val = it->value();

static constexpr auto comm_size
= sizeof(transaction::compact_output::m_value_commitment);
size_t rng_size{};
static constexpr auto sz_size = sizeof(size_t);

transaction::compact_output outp{};
hash_t id{};
std::memcpy(id.data(), key.data(), key.size());
std::memcpy(outp.m_value_commitment.data(), val.data(), comm_size);
val.remove_prefix(comm_size);
std::memcpy(&rng_size, val.data(), sz_size);
val.remove_prefix(sz_size);
outp.m_range.reserve(rng_size);
outp.m_range.assign(rng_size, 0);
std::memcpy(outp.m_range.data(), val.data(), rng_size);
val.remove_prefix(rng_size);
std::memcpy(outp.m_provenance.data(),
val.data(),
outp.m_provenance.size());

if(id != transaction::calculate_uhs_id(outp)) {
continue;
}
auto bucket = id[0];
if(comms.find(bucket) == comms.end()) {
std::vector<commitment_t> commits{};
commits.reserve(1);
comms.emplace(bucket, std::move(commits));
}
comms[bucket].emplace_back(outp.m_value_commitment);
}

std::unordered_map<unsigned char, commitment_t> summaries{};
for(auto& [k, v] : comms) {
auto summary = sum_commitments(m_secp.get(), v);
if(summary.has_value()) {
summaries[k] = summary.value();
}
}

return summaries;
}

auto shard::get_snapshot() -> std::shared_ptr<const leveldb::Snapshot> {
auto snp = std::shared_ptr<const leveldb::Snapshot>(
Expand Down
57 changes: 57 additions & 0 deletions src/uhs/twophase/locking_shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ namespace cbdc::locking_shard {
+ "_" + std::to_string(m_shard_id)
: "") {}

controller::~controller() {
m_running = false;
if(m_audit_thread.joinable()) {
m_audit_thread.join();
}
}

auto controller::init() -> bool {
if(!m_logger) {
std::cerr
Expand All @@ -38,6 +45,13 @@ namespace cbdc::locking_shard {
return false;
}

m_audit_log.open(m_opts.m_shard_audit_logs[m_shard_id],
std::ios::app | std::ios::out);
if(!m_audit_log.good()) {
m_logger->error("Failed to open audit log");
return false;
}

auto params = nuraft::raft_params();
params.election_timeout_lower_bound_
= static_cast<int>(m_opts.m_election_timeout_lower);
Expand All @@ -62,6 +76,10 @@ namespace cbdc::locking_shard {

m_shard = m_state_machine->get_shard_instance();

m_audit_thread = std::thread([this]() {
audit();
});

if(m_shard_id > (m_opts.m_locking_shard_raft_endpoints.size() - 1)) {
m_logger->error("The shard ID is out of range "
"of the m_locking_shard_raft_endpoints vector.");
Expand Down Expand Up @@ -131,4 +149,43 @@ namespace cbdc::locking_shard {
}
return nuraft::cb_func::ReturnCode::Ok;
}

void controller::audit() {
while(m_running) {
constexpr auto audit_wait_interval = std::chrono::seconds(1);
std::this_thread::sleep_for(audit_wait_interval);
if(!m_running) {
break;
}
auto highest_epoch = m_shard->highest_epoch();
if(highest_epoch - m_last_audit_epoch
> m_opts.m_shard_audit_interval) {
auto audit_epoch = highest_epoch;
if(m_opts.m_shard_audit_interval > 0) {
audit_epoch
= (highest_epoch - m_opts.m_shard_audit_interval)
- (highest_epoch % m_opts.m_shard_audit_interval);
}
if(audit_epoch > highest_epoch
|| audit_epoch <= m_last_audit_epoch) {
continue;
}

m_logger->info("Running Audit for", audit_epoch);
auto maybe_commit = m_shard->get_summary(audit_epoch);
if(!maybe_commit.has_value()) {
m_logger->error("Error running audit at epoch",
audit_epoch);
} else {
m_audit_log << audit_epoch << " " << to_string(maybe_commit.value())
<< std::endl;
m_logger->info("Audit completed for", audit_epoch);

m_last_audit_epoch = audit_epoch;

m_shard->prune(audit_epoch);
}
}
}
}
}
9 changes: 8 additions & 1 deletion src/uhs/twophase/locking_shard/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace cbdc::locking_shard {
size_t node_id,
config::options opts,
std::shared_ptr<logging::log> logger);
~controller() = default;
~controller();

controller() = delete;
controller(const controller&) = delete;
Expand All @@ -48,6 +48,8 @@ namespace cbdc::locking_shard {
nuraft::cb_func::Param* param)
-> nuraft::cb_func::ReturnCode;

void audit();

config::options m_opts;
std::shared_ptr<logging::log> m_logger;
size_t m_shard_id;
Expand All @@ -59,6 +61,11 @@ namespace cbdc::locking_shard {
std::shared_ptr<raft::node> m_raft_serv;
std::unique_ptr<rpc::status_server> m_status_server;
std::unique_ptr<cbdc::rpc::tcp_server<raft::rpc::server>> m_server;

std::atomic_bool m_running{true};
uint64_t m_last_audit_epoch{};
std::ofstream m_audit_log;
std::thread m_audit_thread;
};
}

Expand Down
Loading

0 comments on commit 9ecd34a

Please sign in to comment.