Skip to content

Commit

Permalink
[WIP] feat: enable basic telemetry for 2PC loadgen
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Stuewe <[email protected]>
  • Loading branch information
HalosGhost committed Jul 31, 2024
1 parent 4c60e65 commit 19998dd
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 7 deletions.
79 changes: 79 additions & 0 deletions scripts/get-tx-lifecycle.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env bash

LOGDIR='.'
TXID=

IFS='' read -r -d '' usage <<'EOF'
Usage: %s [options]
Options:
-h, --help print this help and exit
-d, --log-dir=PATH PATH where the log-files are located
(defaults to '.' if not specified)
-t, --tx-id=ID ID of the transaction to trace
EOF

_help=
if [[ $# -eq 0 ]]; then
_help=1
fi

_err=0
while [[ $# -gt 0 ]]; do
optarg=
shft_cnt=1
if [[ "$1" =~ [=] ]]; then
optarg="${1#*=}"
elif [[ "$1" =~ ^-- && $# -gt 1 && ! "$2" =~ ^- ]]; then
optarg="$2"
shft_cnt=2
elif [[ "$1" =~ ^-[^-] && $# -gt 1 && ! "$2" =~ ^- ]]; then
optarg="$2"
shft_cnt=2
elif [[ "$1" =~ ^-[^-] ]]; then
optarg="${1/??/}"
fi

case "$1" in
-d*|--log-dir*) LOGDIR="${optarg:-$LOGDIR}"; shift "$shft_cnt";;
-t*|--tx-id*)
if [[ ! "$optarg" ]]; then
printf '`--tx-id` requires an argument\n'
_help=1; _err=1;
break
else
TXID="$optarg"
shift "$shft_cnt"
fi;;
-h|--help) _help=1; shift "$shft_cnt";;
*)
printf 'Unrecognized option: %s\n' "$1"
_help=1; _err=1;
break;;
esac
done

if [[ -n "$_help" ]]; then
printf "$usage" "$(basename $0)"
exit "$_err"
fi

pushd "$LOGDIR" &>/dev/null

all_lines=$(grep -n "$TXID.*at" *.log)
sorted_lines=$(printf "%s\n" "$all_lines" | awk -F ' at ' '{ print $2, $0 }' | sort | sed 's/^[^ ]* //')

printf "%s\n" "$sorted_lines"

fst=$(printf "%s\n" "$sorted_lines" | head -n 1)
lst=$(printf "%s\n" "$sorted_lines" | tail -n 1)
if [[ $(printf "%s\n" "$fst" | cut -d':' -f1) != $(printf "%s\n" "$lst" | cut -d':' -f1) ]]; then
printf 'First and last message for %s are in different logs!\n' "$TXID"
fi

start=$(printf "%s\n" "$fst" | awk -F ' at ' '{ print $2 }')
end=$(printf "%s\n" "$lst" | awk -F ' at ' '{ print $2 }')

printf "total elapsed time (assuming all clocks synchronized): %d ticks\n" "$(( end - start ))"

popd &>/dev/null
12 changes: 12 additions & 0 deletions src/uhs/twophase/coordinator/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,12 @@ namespace cbdc::coordinator {
return false;
}

auto recv_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Received", to_string(tx.m_id), "at",
recv_time);

if(!transaction::validation::check_attestations(
tx,
m_opts.m_sentinel_public_keys,
Expand Down Expand Up @@ -719,6 +725,12 @@ namespace cbdc::coordinator {
m_current_txs->emplace(
tx.m_id,
std::make_pair(std::move(result_callback), idx));

auto add_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Added", to_string(tx.m_id), "to batch at",
add_time);
return true;
}();
if(added) {
Expand Down
10 changes: 10 additions & 0 deletions src/uhs/twophase/locking_shard/locking_shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ namespace cbdc::locking_shard {

auto locking_shard::check_and_lock_tx(const tx& t) -> bool {
bool success{true};
auto recv_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Began check-and-lock of",
to_string(t.m_tx.m_id), "at", recv_time);
if(!transaction::validation::check_attestations(
t.m_tx,
m_opts.m_sentinel_public_keys,
Expand All @@ -129,6 +134,11 @@ namespace cbdc::locking_shard {
}
}
}
auto resp_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Completed check-and-lock of",
to_string(t.m_tx.m_id), "at", resp_time);
return success;
}

Expand Down
29 changes: 27 additions & 2 deletions src/uhs/twophase/sentinel_2pc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,14 @@ namespace cbdc::sentinel_2pc {
auto controller::execute_transaction(
transaction::full_tx tx,
execute_result_callback_type result_callback) -> bool {
auto tx_id = transaction::tx_id(tx);
auto recv_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Began execution of", to_string(tx_id), "at",
recv_time);
const auto validation_err = transaction::validation::check_tx(tx);
if(validation_err.has_value()) {
auto tx_id = transaction::tx_id(tx);
m_logger->debug(
"Rejected (",
transaction::validation::to_string(validation_err.value()),
Expand All @@ -119,6 +124,13 @@ namespace cbdc::sentinel_2pc {
compact_tx.m_attestations.insert(attestation);
}

auto fwd_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();

m_logger->trace("Accumulating attestations for", to_string(tx_id), "at",
fwd_time);

gather_attestations(tx, std::move(result_callback), compact_tx, {});

return true;
Expand All @@ -143,13 +155,23 @@ namespace cbdc::sentinel_2pc {
auto controller::validate_transaction(
transaction::full_tx tx,
validate_result_callback_type result_callback) -> bool {
auto recv_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Began validation of",
to_string(transaction::tx_id(tx)), "at", recv_time);
const auto validation_err = transaction::validation::check_tx(tx);
if(validation_err.has_value()) {
result_callback(std::nullopt);
return true;
}
auto compact_tx = cbdc::transaction::compact_tx(tx);
auto attestation = compact_tx.sign(m_secp.get(), m_privkey);
auto resp_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->trace("Validated", to_string(transaction::tx_id(tx)), "at",
resp_time);
result_callback(std::move(attestation));
return true;
}
Expand Down Expand Up @@ -201,7 +223,10 @@ namespace cbdc::sentinel_2pc {
return;
}

m_logger->debug("Accepted", to_string(ctx.m_id));
auto acc_time = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
m_logger->debug("Accepted", to_string(ctx.m_id), "at", acc_time);

send_compact_tx(ctx, std::move(result_callback));
}
Expand Down
14 changes: 9 additions & 5 deletions tools/bench/twophase_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,17 @@ auto main(int argc, char** argv) -> int {
.time_since_epoch()
.count();

auto tx_id = cbdc::transaction::tx_id(tx.value());
logger->trace("Sent", cbdc::to_string(tx_id), "at", send_time);

auto res_cb
= [&, txn = tx.value(), send_time = send_time](
= [&, txn = tx.value(), send_time = send_time, tx_id = tx_id](
cbdc::sentinel::rpc::client::execute_result_type res) {
auto tx_id = cbdc::transaction::tx_id(txn);
auto now = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
logger->trace("Received response for",
cbdc::to_string(tx_id), "at", now);
if(!res.has_value()) {
logger->warn("Failure response from sentinel for",
cbdc::to_string(tx_id));
Expand All @@ -351,9 +358,6 @@ auto main(int argc, char** argv) -> int {
== cbdc::sentinel::tx_status::confirmed) {
wallet.confirm_transaction(txn);
second_conf_queue.push(tx_id);
auto now = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
const auto tx_delay = now - send_time;
latency_log << now << " " << tx_delay << "\n";
constexpr auto max_invalid = 100000;
Expand Down

0 comments on commit 19998dd

Please sign in to comment.