Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: logs for btc-notify #55

Merged
merged 2 commits into from
Mar 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions crates/btc-notify/src/client.rs
Original file line number Diff line number Diff line change
@@ -10,8 +10,9 @@ use bitcoincore_zmq::Message;
use futures::StreamExt;
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
task::{self, JoinHandle},
};
use tracing::{error, info, trace};

pub use crate::{
config::BtcZmqConfig,
@@ -62,6 +63,7 @@ impl BtcZmqClient {
///
/// It takes a [`BtcZmqConfig`] and uses that information to connect to `bitcoind`.
pub fn connect(cfg: BtcZmqConfig) -> Result<Self, Box<dyn Error>> {
trace!(?cfg, "subscribing to bitcoind");
let state_machine = Arc::new(Mutex::new(BtcZmqSM::init(cfg.bury_depth)));

let sockets = cfg
@@ -81,37 +83,63 @@ impl BtcZmqClient {
let tx_subs = Arc::new(Mutex::new(Vec::<TxSubscriptionDetails>::new()));
let tx_subs_thread = tx_subs.clone();
let state_machine_thread = state_machine.clone();
let thread_handle = Arc::new(tokio::task::spawn(async move {
let thread_handle = Arc::new(task::spawn(async move {
loop {
// This loop has no break condition. It is only aborted when the BtcZmqClient is
// dropped.
info!("listening for ZMQ events");
while let Some(res) = stream.next().await {
let mut sm = state_machine_thread.lock().await;
let diff = match res {
Ok(Message::HashBlock(_, _)) => Vec::new(),
Ok(Message::HashTx(_, _)) => Vec::new(),
Ok(Message::Block(block, _)) => {
// First send the block to the block subscribers.
block_subs_thread
.lock()
.await
.retain(|sub| sub.send(block.clone()).is_ok());

// Now we process the block to understand what the relevant transaction
// diff is.
sm.process_block(block)
Ok(msg) => {
let topic = msg.topic_str();
match msg {
Message::HashBlock(_, _) => {
trace!(%topic, "received event");
Vec::new()
}
Message::HashTx(_, _) => {
trace!(%topic, "received event");
Vec::new()
}
Message::Block(block, _) => {
trace!(%topic, "received event");
// First send the block to the block subscribers.
block_subs_thread
.lock()
.await
.retain(|sub| sub.send(block.clone()).is_ok());

// Now we process the block to understand what the relevant
// transaction diff is.
trace!(?block, "processing block");
info!(block_hash=%block.block_hash(), "processing block");
sm.process_block(block)
}
Message::Tx(tx, _) => {
trace!(%topic, "received event");
info!(txid=%tx.compute_txid(), "processing transaction");
sm.process_tx(tx)
}
Message::Sequence(seq, _) => {
trace!(%topic, "received event");
info!(%seq, "processing sequence");
sm.process_sequence(seq)
}
}
}
Ok(Message::Tx(tx, _)) => sm.process_tx(tx),
Ok(Message::Sequence(seq, _)) => sm.process_sequence(seq),
Err(e) => {
tracing::error!("ERROR: {e}");
error!(%e, "Error processing ZMQ message");
Vec::new()
}
};
// Now we send the diff to the relevant subscribers. If we ever encounter a send
// error, it means the receiver has been dropped.

tx_subs_thread.lock().await.retain(|sub| {
info!("applying filtering predicates on the btc chain state diff");
for msg in diff.iter().filter(|event| (sub.predicate)(&event.rawtx)) {
// Now we send the diff to the relevant subscribers.
// If we ever encounter a send error,
// it means the receiver has been dropped.
if sub.outbox.send(msg.clone()).is_err() {
sm.rm_filter(&sub.predicate);
return false;
@@ -123,6 +151,8 @@ impl BtcZmqClient {
}
}));

info!("subscribed to bitcoind");

Ok(BtcZmqClient {
block_subs,
tx_subs,
@@ -143,6 +173,7 @@ impl BtcZmqClient {
predicate: Arc::new(f),
outbox: send,
};
trace!(?details, "subscribing to transactions");

let mut subs = self.tx_subs.lock().await;
let mut sm = self.state_machine.lock().await;
@@ -159,6 +190,8 @@ impl BtcZmqClient {
pub async fn subscribe_blocks(&self) -> Subscription<Block> {
let (send, recv) = mpsc::unbounded_channel();

trace!("subscribing to blocks");

self.block_subs.lock().await.push(send);

Subscription::from_receiver(recv)
101 changes: 88 additions & 13 deletions crates/btc-notify/src/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{
collections::{BTreeMap, VecDeque},
fmt,
sync::Arc,
};

use bitcoin::{Block, BlockHash, Transaction, Txid};
use bitcoincore_zmq::SequenceMessage;
use tracing::{debug, error, info, trace, warn};

use crate::event::{TxEvent, TxStatus};

@@ -54,8 +56,8 @@ pub(crate) struct BtcZmqSM {

// Coverage is disabled because when tests pass, most Debug impls will never be invoked.
#[cfg_attr(coverage_nightly, coverage(off))]
impl std::fmt::Debug for BtcZmqSM {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for BtcZmqSM {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BtcZmqSM")
.field("bury_depth", &self.bury_depth)
.field(
@@ -86,13 +88,15 @@ impl PartialEq for BtcZmqSM {
&& self.unburied_blocks == other.unburied_blocks
}
}

impl Eq for BtcZmqSM {}

impl BtcZmqSM {
/// Initializes a [`BtcZmqSM`] with the supplied bury_depth. bury_depth is the number of blocks
/// that must be built on top of a given block before that block's transactions are
/// considered Buried.
pub(crate) fn init(bury_depth: usize) -> Self {
info!(%bury_depth, "initializing a ZMQ state machine");
BtcZmqSM {
bury_depth,
tx_filters: Vec::new(),
@@ -106,11 +110,13 @@ impl BtcZmqSM {
/// The state machine will track any transaction that matches the disjunction of predicates
/// added.
pub(crate) fn add_filter(&mut self, pred: TxPredicate) {
trace!("adding a predicate filter to a ZMQ state machine");
self.tx_filters.push(pred);
}

/// Takes a [`TxPredicate`] that was previously added via [`BtcZmqSM::add_filter`].
pub(crate) fn rm_filter(&mut self, pred: &TxPredicate) {
trace!("Removing a predicate filter from a ZMQ state machine");
if let Some(idx) = self.tx_filters.iter().position(|p| Arc::ptr_eq(p, pred)) {
self.tx_filters.swap_remove(idx);
}
@@ -119,9 +125,13 @@ impl BtcZmqSM {
/// One of the three primary state transition functions of the [`BtcZmqSM`], updating internal
/// state to reflect the the `rawblock` event.
pub(crate) fn process_block(&mut self, block: Block) -> Vec<TxEvent> {
trace!(?block, "started processing a Block");
info!(block_hash=%block.block_hash(), "started processing a Block");
match self.unburied_blocks.front() {
Some(tip) => {
if block.header.prev_blockhash == tip.block_hash() {
trace!(?block, prev_block=?tip, "block's previous block hash matches the tip");
info!(block_hash=%block.block_hash(), prev_block_hash=%tip.block_hash(), "block's previous block hash matches the tip");
self.unburied_blocks.push_front(block)
} else {
// This implies that we missed a block.
@@ -130,6 +140,7 @@ impl BtcZmqSM {
// stream processing would cause this to fire during a
// reorg. Race conditions MUST NOT cause this to fire. This MUST
// be fixed.
error!(block_hash=%block.block_hash(), prev_block_hash=%tip.block_hash(), "invariant violated: blocks received out of order");
panic!("invariant violated: blocks received out of order");
}
}
@@ -138,7 +149,14 @@ impl BtcZmqSM {
// history. Fixing this requires us to backfill the block history at startup
// using the RPC interface, or accepting the blocks newer than the
// bury depth as an argument to the constructor.
None => self.unburied_blocks.push_front(block),
None => {
trace!(
?block,
"block's previous block hash does not match the tip, possible reorg detected"
);
warn!(block_hash=%block.block_hash(), "block's previous block hash does not match the tip, possible reorg detected");
self.unburied_blocks.push_front(block);
}
}
let block = self.unburied_blocks.front().unwrap();

@@ -157,6 +175,7 @@ impl BtcZmqSM {
.iter()
.filter(|tx| self.tx_filters.iter().any(|f| f(tx)))
{
trace!(?matched_tx, "processing transactions in the block");
match self.tx_lifecycles.get_mut(&matched_tx.compute_txid()) {
// This is either the scenario where we haven't yet seen the transaction in any
// capacity, or where we have a MempoolAcceptance event for it but
@@ -166,6 +185,7 @@ impl BtcZmqSM {
None | Some(None) => {
let blockhash = block.block_hash();
let height = block.bip34_block_height().unwrap_or(0);
debug!(txid=%matched_tx.compute_txid(), blockhash=%blockhash, %height, "processing newly mined transaction");
let lifecycle = TxLifecycle {
raw: matched_tx.clone(),
block: Some((height, blockhash)),
@@ -176,11 +196,13 @@ impl BtcZmqSM {
rawtx: matched_tx.clone(),
status: TxStatus::Mined { blockhash, height },
});
info!(txid=%matched_tx.compute_txid(), blockhash=%blockhash, %height, "processed newly mined transaction");
}
// This means we have seen the rawtx event for this transaction before.
Some(Some(lifecycle)) => {
let blockhash = block.block_hash();
let height = block.bip34_block_height().unwrap_or(0);
debug!(txid=%matched_tx.compute_txid(), blockhash=%blockhash, %height, "validating already seen transaction");
if let Some((_, prior_blockhash)) = lifecycle.block {
// This means that it was previously mined. This is pretty weird and so we
// include some debug assertions to rule out
@@ -195,6 +217,8 @@ impl BtcZmqSM {
rawtx: matched_tx.clone(),
status: TxStatus::Mined { blockhash, height },
});

info!(txid=%matched_tx.compute_txid(), blockhash=%blockhash, %height, "processed already seen transaction");
}
}
}
@@ -207,14 +231,25 @@ impl BtcZmqSM {
// from the current lifecycle map.
let blockhash = newly_buried.block_hash();
let height = newly_buried.bip34_block_height().unwrap_or(0);

trace!(?newly_buried, %blockhash, %height, "handled all mined transactions, starting to process newly buried transactions");
info!(%blockhash, %height, "handled all mined transactions, starting to process newly buried transactions from block");

for buried_tx in newly_buried.txdata {
self.tx_lifecycles.remove(&buried_tx.compute_txid());
let buried_txid = buried_tx.compute_txid();

trace!(?buried_tx, %buried_txid, %blockhash, %height, "handled all mined transactions, starting to process buried transactions");
debug!(%buried_txid, %blockhash, %height, "handled all mined transactions, starting to process buried transactions");

self.tx_lifecycles.remove(&buried_txid);
if self.tx_filters.iter().any(|f| f(&buried_tx)) {
diff.push(TxEvent {
rawtx: buried_tx,
status: TxStatus::Buried { blockhash, height },
});
}

info!(%blockhash, %height, "processed all buried transactions");
}
}
}
@@ -225,15 +260,18 @@ impl BtcZmqSM {
/// One of the three primary state transition functions of the [`BtcZmqSM`], updating
/// internal state to reflect the `rawtx` event.
pub(crate) fn process_tx(&mut self, tx: Transaction) -> Vec<TxEvent> {
let txid = tx.compute_txid();
trace!(?tx, %txid, "filtering transactions");
if !self.tx_filters.iter().any(|f| f(&tx)) {
return Vec::new();
}

let txid = tx.compute_txid();
let lifecycle = self.tx_lifecycles.get_mut(&txid);
match lifecycle {
// In this case we have never seen any information on this transaction whatsoever.
None => {
trace!(?tx, %txid, ?lifecycle, "received new transaction");
debug!(%txid, ?lifecycle, "received new transaction");
let lifecycle = TxLifecycle {
raw: tx,
block: None,
@@ -250,6 +288,8 @@ impl BtcZmqSM {
// In this case we have seen a MempoolAcceptance event for this txid, but haven't seen
// the actual transaction data yet.
Some(None) => {
trace!(?tx, %txid, ?lifecycle, "received MempoolAcceptance event");
debug!(%txid, ?lifecycle, "received MempoolAcceptance");
let lifecycle = TxLifecycle {
raw: tx.clone(),
block: None,
@@ -267,7 +307,11 @@ impl BtcZmqSM {
}
// In this case we know everything we need to about this transaction, and this is
// probably a rawtx event that accompanies an upcoming new block event.
Some(Some(_)) => Vec::new(),
Some(Some(_)) => {
trace!(?tx, %txid, ?lifecycle, "received duplicate transaction event");
debug!(%txid, ?lifecycle, "received duplicate transaction event");
Vec::new()
}
}
}

@@ -276,12 +320,19 @@ impl BtcZmqSM {
pub(crate) fn process_sequence(&mut self, seq: SequenceMessage) -> Vec<TxEvent> {
let mut diff = Vec::new();
match seq {
SequenceMessage::BlockConnect { .. } => { /* NOOP */ }
SequenceMessage::BlockConnect { .. } => {
trace!(?diff, ?seq, "BlockConnect received");
debug!(?seq, "BlockConnect received");
/* NOOP */
}
SequenceMessage::BlockDisconnect { blockhash } => {
// If the block is disconnected we reset all transactions that currently have that
// blockhash as their containing block.
trace!(?diff, ?seq, "BlockDisconnect received");
debug!(?seq, %blockhash, "BlockDisconnect received");
if let Some(block) = self.unburied_blocks.front() {
if block.block_hash() == blockhash {
info!(%blockhash, "block disconnected, removing all included transactions");
self.unburied_blocks.pop_front();
} else {
// As far as I can tell, the block connect and disconnect events are done in
@@ -290,6 +341,7 @@ impl BtcZmqSM {
// chronological order. If we get a block disconnect event that doesn't
// match our current tip then this assumption has
// broken down.
error!(%blockhash, "invariant violated: out of order block disconnect");
panic!("invariant violated: out of order block disconnect");
}
}
@@ -306,16 +358,25 @@ impl BtcZmqSM {
rawtx: lifecycle.raw.clone(),
status: TxStatus::Unknown,
});
trace!(tx=?lifecycle.raw, txid=%lifecycle.raw.compute_txid(), "Block disconnected, removing transaction");
false
}
// Otherwise keep it.
_ => true,
_ => {
trace!(tx=?lifecycle.raw, txid=%lifecycle.raw.compute_txid(), "retaining transaction");
true
}
},
None => {
trace!(?v, "keeping transaction");
true
},
None => true,
}
});
}
SequenceMessage::MempoolAcceptance { txid, .. } => {
trace!(?diff, ?seq, "MempoolAcceptance received");
debug!(?seq, %txid, "MempoolAcceptance received");
match self.tx_lifecycles.get_mut(&txid) {
// In this case we are well aware of the full transaction data here
Some(Some(lifecycle)) => {
@@ -326,11 +387,15 @@ impl BtcZmqSM {
rawtx: lifecycle.raw.clone(),
status: TxStatus::Mempool,
}];
trace!(?diff, ?seq, %txid, "received MempoolAcceptance");
}
// This can happen because there is a race between the rawblock event
// delivery and the sequence event for a given transaction. If we
// encounter this, we will ignore the MempoolAcceptance.
Some(_) => { /* NOOP */ }
Some(_) => {
trace!(?diff, ?seq, %txid, "ignoring duplicate MempoolAcceptance");
/* NOOP */
}
}
}
// In this case we have received a MempoolAcceptance event for this txid, but
@@ -340,7 +405,9 @@ impl BtcZmqSM {
// think it is a material issue if we do. This is currently
// a panic to allow us to quickly discover
// if this assumption doesn't hold and what it means
Some(None) => panic!("invariant violated: duplicate mempool acceptance"),
Some(None) => {
panic!("invariant violated: MempoolAcceptance received before rawtx")
}
// In this case we know nothing of this transaction yet.
None => {
// We insert a placeholder because we expect the rawtx event to fill in the
@@ -352,6 +419,7 @@ impl BtcZmqSM {
// MempoolAcceptance event we are guaranteed to have a corresponding rawtx
// event. So this shouldn't cause a memory leak
// unless we miss ZMQ events entirely.
trace!(?diff, ?seq, %txid, "saw dangling transaction in mempool");
self.tx_lifecycles.insert(txid, None);
}
}
@@ -377,12 +445,19 @@ impl BtcZmqSM {
rawtx: lifecycle.raw,
status: TxStatus::Unknown,
}];
trace!(?diff, ?seq, %txid, "MempoolRemoval received for a transaction with some lifecycle");
}
// This will happen if we've only received a MempoolAcceptance event, the
// removal will cancel it fully.
Some(None) => { /* NOOP */ }
Some(None) => {
trace!(?diff, ?seq, %txid, "transaction removed from mempool");
/* NOOP */
}
// This happens if we've never heard anything about this transaction before.
None => { /* NOOP */ }
None => {
trace!(?diff, ?seq, %txid, "observed removal of a new transaction from mempool");
/* NOOP */
}
}
}
}