diff --git a/src/util/common/config.cpp b/src/util/common/config.cpp index 02811f4d2..964d65e7e 100644 --- a/src/util/common/config.cpp +++ b/src/util/common/config.cpp @@ -227,6 +227,13 @@ namespace cbdc::config { return ss.str(); } + auto get_loadgen_loglevel_key(size_t loadgen_id) -> std::string { + std::stringstream ss; + ss << loadgen_prefix << loadgen_id << config_separator + << loglevel_postfix; + return ss.str(); + } + auto get_sentinel_private_key_key(size_t sentinel_id) -> std::string { auto ss = std::stringstream(); get_sentinel_key_prefix(ss, sentinel_id); @@ -614,6 +621,23 @@ namespace cbdc::config { opts.m_loadgen_count = cfg.get_ulong(loadgen_count_key).value_or(opts.m_loadgen_count); + opts.m_loadgen_tps_target = cfg.get_ulong(tps_target_key) + .value_or(opts.m_loadgen_tps_target); + opts.m_loadgen_tps_step_time + = cfg.get_ulong(tps_steptime_key) + .value_or(opts.m_loadgen_tps_step_time); + opts.m_loadgen_tps_step_size + = cfg.get_ulong(tps_stepsize_key) + .value_or(opts.m_loadgen_tps_step_size); + opts.m_loadgen_tps_initial = cfg.get_ulong(tps_initial_key) + .value_or(opts.m_loadgen_tps_initial); + for(size_t i{0}; i < opts.m_loadgen_count; ++i) { + const auto loadgen_loglevel_key = get_loadgen_loglevel_key(i); + const auto loadgen_loglevel + = cfg.get_loglevel(loadgen_loglevel_key) + .value_or(defaults::log_level); + opts.m_loadgen_loglevels.push_back(loadgen_loglevel); + } } auto read_options(const std::string& config_file) diff --git a/src/util/common/config.hpp b/src/util/common/config.hpp index 9ed00fedc..c2931483e 100644 --- a/src/util/common/config.hpp +++ b/src/util/common/config.hpp @@ -100,6 +100,11 @@ namespace cbdc::config { static constexpr auto output_count_key = "loadgen_sendtx_output_count"; static constexpr auto invalid_rate_key = "loadgen_invalid_tx_rate"; static constexpr auto fixed_tx_rate_key = "loadgen_fixed_tx_rate"; + static constexpr auto loadgen_prefix = "loadgen"; + static constexpr auto tps_target_key = 
"loadgen_tps_target"; + static constexpr auto tps_steptime_key = "loadgen_tps_step_time"; + static constexpr auto tps_stepsize_key = "loadgen_tps_step_percentage"; + static constexpr auto tps_initial_key = "loadgen_tps_step_start"; static constexpr auto archiver_count_key = "archiver_count"; static constexpr auto watchtower_count_key = "watchtower_count"; static constexpr auto watchtower_prefix = "watchtower"; @@ -250,6 +255,18 @@ namespace cbdc::config { /// Number of load generators over which to split pre-seeded UTXOs. size_t m_loadgen_count{0}; + /// List of loadgen log levels, ordered by loadgen ID. + std::vector<logging::log_level> m_loadgen_loglevels; + + /// Maximum Tx/s the loadgens should produce + size_t m_loadgen_tps_target{0}; + /// Initial Tx/s to send at test-start + size_t m_loadgen_tps_initial{0}; + /// Tx/s to increase on each step + size_t m_loadgen_tps_step_size{0}; + /// Time (in milliseconds) to wait before the next step up + size_t m_loadgen_tps_step_time{0}; + /// Private keys for sentinels. 
std::unordered_map<size_t, privkey_t> m_sentinel_private_keys; diff --git a/tools/bench/twophase_gen.cpp b/tools/bench/twophase_gen.cpp index 0b86eb518..a60dad433 100644 --- a/tools/bench/twophase_gen.cpp +++ b/tools/bench/twophase_gen.cpp @@ -34,8 +34,14 @@ auto main(int argc, char** argv) -> int { auto cfg = std::get<cbdc::config::options>(cfg_or_err); auto gen_id = std::stoull(args[2]); - auto logger - = std::make_shared<cbdc::logging::log>(cbdc::logging::log_level::info); + if(gen_id >= cfg.m_loadgen_count) { + std::cerr << "Attempted to run more loadgens than configured" + << std::endl; + return -1; + } + + auto logger = std::make_shared<cbdc::logging::log>( + cfg.m_loadgen_loglevels[gen_id]); auto sha2_impl = SHA256AutoDetect(); logger->info("using sha2: ", sha2_impl); @@ -115,6 +121,25 @@ auto main(int argc, char** argv) -> int { logger->info("Mint confirmed"); } + size_t per_gen_send_limit = 0; + size_t per_gen_step_size = 0; + size_t per_gen_send_tgt = 0; + if(cfg.m_loadgen_tps_target != 0) { + per_gen_send_tgt = cfg.m_loadgen_tps_target / cfg.m_loadgen_count; + per_gen_send_limit = cfg.m_loadgen_tps_initial / cfg.m_loadgen_count; + size_t range = cfg.m_loadgen_tps_target - cfg.m_loadgen_tps_initial; + size_t per_gen_range = range / cfg.m_loadgen_count; + per_gen_step_size = std::max( + std::min(static_cast<size_t>( + per_gen_range * (cfg.m_loadgen_tps_step_size * .01)), + per_gen_range), + size_t{1}); + } + + if(cfg.m_loadgen_tps_step_time == 0) { + per_gen_send_limit = per_gen_send_tgt; + } + static constexpr auto lookup_timeout = std::chrono::milliseconds(5000); auto status_client = cbdc::locking_shard::rpc::status_client( cfg.m_locking_shard_readonly_endpoints, @@ -161,7 +186,13 @@ auto main(int argc, char** argv) -> int { constexpr auto send_amt = 5; + uint64_t send_gap{}; uint64_t gen_avg{}; + auto ramp_timer_full + = std::chrono::nanoseconds(cfg.m_loadgen_tps_step_time * 1000000); + auto ramp_timer = ramp_timer_full; + auto ramping + = ramp_timer.count() != 0 && per_gen_send_limit != per_gen_send_tgt; auto gen_thread = std::thread([&]() 
{ while(running) { // Determine if we should attempt to send a double-spending @@ -219,8 +250,51 @@ auto main(int argc, char** argv) -> int { gen_avg = static_cast<uint64_t>( (static_cast<double>(gen_t.count()) * average_factor) + (static_cast<double>(gen_avg) * (1.0 - average_factor))); + if(ramping) { + if(gen_t >= ramp_timer) { + logger->debug( + "Ramp Timer Exhausted (gen_t). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(per_gen_send_tgt, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == per_gen_send_tgt) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= gen_t; + } + } + auto total_send_time + = std::chrono::nanoseconds(gen_avg * per_gen_send_limit); + if(total_send_time < std::chrono::seconds(1)) { + send_gap + = (std::chrono::seconds(1) - total_send_time).count() + / per_gen_send_limit; + logger->trace("New send-gap:", send_gap); + } } else { std::this_thread::sleep_for(std::chrono::nanoseconds(gen_avg)); + if(ramping) { + auto avg = std::chrono::nanoseconds(gen_avg); + if(avg >= ramp_timer) { + logger->debug("Ramp Timer Exhausted (dbl-spend " + "gen_avg). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(per_gen_send_tgt, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == per_gen_send_tgt) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= avg; + } + } } // We couldn't send a double-spend or a newly generated valid @@ -234,6 +308,23 @@ auto main(int argc, char** argv) -> int { // instead. static constexpr auto send_delay = std::chrono::seconds(1); std::this_thread::sleep_for(send_delay); + if(ramping) { + if(send_delay >= ramp_timer) { + logger->debug( + "Ramp Timer Exhausted (send_delay). 
Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(per_gen_send_tgt, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == per_gen_send_tgt) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= send_delay; + } + } continue; } @@ -280,6 +371,27 @@ auto main(int argc, char** argv) -> int { logger->error("Failure sending transaction to sentinel"); wallet.confirm_inputs(tx.value().m_inputs); } + + auto gap = std::chrono::nanoseconds(send_gap); + if(gap < std::chrono::seconds(1)) { + std::this_thread::sleep_for(gap); + if(ramping) { + if(gap >= ramp_timer) { + logger->debug("Ramp Timer Exhausted (gap). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(per_gen_send_tgt, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == per_gen_send_tgt) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= gap; + } + } + } } });