Add wallet / address group endpoints
mononaut committed Mar 28, 2024
1 parent d4f788f commit 7444467
Showing 4 changed files with 362 additions and 18 deletions.
83 changes: 83 additions & 0 deletions src/new_index/db.rs
@@ -69,6 +69,69 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
}
}

pub struct ReverseScanGroupIterator<'a> {
iters: Vec<ReverseScanIterator<'a>>,
next_rows: Vec<Option<DBRow>>,
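    /// Length of the key prefix each inner scan shares; head rows are
    /// compared on the key bytes after this offset when merging.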
value_offset: usize,
done: bool,
}

impl<'a> ReverseScanGroupIterator<'a> {
pub fn new(
iters: Vec<ReverseScanIterator<'a>>,
value_offset: usize,
) -> ReverseScanGroupIterator {
let mut next_rows: Vec<Option<DBRow>> = Vec::new();
let mut new_iters: Vec<ReverseScanIterator<'a>> = Vec::new();
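        // prime the merge: pull the first (highest-key) row from each scan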
for mut iter in iters {
let next = iter.next();
next_rows.push(next);
new_iters.push(iter);
}
let done = next_rows.iter().all(|row| row.is_none());
ReverseScanGroupIterator {
iters: new_iters,
next_rows,
value_offset,
done,
}
}
}

impl<'a> Iterator for ReverseScanGroupIterator<'a> {
type Item = DBRow;

fn next(&mut self) -> Option<DBRow> {
if self.done {
return None;
}

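        // pick the head row with the greatest key suffix; exhausted
        // iterators (None) compare as smaller, so they are never chosen
        // while any live head remains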
let best_index = self
.next_rows
.iter()
.enumerate()
.max_by(|(a_index, a_opt), (b_index, b_opt)| match (a_opt, b_opt) {
(None, None) => a_index.cmp(b_index),

(Some(_), None) => std::cmp::Ordering::Greater,

(None, Some(_)) => std::cmp::Ordering::Less,

(Some(a), Some(b)) => a.key[self.value_offset..].cmp(&(b.key[self.value_offset..])),
})
.map(|(index, _)| index)
.unwrap_or(0);

let best = self.next_rows[best_index].take();
self.next_rows[best_index] = self.iters.get_mut(best_index)?.next();
if self.next_rows.iter().all(|row| row.is_none()) {
self.done = true;
}

best
}
}
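ReverseScanGroupIterator is a k-way merge: new() primes one head row per inner reverse scan, and next() repeatedly emits whichever head compares greatest on the key bytes past value_offset, refilling that head from its source. A minimal standalone sketch of the same policy, with vectors of integers standing in for the RocksDB scans (all names below are illustrative, not from this codebase):

// Sketch: each source is already sorted descending (like a reverse DB
// scan); repeatedly take the largest head so the output is descending too.
fn merge_descending(sources: Vec<Vec<u64>>) -> Vec<u64> {
    let mut iters: Vec<std::vec::IntoIter<u64>> =
        sources.into_iter().map(|v| v.into_iter()).collect();
    // prime one head per source, as ReverseScanGroupIterator::new does
    let mut heads: Vec<Option<u64>> = iters.iter_mut().map(|it| it.next()).collect();
    let mut merged = Vec::new();
    while let Some(best) = heads
        .iter()
        .enumerate()
        .filter_map(|(i, head)| head.map(|v| (i, v)))
        .max_by_key(|&(_, v)| v)
        .map(|(i, _)| i)
    {
        merged.push(heads[best].take().unwrap());
        heads[best] = iters[best].next(); // refill from the winning source
    }
    merged
}

fn main() {
    let merged = merge_descending(vec![vec![9, 4, 1], vec![8, 7, 2], vec![6, 3]]);
    assert_eq!(merged, vec![9, 8, 7, 6, 4, 3, 2, 1]);
}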

#[derive(Debug)]
pub struct DB {
db: rocksdb::DB,
@@ -136,6 +199,26 @@ impl DB {
}
}

pub fn iter_scan_group_reverse(
&self,
prefixes: Vec<(Vec<u8>, Vec<u8>)>,
value_offset: usize,
) -> ReverseScanGroupIterator {
let iters = prefixes
.iter()
.map(|(prefix, prefix_max)| {
let mut iter = self.db.raw_iterator();
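                // start each cursor at the last key <= prefix_max,
                // i.e. the end of its range, ready for a reverse scan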
iter.seek_for_prev(prefix_max);
ReverseScanIterator {
prefix: prefix.to_vec(),
iter,
done: false,
}
})
.collect();
ReverseScanGroupIterator::new(iters, value_offset)
}

pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
debug!(
"writing {} rows to {:?}, flush={:?}",
43 changes: 43 additions & 0 deletions src/new_index/mempool.rs
@@ -177,6 +177,49 @@ impl Mempool {
.collect()
}

pub fn history_group(
&self,
scripthashes: Vec<[u8; 32]>,
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<Transaction> {
let _timer = self
.latency
.with_label_values(&["history_group"])
.start_timer();
scripthashes
.into_iter()
.filter_map(|scripthash| self.history.get(&scripthash[..]))
.flat_map(|entries| entries.iter())
.map(|e| e.get_txid())
.unique()
// TODO seek directly to last seen tx without reading earlier rows
.skip_while(|txid| {
// skip until we reach the last_seen_txid
last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
})
.skip(match last_seen_txid {
Some(_) => 1, // skip the last_seen_txid itself
None => 0,
})
.take(limit)
.map(|txid| self.txstore.get(&txid).expect("missing mempool tx"))
.cloned()
.collect()
}
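The skip_while / skip / take chain above implements cursor pagination: scan until the last_seen_txid cursor, drop the cursor entry itself, then take up to limit items. A small self-contained illustration of the same chain, with integers standing in for txids; note that, exactly as in the code above, a cursor that never matches consumes the whole stream and yields nothing:

// Cursor pagination: return up to `limit` items strictly after `cursor`
// in iteration order.
fn page_after(items: &[u32], cursor: Option<u32>, limit: usize) -> Vec<u32> {
    items
        .iter()
        .copied()
        // skip until we reach the cursor item, if one was given
        .skip_while(|&item| cursor.map_or(false, |c| c != item))
        // then skip the cursor item itself
        .skip(if cursor.is_some() { 1 } else { 0 })
        .take(limit)
        .collect()
}

fn main() {
    let txids = [10, 20, 30, 40, 50];
    assert_eq!(page_after(&txids, None, 2), vec![10, 20]);
    assert_eq!(page_after(&txids, Some(20), 2), vec![30, 40]);
    // a cursor that never matches yields nothing
    assert_eq!(page_after(&txids, Some(99), 2), Vec::<u32>::new());
}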

pub fn history_txids_iter_group(
&self,
scripthashes: Vec<[u8; 32]>,
) -> impl Iterator<Item = Txid> + '_ {
scripthashes
.into_iter()
.filter_map(move |scripthash| self.history.get(&scripthash[..]))
.flat_map(|entries| entries.iter())
.map(|entry| entry.get_txid())
.unique()
}

pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<Txid> {
let _timer = self
.latency
134 changes: 116 additions & 18 deletions src/new_index/schema.rs
@@ -37,6 +37,8 @@ use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom};
#[cfg(feature = "liquid")]
use crate::elements::{asset, peg};

use super::db::ReverseScanGroupIterator;

const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100;

pub struct Store {
@@ -511,28 +513,33 @@ impl ChainQuery {
&TxHistoryRow::prefix_end(code, hash),
)
}

    fn history_iter_scan_group_reverse(
        &self,
        code: u8,
        hashes: Vec<[u8; 32]>,
    ) -> ReverseScanGroupIterator {
        self.store.history_db.iter_scan_group_reverse(
            hashes
                .into_iter()
                .map(|hash| {
                    let prefix = TxHistoryRow::filter(code, &hash[..]);
                    let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]);
                    (prefix, prefix_max)
                })
                .collect(),
            // the shared key prefix is 1 code byte + 32 hash bytes;
            // rows are merged on the key bytes after this offset
            33,
        )
    }

    fn collate_summaries(
        &self,
        iter: impl Iterator<Item = TxHistoryRow>,
        last_seen_txid: Option<&Txid>,
        limit: usize,
    ) -> Vec<TxHistorySummary> {
        // collate utxo funding/spending events by transaction
        let rows = iter
.map(|row| (row.get_txid(), row.key.txinfo))
.skip_while(|(txid, _)| {
// skip until we reach the last_seen_txid
@@ -546,8 +553,6 @@ impl ChainQuery {
self.tx_confirming_block(&txid)
.map(|b| (txid, info, b.height, b.time))
});

let mut map: HashMap<Txid, TxHistorySummary> = HashMap::new();
for (txid, info, height, time) in rows {
if !map.contains_key(&txid) && map.len() == limit {
@@ -602,7 +607,6 @@ impl ChainQuery {
_ => {}
}
}

let mut tx_summaries = map.into_values().collect::<Vec<TxHistorySummary>>();
tx_summaries.sort_by(|a, b| {
if a.height == b.height {
@@ -614,6 +618,46 @@
tx_summaries
}

pub fn summary(
&self,
scripthash: &[u8],
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
// scripthash lookup
self._summary(b'H', scripthash, last_seen_txid, limit)
}

fn _summary(
&self,
code: u8,
hash: &[u8],
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
let _timer_scan = self.start_timer("address_summary");
let rows = self
.history_iter_scan_reverse(code, hash)
.map(TxHistoryRow::from_row);

self.collate_summaries(rows, last_seen_txid, limit)
}

pub fn summary_group(
&self,
scripthashes: Vec<[u8; 32]>,
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<TxHistorySummary> {
// scripthash lookup
let _timer_scan = self.start_timer("address_group_summary");
let rows = self
.history_iter_scan_group_reverse(b'H', scripthashes)
.map(TxHistoryRow::from_row);

self.collate_summaries(rows, last_seen_txid, limit)
}

pub fn history(
&self,
scripthash: &[u8],
@@ -679,6 +723,60 @@
.collect()
}

pub fn history_group(
&self,
scripthashes: Vec<[u8; 32]>,
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<(Transaction, BlockId)> {
// scripthash lookup
self._history_group(b'H', scripthashes, last_seen_txid, limit)
}

pub fn history_txids_iter_group(
&self,
scripthashes: Vec<[u8; 32]>,
) -> impl Iterator<Item = Txid> + '_ {
self.history_iter_scan_group_reverse(b'H', scripthashes)
.map(|row| TxHistoryRow::from_row(row).get_txid())
.unique()
}

fn _history_group(
&self,
code: u8,
hashes: Vec<[u8; 32]>,
last_seen_txid: Option<&Txid>,
limit: usize,
) -> Vec<(Transaction, BlockId)> {
print!("limit {} | last_seen {:?}", limit, last_seen_txid);
let _timer_scan = self.start_timer("history_group");
let txs_conf = self
.history_iter_scan_group_reverse(code, hashes)
.map(|row| TxHistoryRow::from_row(row).get_txid())
// XXX: unique() requires keeping an in-memory list of all txids, can we avoid that?
.unique()
// TODO seek directly to last seen tx without reading earlier rows
.skip_while(|txid| {
// skip until we reach the last_seen_txid
last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
})
.skip(match last_seen_txid {
Some(_) => 1, // skip the last_seen_txid itself
None => 0,
})
.filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
.take(limit)
.collect::<Vec<(Txid, BlockId)>>();

self.lookup_txns(&txs_conf)
.expect("failed looking up txs in history index")
.into_iter()
.zip(txs_conf)
.map(|(tx, (_, blockid))| (tx, blockid))
.collect()
}
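On the XXX comment above: unique() has to remember every txid it emits, which is the in-memory cost being flagged. It behaves roughly like this order-preserving HashSet filter (a sketch, not the itertools implementation):

use std::collections::HashSet;

// Keep the first occurrence of each item, preserving order, while
// remembering every item seen so far.
fn dedup_preserving_order<I>(iter: I) -> impl Iterator<Item = I::Item>
where
    I: Iterator,
    I::Item: std::hash::Hash + Eq + Clone,
{
    let mut seen = HashSet::new();
    iter.filter(move |item| seen.insert(item.clone()))
}

fn main() {
    // funding and spending rows for the same tx produce repeated txids
    let rows = vec![3, 3, 2, 2, 2, 1];
    let txids: Vec<_> = dedup_preserving_order(rows.into_iter()).collect();
    assert_eq!(txids, vec![3, 2, 1]);
}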

// TODO: avoid duplication with stats/stats_delta?
pub fn utxo(&self, scripthash: &[u8], limit: usize, flush: DBFlush) -> Result<Vec<Utxo>> {
let _timer = self.start_timer("utxo");
