Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chain)!: Add time_of_sync to SyncRequest and FullScanRequest WIP #1566

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions crates/chain/src/spk_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ impl<I> SyncRequestBuilder<I> {
self
}

/// Set the `time_of_sync` for unconfirmed transactions.
pub fn time_of_sync(mut self, time_of_sync: u64) -> Self {
self.inner.time_of_sync = Some(time_of_sync);
self
}

/// Set the closure that will inspect every sync item visited.
pub fn inspect<F>(mut self, inspect: F) -> Self
where
Expand Down Expand Up @@ -239,6 +245,7 @@ pub struct SyncRequest<I = ()> {
txids_consumed: usize,
outpoints: VecDeque<OutPoint>,
outpoints_consumed: usize,
time_of_sync: Option<u64>,
inspect: Box<InspectSync<I>>,
}

Expand All @@ -252,6 +259,7 @@ impl<I> Default for SyncRequest<I> {
txids_consumed: 0,
outpoints: VecDeque::new(),
outpoints_consumed: 0,
time_of_sync: None,
inspect: Box::new(|_, _| {}),
}
}
Expand Down Expand Up @@ -333,6 +341,11 @@ impl<I> SyncRequest<I> {
SyncIter::<I, OutPoint>::new(self)
}

/// Retrive the `time_of_sync`.
pub fn get_time_of_sync(&self) -> Option<u64> {
self.time_of_sync
}

fn _call_inspect(&mut self, item: SyncItem<I>) {
let progress = self.progress();
(*self.inspect)(item, progress);
Expand Down Expand Up @@ -409,6 +422,12 @@ impl<K: Ord> FullScanRequestBuilder<K> {
self
}

/// Set the `time_of_sync` for unconfirmed transactions.
pub fn time_of_sync(mut self, time_of_sync: u64) -> Self {
self.inner.time_of_sync = Some(time_of_sync);
self
}

/// Set the closure that will inspect every sync item visited.
pub fn inspect<F>(mut self, inspect: F) -> Self
where
Expand All @@ -435,6 +454,7 @@ impl<K: Ord> FullScanRequestBuilder<K> {
pub struct FullScanRequest<K> {
chain_tip: Option<CheckPoint>,
spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
time_of_sync: Option<u64>,
inspect: Box<InspectFullScan<K>>,
}

Expand All @@ -449,6 +469,7 @@ impl<K> Default for FullScanRequest<K> {
Self {
chain_tip: None,
spks_by_keychain: Default::default(),
time_of_sync: None,
inspect: Box::new(|_, _, _| {}),
}
}
Expand Down Expand Up @@ -488,6 +509,11 @@ impl<K: Ord + Clone> FullScanRequest<K> {
inspect,
}
}

/// Retrive the `time_of_sync`.
pub fn get_time_of_sync(&self) -> Option<u64> {
self.time_of_sync
}
}

/// Data returned from a spk-based blockchain client full scan.
Expand Down
65 changes: 1 addition & 64 deletions crates/chain/src/tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,76 +565,13 @@ impl<A: Clone + Ord> TxGraph<A> {

/// Inserts the given `seen_at` for `txid` into [`TxGraph`].
///
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`. To batch
/// update all unconfirmed transactions with the latest `seen_at`, see
/// [`update_last_seen_unconfirmed`].
///
/// [`update_last_seen_unconfirmed`]: Self::update_last_seen_unconfirmed
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`.
pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) -> ChangeSet<A> {
let mut update = Self::default();
update.last_seen.insert(txid, seen_at);
self.apply_update(update)
}

