diff --git a/scripts/get-tx-lifecycle.sh b/scripts/get-tx-lifecycle.sh new file mode 100755 index 000000000..fb07da005 --- /dev/null +++ b/scripts/get-tx-lifecycle.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash + +LOGDIR='.' +TXID= + +IFS='' read -r -d '' usage <<'EOF' +Usage: %s [options] + +Options: + -h, --help print this help and exit + -d, --log-dir=PATH PATH where the log-files are located + (defaults to '.' if not specified) + -t, --tx-id=ID ID of the transaction to trace +EOF + +_help= +if [[ $# -eq 0 ]]; then + _help=1 +fi + +_err=0 +while [[ $# -gt 0 ]]; do + optarg= + shft_cnt=1 + if [[ "$1" =~ [=] ]]; then + optarg="${1#*=}" + elif [[ "$1" =~ ^-- && $# -gt 1 && ! "$2" =~ ^- ]]; then + optarg="$2" + shft_cnt=2 + elif [[ "$1" =~ ^-[^-] && $# -gt 1 && ! "$2" =~ ^- ]]; then + optarg="$2" + shft_cnt=2 + elif [[ "$1" =~ ^-[^-] ]]; then + optarg="${1/??/}" + fi + + case "$1" in + -d*|--log-dir*) LOGDIR="${optarg:-$LOGDIR}"; shift "$shft_cnt";; + -t*|--tx-id*) + if [[ ! "$optarg" ]]; then + printf '`--tx-id` requires an argument\n' + _help=1; _err=1; + break + else + TXID="$optarg" + shift "$shft_cnt" + fi;; + -h|--help) _help=1; shift "$shft_cnt";; + *) + printf 'Unrecognized option: %s\n' "$1" + _help=1; _err=1; + break;; + esac +done + +if [[ -n "$_help" ]]; then + printf "$usage" "$(basename $0)" + exit "$_err" +fi + +pushd "$LOGDIR" &>/dev/null + +all_lines=$(grep -n "$TXID.*at" *.{log,txt}) +sorted_lines=$(printf "%s\n" "$all_lines" | awk -F ' at ' '{ print $2, $0 }' | sort | sed 's/^[^ ]* //') + +printf "%s\n" "$sorted_lines" + +fst=$(printf "%s\n" "$sorted_lines" | head -n 1) +lst=$(printf "%s\n" "$sorted_lines" | tail -n 1) +if [[ $(printf "%s\n" "$fst" | cut -d':' -f1) != $(printf "%s\n" "$lst" | cut -d':' -f1) ]]; then + printf 'First and last message for %s are in different logs!\n' "$TXID" +fi + +start=$(printf "%s\n" "$fst" | awk -F ' at ' '{ print $2 }') +end=$(printf "%s\n" "$lst" | awk -F ' at ' '{ print $2 }') + +printf "total elapsed time (assuming all clocks synchronized): %d ticks\n" "$(( end - start ))" + +popd &>/dev/null diff --git a/scripts/native-system-benchmark.sh b/scripts/native-system-benchmark.sh index 3c1ff23f0..22aa67d7a 100755 --- a/scripts/native-system-benchmark.sh +++ b/scripts/native-system-benchmark.sh @@ -14,7 +14,8 @@ TL=$(git rev-parse --show-toplevel) RT="${TL:-$CWD}" BLD="$RT"/build SEEDDIR="$BLD"/preseeds -TESTDIR="$BLD"/test-$(date +"%s") +TESTID=$(date +"%s") +TESTDIR="$BLD"/test-"$TESTID" IFS='' read -r -d '' usage <<'EOF' Usage: %s [options] @@ -24,6 +25,8 @@ Options: -c, --config=PATH use PATH as the test configuration -s, --samples=TIME run test for TIME seconds (defaults to 30) (or indefinitely if set to inf, or infinity) + -t, --test-dir=PATH use PATH for conducting the test instead of + generating one based on the current time Cleanup: --clean delete all previous test and preseed artifacts @@ -51,6 +54,11 @@ fi _err=0 while [[ $# -gt 0 ]]; do + if [[ "$1" = '--' ]]; then + shift 1 + break + fi + optarg= shft_cnt=1 if [[ "$1" =~ [=] ]]; then @@ -86,6 +94,13 @@ while [[ $# -gt 0 ]]; do *) DBG="$optarg";; esac shift "$shft_cnt";; + -t*|--test-dir*) + if [[ "${optarg:0:1}" == '/' ]]; then + TESTDIR="$optarg" + else + TESTDIR="$CWD/$optarg" + fi + shift "$shft_cnt";; -c*|--config*) if [[ "$optarg" = /* ]]; then ORIG_CFG="${optarg}" @@ -111,7 +126,7 @@ if [[ -n "$_help" ]]; then exit "$_err" fi -if [[ -z "$ORIG_CFG" ]]; then +if [[ -z "$ORIG_CFG" && "$TESTDIR" = "$BLD"/test-"$TESTID" ]]; then printf '%s\n' 'No config specified; exiting' exit 0 fi @@ -136,7 +151,20 @@ BEGIN { EOF CFG="$TESTDIR"/config -awk "$normalize" "$ORIG_CFG" > "$CFG" +if [[ -f "$CFG" ]]; then + printf 'WARNING: Running from a pre-existing test directory\n' + if grep -q 'seed' "$CFG"; then + # loadgens don't currently serialize their wallet state to-disk + # so effectively all funds are orphaned in this case; disabling + # seeding means the new loadgens will mint and be able to send + # new transactions + printf 'Disabling seeding in-favor of minting\n' + grep -v 'seed' "$CFG" > "$TESTDIR"/config."$TESTID" + CFG="$TESTDIR"/config."$TESTID" + fi +else + awk "$normalize" "$ORIG_CFG" > "$CFG" +fi twophase=$(grep -q '2pc=1' "$CFG" && printf '1\n' || printf '0\n') arch= @@ -151,7 +179,7 @@ on_int() { printf 'Interrupting all components\n' trap '' SIGINT # avoid interrupting ourself for i in $PIDS; do # intentionally unquoted - if [[ -n "RECORD" ]]; then + if [[ -n "$RECORD" ]]; then kill -SIGINT -- "-$i" else kill -SIGINT -- "$i" @@ -192,7 +220,7 @@ on_int() { printf 'Terminating any remaining processes\n' for i in $PIDS; do # intentionally unquoted - if [[ -n "RECORD" ]]; then + if [[ -n "$RECORD" ]]; then kill -SIGTERM -- "-$i" else kill -SIGTERM -- "$i" @@ -322,7 +350,7 @@ launch() { "$RT"/scripts/wait-for-it.sh -q -t 5 -h localhost -p "$ep" done printf 'Launched logical %s %d, replica %d [PID: %d]\n' "$1" "$id" "$node" "$PID" - if [[ -n "RECORD" ]]; then + if [[ -n "$RECORD" ]]; then PIDS="$PIDS $(getpgid $PID)" else PIDS="$PIDS $PID" @@ -335,7 +363,7 @@ launch() { "$RT"/scripts/wait-for-it.sh -q -t 5 -h localhost -p "$ep" done printf 'Launched %s %d [PID: %d]\n' "$1" "$id" "$PID" - if [[ -n "RECORD" ]]; then + if [[ -n "$RECORD" ]]; then PIDS="$PIDS $(getpgid $PID)" else PIDS="$PIDS $PID" diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index 5ddfc5d51..c1ef666b2 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -689,6 +689,12 @@ namespace cbdc::coordinator { return false; } + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Received", to_string(tx.m_id), "at", + recv_time); + if(!transaction::validation::check_attestations( tx, m_opts.m_sentinel_public_keys, @@ -719,6 +725,12 @@ namespace cbdc::coordinator { m_current_txs->emplace( tx.m_id, std::make_pair(std::move(result_callback), idx)); + + auto add_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Added", to_string(tx.m_id), "to batch at", + add_time); return true; }(); if(added) { diff --git a/src/uhs/twophase/locking_shard/locking_shard.cpp b/src/uhs/twophase/locking_shard/locking_shard.cpp index 92a325f9c..92c33acf5 100644 --- a/src/uhs/twophase/locking_shard/locking_shard.cpp +++ b/src/uhs/twophase/locking_shard/locking_shard.cpp @@ -103,6 +103,11 @@ namespace cbdc::locking_shard { auto locking_shard::check_and_lock_tx(const tx& t) -> bool { bool success{true}; + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began check-and-lock of", + to_string(t.m_tx.m_id), "at", recv_time); if(!transaction::validation::check_attestations( t.m_tx, m_opts.m_sentinel_public_keys, @@ -129,6 +134,11 @@ namespace cbdc::locking_shard { } } } + auto resp_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Completed check-and-lock of", + to_string(t.m_tx.m_id), "at", resp_time); return success; } diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 4c6fc797f..8543eb52d 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -98,9 +98,14 @@ namespace cbdc::sentinel_2pc { auto controller::execute_transaction( transaction::full_tx tx, execute_result_callback_type result_callback) -> bool { + auto tx_id = transaction::tx_id(tx); + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began execution of", to_string(tx_id), "at", + recv_time); const auto validation_err = transaction::validation::check_tx(tx); if(validation_err.has_value()) { - auto tx_id = transaction::tx_id(tx); m_logger->debug( "Rejected (", transaction::validation::to_string(validation_err.value()), @@ -119,6 +124,13 @@ namespace cbdc::sentinel_2pc { compact_tx.m_attestations.insert(attestation); } + auto fwd_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + + m_logger->trace("Accumulating attestations for", to_string(tx_id), "at", + fwd_time); + gather_attestations(tx, std::move(result_callback), compact_tx, {}); return true; @@ -143,6 +155,11 @@ namespace cbdc::sentinel_2pc { auto controller::validate_transaction( transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began validation of", + to_string(transaction::tx_id(tx)), "at", recv_time); const auto validation_err = transaction::validation::check_tx(tx); if(validation_err.has_value()) { result_callback(std::nullopt); @@ -150,6 +167,11 @@ namespace cbdc::sentinel_2pc { } auto compact_tx = cbdc::transaction::compact_tx(tx); auto attestation = compact_tx.sign(m_secp.get(), m_privkey); + auto resp_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Validated", to_string(transaction::tx_id(tx)), "at", + resp_time); result_callback(std::move(attestation)); return true; } @@ -201,7 +223,10 @@ namespace cbdc::sentinel_2pc { return; } - m_logger->debug("Accepted", to_string(ctx.m_id)); + auto acc_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->debug("Accepted", to_string(ctx.m_id), "at", acc_time); send_compact_tx(ctx, std::move(result_callback)); } diff --git a/src/util/common/config.cpp b/src/util/common/config.cpp index 02811f4d2..e11183bd8 100644 --- a/src/util/common/config.cpp +++ b/src/util/common/config.cpp @@ -227,6 +227,13 @@ namespace cbdc::config { return ss.str(); } + auto get_loadgen_loglevel_key(size_t loadgen_id) -> std::string { + std::stringstream ss; + ss << loadgen_prefix << loadgen_id << config_separator + << loglevel_postfix; + return ss.str(); + } + auto get_sentinel_private_key_key(size_t sentinel_id) -> std::string { auto ss = std::stringstream(); get_sentinel_key_prefix(ss, sentinel_id); @@ -614,6 +621,23 @@ namespace cbdc::config { opts.m_loadgen_count = cfg.get_ulong(loadgen_count_key).value_or(opts.m_loadgen_count); + opts.m_loadgen_tps_target = cfg.get_ulong(tps_target_key) + .value_or(opts.m_loadgen_tps_target); + opts.m_loadgen_tps_step_time + = cfg.get_decimal(tps_steptime_key) + .value_or(opts.m_loadgen_tps_step_time); + opts.m_loadgen_tps_step_size + = cfg.get_decimal(tps_stepsize_key) + .value_or(opts.m_loadgen_tps_step_size); + opts.m_loadgen_tps_initial = cfg.get_decimal(tps_initial_key) + .value_or(opts.m_loadgen_tps_initial); + for(size_t i{0}; i < opts.m_loadgen_count; ++i) { + const auto loadgen_loglevel_key = get_loadgen_loglevel_key(i); + const auto loadgen_loglevel + = cfg.get_loglevel(loadgen_loglevel_key) + .value_or(defaults::log_level); + opts.m_loadgen_loglevels.push_back(loadgen_loglevel); + } } auto read_options(const std::string& config_file) diff --git a/src/util/common/config.hpp b/src/util/common/config.hpp index 9ed00fedc..d971b0086 100644 --- a/src/util/common/config.hpp +++ b/src/util/common/config.hpp @@ -100,6 +100,11 @@ namespace cbdc::config { static constexpr auto output_count_key = "loadgen_sendtx_output_count"; static constexpr auto invalid_rate_key = "loadgen_invalid_tx_rate"; static constexpr auto fixed_tx_rate_key = "loadgen_fixed_tx_rate"; + static constexpr auto loadgen_prefix = "loadgen"; + static constexpr auto tps_target_key = "loadgen_tps_target"; + static constexpr auto tps_steptime_key = "loadgen_tps_step_time"; + static constexpr auto tps_stepsize_key = "loadgen_tps_step_percentage"; + static constexpr auto tps_initial_key = "loadgen_tps_step_start"; static constexpr auto archiver_count_key = "archiver_count"; static constexpr auto watchtower_count_key = "watchtower_count"; static constexpr auto watchtower_prefix = "watchtower"; @@ -250,6 +255,18 @@ namespace cbdc::config { /// Number of load generators over which to split pre-seeded UTXOs. size_t m_loadgen_count{0}; + /// List of loadgen log levels, ordered by loadgen ID. + std::vector m_loadgen_loglevels; + + /// Maximum Tx/s the loadgens should produce + size_t m_loadgen_tps_target{0}; + /// Percent of target to send at test-start + double m_loadgen_tps_initial{0}; + /// Percent (of target-initial) to increase on each step + double m_loadgen_tps_step_size{0}; + /// Time (fractional seconds) to wait before the next step up + double m_loadgen_tps_step_time{0}; + /// Private keys for sentinels. std::unordered_map m_sentinel_private_keys; diff --git a/tools/bench/twophase_gen.cpp b/tools/bench/twophase_gen.cpp index 0b86eb518..abe07036f 100644 --- a/tools/bench/twophase_gen.cpp +++ b/tools/bench/twophase_gen.cpp @@ -34,8 +34,14 @@ auto main(int argc, char** argv) -> int { auto cfg = std::get(cfg_or_err); auto gen_id = std::stoull(args[2]); - auto logger - = std::make_shared(cbdc::logging::log_level::info); + if(gen_id >= cfg.m_loadgen_count) { + std::cerr << "Attempted to run more loadgens than configured" + << std::endl; + return -1; + } + + auto logger = std::make_shared( + cfg.m_loadgen_loglevels[gen_id]); auto sha2_impl = SHA256AutoDetect(); logger->info("using sha2: ", sha2_impl); @@ -115,6 +121,26 @@ auto main(int argc, char** argv) -> int { logger->info("Mint confirmed"); } + size_t per_gen_send_limit = 0; + size_t per_gen_step_size = 0; + if(cfg.m_loadgen_tps_target != 0) { + per_gen_send_limit = static_cast( + cfg.m_loadgen_tps_initial + * static_cast(cfg.m_loadgen_tps_target)); + double per_gen_range = static_cast(cfg.m_loadgen_tps_target) + - static_cast(per_gen_send_limit); + // clamp step (s) such that 1 <= s <= (target - initial) + per_gen_step_size = std::max( + std::min(static_cast(per_gen_range + * cfg.m_loadgen_tps_step_size), + static_cast(per_gen_range)), + size_t{1}); + } + + if(cfg.m_loadgen_tps_step_time == 0) { + per_gen_send_limit = cfg.m_loadgen_tps_target; + } + static constexpr auto lookup_timeout = std::chrono::milliseconds(5000); auto status_client = cbdc::locking_shard::rpc::status_client( cfg.m_locking_shard_readonly_endpoints, @@ -161,7 +187,17 @@ auto main(int argc, char** argv) -> int { constexpr auto send_amt = 5; + uint64_t send_gap{}; uint64_t gen_avg{}; + auto ramp_secs = std::chrono::duration>( + cfg.m_loadgen_tps_step_time); + auto ramp_timer_full + = std::chrono::duration_cast(ramp_secs); + + auto ramp_timer = ramp_timer_full; + auto ramping = ramp_timer.count() != 0 + && per_gen_send_limit != cfg.m_loadgen_tps_target + && cfg.m_loadgen_tps_target != 0; auto gen_thread = std::thread([&]() { while(running) { // Determine if we should attempt to send a double-spending @@ -219,8 +255,52 @@ auto main(int argc, char** argv) -> int { gen_avg = static_cast( (static_cast(gen_t.count()) * average_factor) + (static_cast(gen_avg) * (1.0 - average_factor))); + if(ramping) { + if(gen_t >= ramp_timer) { + logger->debug( + "Ramp Timer Exhausted (gen_t). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(cfg.m_loadgen_tps_target, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == cfg.m_loadgen_tps_target) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= gen_t; + } + auto total_send_time + = std::chrono::nanoseconds(gen_avg * per_gen_send_limit); + if(total_send_time < std::chrono::seconds(1) + && per_gen_send_limit != 0) { + send_gap + = (std::chrono::seconds(1) - total_send_time).count() + / per_gen_send_limit; + logger->trace("New send-gap:", send_gap); + } + } } else { std::this_thread::sleep_for(std::chrono::nanoseconds(gen_avg)); + if(ramping) { + auto avg = std::chrono::nanoseconds(gen_avg); + if(avg >= ramp_timer) { + logger->debug("Ramp Timer Exhausted (dbl-spend " + "gen_avg). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(cfg.m_loadgen_tps_target, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == cfg.m_loadgen_tps_target) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= avg; + } + } } // We couldn't send a double-spend or a newly generated valid @@ -234,6 +314,23 @@ auto main(int argc, char** argv) -> int { // instead. static constexpr auto send_delay = std::chrono::seconds(1); std::this_thread::sleep_for(send_delay); + if(ramping) { + if(send_delay >= ramp_timer) { + logger->debug( + "Ramp Timer Exhausted (send_delay). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(cfg.m_loadgen_tps_target, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == cfg.m_loadgen_tps_target) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= send_delay; + } + } continue; } @@ -241,10 +338,17 @@ auto main(int argc, char** argv) -> int { .time_since_epoch() .count(); + auto tx_id = cbdc::transaction::tx_id(tx.value()); + logger->trace("Sent", cbdc::to_string(tx_id), "at", send_time); + auto res_cb - = [&, txn = tx.value(), send_time = send_time]( + = [&, txn = tx.value(), send_time = send_time, tx_id = tx_id]( cbdc::sentinel::rpc::client::execute_result_type res) { - auto tx_id = cbdc::transaction::tx_id(txn); + auto now = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + logger->trace("Received response for", + cbdc::to_string(tx_id), "at", now); if(!res.has_value()) { logger->warn("Failure response from sentinel for", cbdc::to_string(tx_id)); @@ -256,9 +360,6 @@ auto main(int argc, char** argv) -> int { == cbdc::sentinel::tx_status::confirmed) { wallet.confirm_transaction(txn); second_conf_queue.push(tx_id); - auto now = std::chrono::high_resolution_clock::now() - .time_since_epoch() - .count(); const auto tx_delay = now - send_time; latency_log << now << " " << tx_delay << "\n"; constexpr auto max_invalid = 100000; @@ -280,6 +381,27 @@ auto main(int argc, char** argv) -> int { logger->error("Failure sending transaction to sentinel"); wallet.confirm_inputs(tx.value().m_inputs); } + + auto gap = std::chrono::nanoseconds(send_gap); + if(gap < std::chrono::seconds(1)) { + std::this_thread::sleep_for(gap); + if(ramping) { + if(gap >= ramp_timer) { + logger->debug("Ramp Timer Exhausted (gap). Resetting"); + ramp_timer = ramp_timer_full; + per_gen_send_limit + = std::min(cfg.m_loadgen_tps_target, + per_gen_send_limit + per_gen_step_size); + logger->debug("New Send Limit:", per_gen_send_limit); + if(per_gen_send_limit == cfg.m_loadgen_tps_target) { + ramping = false; + logger->info("Reached Target Throughput"); + } + } else { + ramp_timer -= gap; + } + } + } } });