From 19998dd6569c4f146bfc3b486c2f3bafb2b03140 Mon Sep 17 00:00:00 2001 From: Sam Stuewe Date: Thu, 25 Jul 2024 10:26:26 -0500 Subject: [PATCH] [WIP] feat: enable basic telemetry for 2PC loadgen Signed-off-by: Sam Stuewe --- scripts/get-tx-lifecycle.sh | 79 +++++++++++++++++++ src/uhs/twophase/coordinator/controller.cpp | 12 +++ .../twophase/locking_shard/locking_shard.cpp | 10 +++ src/uhs/twophase/sentinel_2pc/controller.cpp | 29 ++++++- tools/bench/twophase_gen.cpp | 14 ++-- 5 files changed, 137 insertions(+), 7 deletions(-) create mode 100755 scripts/get-tx-lifecycle.sh diff --git a/scripts/get-tx-lifecycle.sh b/scripts/get-tx-lifecycle.sh new file mode 100755 index 000000000..6b0bb1f1d --- /dev/null +++ b/scripts/get-tx-lifecycle.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash + +LOGDIR='.' +TXID= + +IFS='' read -r -d '' usage <<'EOF' +Usage: %s [options] + +Options: + -h, --help print this help and exit + -d, --log-dir=PATH PATH where the log-files are located + (defaults to '.' if not specified) + -t, --tx-id=ID ID of the transaction to trace +EOF + +_help= +if [[ $# -eq 0 ]]; then + _help=1 +fi + +_err=0 +while [[ $# -gt 0 ]]; do + optarg= + shft_cnt=1 + if [[ "$1" =~ [=] ]]; then + optarg="${1#*=}" + elif [[ "$1" =~ ^-- && $# -gt 1 && ! "$2" =~ ^- ]]; then + optarg="$2" + shft_cnt=2 + elif [[ "$1" =~ ^-[^-] && $# -gt 1 && ! "$2" =~ ^- ]]; then + optarg="$2" + shft_cnt=2 + elif [[ "$1" =~ ^-[^-] ]]; then + optarg="${1/??/}" + fi + + case "$1" in + -d*|--log-dir*) LOGDIR="${optarg:-$LOGDIR}"; shift "$shft_cnt";; + -t*|--tx-id*) + if [[ ! "$optarg" ]]; then + printf '`--tx-id` requires an argument\n' + _help=1; _err=1; + break + else + TXID="$optarg" + shift "$shft_cnt" + fi;; + -h|--help) _help=1; shift "$shft_cnt";; + *) + printf 'Unrecognized option: %s\n' "$1" + _help=1; _err=1; + break;; + esac +done + +if [[ -n "$_help" ]]; then + printf "$usage" "$(basename $0)" + exit "$_err" +fi + +pushd "$LOGDIR" &>/dev/null + +all_lines=$(grep -n "$TXID.*at" *.log) +sorted_lines=$(printf "%s\n" "$all_lines" | awk -F ' at ' '{ print $2, $0 }' | sort | sed 's/^[^ ]* //') + +printf "%s\n" "$sorted_lines" + +fst=$(printf "%s\n" "$sorted_lines" | head -n 1) +lst=$(printf "%s\n" "$sorted_lines" | tail -n 1) +if [[ $(printf "%s\n" "$fst" | cut -d':' -f1) != $(printf "%s\n" "$lst" | cut -d':' -f1) ]]; then + printf 'First and last message for %s are in different logs!\n' "$TXID" +fi + +start=$(printf "%s\n" "$fst" | awk -F ' at ' '{ print $2 }') +end=$(printf "%s\n" "$lst" | awk -F ' at ' '{ print $2 }') + +printf "total elapsed time (assuming all clocks synchronized): %d ticks\n" "$(( end - start ))" + +popd &>/dev/null diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index 5ddfc5d51..c1ef666b2 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -689,6 +689,12 @@ namespace cbdc::coordinator { return false; } + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Received", to_string(tx.m_id), "at", + recv_time); + if(!transaction::validation::check_attestations( tx, m_opts.m_sentinel_public_keys, @@ -719,6 +725,12 @@ namespace cbdc::coordinator { m_current_txs->emplace( tx.m_id, std::make_pair(std::move(result_callback), idx)); + + auto add_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Added", to_string(tx.m_id), "to batch at", + add_time); return true; }(); if(added) { diff --git a/src/uhs/twophase/locking_shard/locking_shard.cpp b/src/uhs/twophase/locking_shard/locking_shard.cpp index 92a325f9c..92c33acf5 100644 --- a/src/uhs/twophase/locking_shard/locking_shard.cpp +++ b/src/uhs/twophase/locking_shard/locking_shard.cpp @@ -103,6 +103,11 @@ namespace cbdc::locking_shard { auto locking_shard::check_and_lock_tx(const tx& t) -> bool { bool success{true}; + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began check-and-lock of", + to_string(t.m_tx.m_id), "at", recv_time); if(!transaction::validation::check_attestations( t.m_tx, m_opts.m_sentinel_public_keys, @@ -129,6 +134,11 @@ namespace cbdc::locking_shard { } } } + auto resp_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Completed check-and-lock of", + to_string(t.m_tx.m_id), "at", resp_time); return success; } diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 4c6fc797f..8543eb52d 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -98,9 +98,14 @@ namespace cbdc::sentinel_2pc { auto controller::execute_transaction( transaction::full_tx tx, execute_result_callback_type result_callback) -> bool { + auto tx_id = transaction::tx_id(tx); + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began execution of", to_string(tx_id), "at", + recv_time); const auto validation_err = transaction::validation::check_tx(tx); if(validation_err.has_value()) { - auto tx_id = transaction::tx_id(tx); m_logger->debug( "Rejected (", transaction::validation::to_string(validation_err.value()), @@ -119,6 +124,13 @@ namespace cbdc::sentinel_2pc { compact_tx.m_attestations.insert(attestation); } + auto fwd_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + + m_logger->trace("Accumulating attestations for", to_string(tx_id), "at", + fwd_time); + gather_attestations(tx, std::move(result_callback), compact_tx, {}); return true; @@ -143,6 +155,11 @@ namespace cbdc::sentinel_2pc { auto controller::validate_transaction( transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { + auto recv_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Began validation of", + to_string(transaction::tx_id(tx)), "at", recv_time); const auto validation_err = transaction::validation::check_tx(tx); if(validation_err.has_value()) { result_callback(std::nullopt); @@ -150,6 +167,11 @@ namespace cbdc::sentinel_2pc { } auto compact_tx = cbdc::transaction::compact_tx(tx); auto attestation = compact_tx.sign(m_secp.get(), m_privkey); + auto resp_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->trace("Validated", to_string(transaction::tx_id(tx)), "at", + resp_time); result_callback(std::move(attestation)); return true; } @@ -201,7 +223,10 @@ namespace cbdc::sentinel_2pc { return; } - m_logger->debug("Accepted", to_string(ctx.m_id)); + auto acc_time = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + m_logger->debug("Accepted", to_string(ctx.m_id), "at", acc_time); send_compact_tx(ctx, std::move(result_callback)); } diff --git a/tools/bench/twophase_gen.cpp b/tools/bench/twophase_gen.cpp index 646ad4856..9a65b0195 100644 --- a/tools/bench/twophase_gen.cpp +++ b/tools/bench/twophase_gen.cpp @@ -336,10 +336,17 @@ auto main(int argc, char** argv) -> int { .time_since_epoch() .count(); + auto tx_id = cbdc::transaction::tx_id(tx.value()); + logger->trace("Sent", cbdc::to_string(tx_id), "at", send_time); + auto res_cb - = [&, txn = tx.value(), send_time = send_time]( + = [&, txn = tx.value(), send_time = send_time, tx_id = tx_id]( cbdc::sentinel::rpc::client::execute_result_type res) { - auto tx_id = cbdc::transaction::tx_id(txn); + auto now = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + logger->trace("Received response for", + cbdc::to_string(tx_id), "at", now); if(!res.has_value()) { logger->warn("Failure response from sentinel for", cbdc::to_string(tx_id)); @@ -351,9 +358,6 @@ auto main(int argc, char** argv) -> int { == cbdc::sentinel::tx_status::confirmed) { wallet.confirm_transaction(txn); second_conf_queue.push(tx_id); - auto now = std::chrono::high_resolution_clock::now() - .time_since_epoch() - .count(); const auto tx_delay = now - send_time; latency_log << now << " " << tx_delay << "\n"; constexpr auto max_invalid = 100000;