/// Update the last seen time for all unconfirmed transactions.
///
/// This method updates the last seen unconfirmed time for this [`TxGraph`] by inserting
/// the given `seen_at` for every transaction not yet anchored to a confirmed block,
/// and returns the [`ChangeSet`] after applying all updates to `self`.
///
/// This is useful for keeping track of the latest time a transaction was seen
/// unconfirmed, which is important for evaluating transaction conflicts in the same
/// [`TxGraph`]. For details of how [`TxGraph`] resolves conflicts, see the docs for
/// [`try_get_chain_position`].
///
/// A normal use of this method is to call it with the current system time. Although
/// block headers contain a timestamp, using the header time would be less effective
/// at tracking mempool transactions, because it can drift from actual clock time, plus
/// we may want to update a transaction's last seen time repeatedly between blocks.
///
/// # Example
///
/// ```rust
/// # use bdk_chain::example_utils::*;
/// # use std::time::UNIX_EPOCH;
/// # let tx = tx_from_hex(RAW_TX_1);
/// # let mut tx_graph = bdk_chain::TxGraph::<()>::new([tx]);
/// let now = std::time::SystemTime::now()
/// .duration_since(UNIX_EPOCH)
/// .expect("valid duration")
/// .as_secs();
/// let changeset = tx_graph.update_last_seen_unconfirmed(now);
/// assert!(!changeset.last_seen.is_empty());
/// ```
///
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`, so the given time must
/// by strictly greater than what is currently stored for a transaction to have an effect.
/// To insert a last seen time for a single txid, see [`insert_seen_at`].
///
/// [`insert_seen_at`]: Self::insert_seen_at
/// [`try_get_chain_position`]: Self::try_get_chain_position
pub fn update_last_seen_unconfirmed(&mut self, seen_at: u64) -> ChangeSet<A> {
let mut changeset = ChangeSet::default();
let unanchored_txs: Vec<Txid> = self
.txs
.iter()
.filter_map(
|(&txid, (_, anchors))| {
if anchors.is_empty() {
Some(txid)
} else {
None
}
},
)
.collect();

for txid in unanchored_txs {
changeset.merge(self.insert_seen_at(txid, seen_at));
}
changeset
}

Comment on lines -616 to -637
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing this method should really be a separate commit. However, I'm wondering if there is an argument to keep it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My 2 sats is if both wallet and non-wallet based app will use SyncRequest and FullScanRequest then this shouldn't be needed.

/// Extends this graph with another so that `self` becomes the union of the two sets of
/// transactions.
///
Expand Down
36 changes: 0 additions & 36 deletions crates/chain/tests/test_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,42 +1088,6 @@ fn test_changeset_last_seen_merge() {
}
}

#[test]
fn update_last_seen_unconfirmed() {
let mut graph = TxGraph::<()>::default();
let tx = new_tx(0);
let txid = tx.compute_txid();

// insert a new tx
// initially we have a last_seen of None and no anchors
let _ = graph.insert_tx(tx);
let tx = graph.full_txs().next().unwrap();
assert_eq!(tx.last_seen_unconfirmed, None);
assert!(tx.anchors.is_empty());

// higher timestamp should update last seen
let changeset = graph.update_last_seen_unconfirmed(2);
assert_eq!(changeset.last_seen.get(&txid).unwrap(), &2);

// lower timestamp has no effect
let changeset = graph.update_last_seen_unconfirmed(1);
assert!(changeset.last_seen.is_empty());

// once anchored, last seen is not updated
let _ = graph.insert_anchor(txid, ());
let changeset = graph.update_last_seen_unconfirmed(4);
assert!(changeset.is_empty());
assert_eq!(
graph
.full_txs()
.next()
.unwrap()
.last_seen_unconfirmed
.unwrap(),
2
);
}

#[test]
fn transactions_inserted_into_tx_graph_are_not_canonical_until_they_have_an_anchor_in_best_chain() {
let txs = vec![new_tx(0), new_tx(1)];
Expand Down
49 changes: 40 additions & 9 deletions crates/electrum/src/bdk_electrum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,19 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};
let time_of_sync = request.get_time_of_sync();

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::default();
for keychain in request.keychains() {
let spks = request.iter_spks(keychain.clone());
if let Some(last_active_index) =
self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
{
if let Some(last_active_index) = self.populate_with_spks(
&mut graph_update,
spks,
stop_gap,
batch_size,
time_of_sync,
)? {
last_active_indices.insert(keychain, last_active_index);
}
}
Expand Down Expand Up @@ -204,6 +209,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};
let time_of_sync = request.get_time_of_sync();

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
self.populate_with_spks(
Expand All @@ -214,9 +220,10 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
.map(|(i, spk)| (i as u32, spk)),
usize::MAX,
batch_size,
time_of_sync,
)?;
self.populate_with_txids(&mut graph_update, request.iter_txids())?;
self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?;
self.populate_with_txids(&mut graph_update, request.iter_txids(), time_of_sync)?;
self.populate_with_outpoints(&mut graph_update, request.iter_outpoints(), time_of_sync)?;

// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
Expand Down Expand Up @@ -249,6 +256,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
stop_gap: usize,
batch_size: usize,
time_of_sync: Option<u64>,
) -> Result<Option<u32>, Error> {
let mut unused_spk_count = 0_usize;
let mut last_active_index = Option::<u32>::None;
Expand Down Expand Up @@ -279,7 +287,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

for tx_res in spk_history {
let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?;
self.validate_merkle_for_anchor(
graph_update,
tx_res.tx_hash,
tx_res.height,
time_of_sync,
)?;
}
}
}
Expand All @@ -293,6 +306,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
time_of_sync: Option<u64>,
) -> Result<(), Error> {
for outpoint in outpoints {
let op_txid = outpoint.txid;
Expand All @@ -315,7 +329,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
if !has_residing && res.tx_hash == op_txid {
has_residing = true;
let _ = graph_update.insert_tx(Arc::clone(&op_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
self.validate_merkle_for_anchor(
graph_update,
res.tx_hash,
res.height,
time_of_sync,
)?;
}

if !has_spending && res.tx_hash != op_txid {
Expand All @@ -329,7 +348,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
continue;
}
let _ = graph_update.insert_tx(Arc::clone(&res_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
self.validate_merkle_for_anchor(
graph_update,
res.tx_hash,
res.height,
time_of_sync,
)?;
}
}
}
Expand All @@ -341,6 +365,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
time_of_sync: Option<u64>,
) -> Result<(), Error> {
for txid in txids {
let tx = match self.fetch_tx(txid) {
Expand All @@ -363,7 +388,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
.into_iter()
.find(|r| r.tx_hash == txid)
{
self.validate_merkle_for_anchor(graph_update, txid, r.height)?;
self.validate_merkle_for_anchor(graph_update, txid, r.height, time_of_sync)?;
}

let _ = graph_update.insert_tx(tx);
Expand All @@ -378,6 +403,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
graph_update: &mut TxGraph<ConfirmationBlockTime>,
txid: Txid,
confirmation_height: i32,
time_of_sync: Option<u64>,
) -> Result<(), Error> {
if let Ok(merkle_res) = self
.inner
Expand Down Expand Up @@ -413,6 +439,11 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
},
);
}
} else {
// If no merkle proof is returned, then the tx is unconfirmed and we set the last_seen.
if let Some(seen_at) = time_of_sync {
let _ = graph_update.insert_seen_at(txid, seen_at);
}
}
Ok(())
}
Expand Down
Loading
Loading