From 74444677f2e3c9ee4bdc1a412812a6ac501cf54a Mon Sep 17 00:00:00 2001
From: Mononaut
Date: Thu, 28 Mar 2024 08:47:36 +0000
Subject: [PATCH] Add wallet / address group endpoints

---
 src/new_index/db.rs      |  83 ++++++++++++++++++++++++
 src/new_index/mempool.rs |  43 +++++++++++++
 src/new_index/schema.rs  | 134 +++++++++++++++++++++++++++++++++------
 src/rest.rs              | 120 +++++++++++++++++++++++++++++++++
 4 files changed, 362 insertions(+), 18 deletions(-)

diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index 5e4b37a1..5229f8e4 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -69,6 +69,69 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
     }
 }
 
+pub struct ReverseScanGroupIterator<'a> {
+    iters: Vec<ReverseScanIterator<'a>>,
+    next_rows: Vec<Option<DBRow>>,
+    value_offset: usize,
+    done: bool,
+}
+
+impl<'a> ReverseScanGroupIterator<'a> {
+    pub fn new(
+        iters: Vec<ReverseScanIterator<'a>>,
+        value_offset: usize,
+    ) -> ReverseScanGroupIterator<'a> {
+        let mut next_rows: Vec<Option<DBRow>> = Vec::new();
+        let mut new_iters: Vec<ReverseScanIterator<'a>> = Vec::new();
+        for mut iter in iters {
+            let next = iter.next();
+            next_rows.push(next);
+            new_iters.push(iter);
+        }
+        let done = next_rows.iter().all(|row| row.is_none());
+        ReverseScanGroupIterator {
+            iters: new_iters,
+            next_rows,
+            value_offset,
+            done,
+        }
+    }
+}
+
+impl<'a> Iterator for ReverseScanGroupIterator<'a> {
+    type Item = DBRow;
+
+    fn next(&mut self) -> Option<DBRow> {
+        if self.done {
+            return None;
+        }
+
+        let best_index = self
+            .next_rows
+            .iter()
+            .enumerate()
+            .max_by(|(a_index, a_opt), (b_index, b_opt)| match (a_opt, b_opt) {
+                (None, None) => a_index.cmp(b_index),
+
+                (Some(_), None) => std::cmp::Ordering::Greater,
+
+                (None, Some(_)) => std::cmp::Ordering::Less,
+
+                (Some(a), Some(b)) => a.key[self.value_offset..].cmp(&(b.key[self.value_offset..])),
+            })
+            .map(|(index, _)| index)
+            .unwrap_or(0);
+
+        let best = self.next_rows[best_index].take();
+        self.next_rows[best_index] = self.iters.get_mut(best_index)?.next();
+        if self.next_rows.iter().all(|row| row.is_none()) {
+            self.done = true;
+        }
+
+        best
+    }
+}
+
 #[derive(Debug)]
 pub struct DB {
     db: rocksdb::DB,
@@ -136,6 +199,26 @@ impl DB {
         }
     }
 
+    pub fn iter_scan_group_reverse(
+        &self,
+        prefixes: Vec<(Vec<u8>, Vec<u8>)>,
+        value_offset: usize,
+    ) -> ReverseScanGroupIterator {
+        let iters = prefixes
+            .iter()
+            .map(|(prefix, prefix_max)| {
+                let mut iter = self.db.raw_iterator();
+                iter.seek_for_prev(prefix_max);
+                ReverseScanIterator {
+                    prefix: prefix.to_vec(),
+                    iter,
+                    done: false,
+                }
+            })
+            .collect();
+        ReverseScanGroupIterator::new(iters, value_offset)
+    }
+
     pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
         debug!(
             "writing {} rows to {:?}, flush={:?}",
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index c3841d52..64adbd9b 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -177,6 +177,49 @@ impl Mempool {
             .collect()
     }
 
+    pub fn history_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<Transaction> {
+        let _timer = self
+            .latency
+            .with_label_values(&["history_group"])
+            .start_timer();
+        scripthashes
+            .into_iter()
+            .filter_map(|scripthash| self.history.get(&scripthash[..]))
+            .flat_map(|entries| entries.iter())
+            .map(|e| e.get_txid())
+            .unique()
+            // TODO seek directly to last seen tx without reading earlier rows
+            .skip_while(|txid| {
+                // skip until we reach the last_seen_txid
+                last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
+            })
+            .skip(match last_seen_txid {
+                Some(_) => 1, // skip the last_seen_txid itself
+                None => 0,
+            })
+            .take(limit)
+            .map(|txid| self.txstore.get(&txid).expect("missing mempool tx"))
+            .cloned()
+            .collect()
+    }
+
+    pub fn history_txids_iter_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+    ) -> impl Iterator<Item = Txid> + '_ {
+        scripthashes
+            .into_iter()
+            .filter_map(move |scripthash| self.history.get(&scripthash[..]))
+            .flat_map(|entries| entries.iter())
+            .map(|entry| entry.get_txid())
+            .unique()
+    }
+
     pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<Txid> {
         let _timer = self
             .latency
diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs
index 00ee3e89..97da52df 100644
--- a/src/new_index/schema.rs
+++ b/src/new_index/schema.rs
@@ -37,6 +37,8 @@ use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom};
 #[cfg(feature = "liquid")]
 use crate::elements::{asset, peg};
 
+use super::db::ReverseScanGroupIterator;
+
 const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100;
 
 pub struct Store {
@@ -511,28 +513,33 @@ impl ChainQuery {
             &TxHistoryRow::prefix_end(code, hash),
         )
     }
-
-    pub fn summary(
+    fn history_iter_scan_group_reverse(
         &self,
-        scripthash: &[u8],
-        last_seen_txid: Option<&Txid>,
-        limit: usize,
-    ) -> Vec<TxHistorySummary> {
-        // scripthash lookup
-        self._summary(b'H', scripthash, last_seen_txid, limit)
+        code: u8,
+        hashes: Vec<[u8; 32]>,
+    ) -> ReverseScanGroupIterator {
+        self.store.history_db.iter_scan_group_reverse(
+            hashes
+                .into_iter()
+                .map(|hash| {
+                    let prefix = TxHistoryRow::filter(code, &hash[..]);
+                    let prefix_max = TxHistoryRow::prefix_end(code, &hash[..]);
+                    (prefix, prefix_max)
+                })
+                .collect(),
+            33,
+        )
     }
 
-    fn _summary(
+    fn collate_summaries(
         &self,
-        code: u8,
-        hash: &[u8],
+        iter: impl Iterator<Item = TxHistoryRow>,
         last_seen_txid: Option<&Txid>,
         limit: usize,
     ) -> Vec<TxHistorySummary> {
-        let _timer_scan = self.start_timer("address_summary");
-        let rows = self
-            .history_iter_scan_reverse(code, hash)
-            .map(TxHistoryRow::from_row)
+        // collate utxo funding/spending events by transaction
+
+        let rows = iter
             .map(|row| (row.get_txid(), row.key.txinfo))
             .skip_while(|(txid, _)| {
                 // skip until we reach the last_seen_txid
@@ -546,8 +553,6 @@ impl ChainQuery {
                 self.tx_confirming_block(&txid)
                     .map(|b| (txid, info, b.height, b.time))
             });
-
-        // collate utxo funding/spending events by transaction
         let mut map: HashMap<Txid, TxHistorySummary> = HashMap::new();
         for (txid, info, height, time) in rows {
             if !map.contains_key(&txid) && map.len() == limit {
@@ -602,7 +607,6 @@ impl ChainQuery {
                 _ => {}
             }
         }
-
        let mut tx_summaries = map.into_values().collect::<Vec<_>>();
         tx_summaries.sort_by(|a, b| {
             if a.height == b.height {
@@ -614,6 +618,46 @@ impl ChainQuery {
         tx_summaries
     }
 
+    pub fn summary(
+        &self,
+        scripthash: &[u8],
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<TxHistorySummary> {
+        // scripthash lookup
+        self._summary(b'H', scripthash, last_seen_txid, limit)
+    }
+
+    fn _summary(
+        &self,
+        code: u8,
+        hash: &[u8],
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<TxHistorySummary> {
+        let _timer_scan = self.start_timer("address_summary");
+        let rows = self
+            .history_iter_scan_reverse(code, hash)
+            .map(TxHistoryRow::from_row);
+
+        self.collate_summaries(rows, last_seen_txid, limit)
+    }
+
+    pub fn summary_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<TxHistorySummary> {
+        // scripthash lookup
+        let _timer_scan = self.start_timer("address_group_summary");
+        let rows = self
+            .history_iter_scan_group_reverse(b'H', scripthashes)
+            .map(TxHistoryRow::from_row);
+
+        self.collate_summaries(rows, last_seen_txid, limit)
+    }
+
     pub fn history(
         &self,
         scripthash: &[u8],
@@ -679,6 +723,60 @@ impl ChainQuery {
             .collect()
     }
 
+    pub fn history_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<(Transaction, BlockId)> {
+        // scripthash lookup
+        self._history_group(b'H', scripthashes, last_seen_txid, limit)
+    }
+
+    pub fn history_txids_iter_group(
+        &self,
+        scripthashes: Vec<[u8; 32]>,
+    ) -> impl Iterator<Item = Txid> + '_ {
+        self.history_iter_scan_group_reverse(b'H', scripthashes)
+            .map(|row| TxHistoryRow::from_row(row).get_txid())
+            .unique()
+    }
+
+    fn _history_group(
+        &self,
+        code: u8,
+        hashes: Vec<[u8; 32]>,
+        last_seen_txid: Option<&Txid>,
+        limit: usize,
+    ) -> Vec<(Transaction, BlockId)> {
+        debug!("limit {} | last_seen {:?}", limit, last_seen_txid);
+        let _timer_scan = self.start_timer("history_group");
+        let txs_conf = self
+            .history_iter_scan_group_reverse(code, hashes)
+            .map(|row| TxHistoryRow::from_row(row).get_txid())
+            // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that?
+            .unique()
+            // TODO seek directly to last seen tx without reading earlier rows
+            .skip_while(|txid| {
+                // skip until we reach the last_seen_txid
+                last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
+            })
+            .skip(match last_seen_txid {
+                Some(_) => 1, // skip the last_seen_txid itself
+                None => 0,
+            })
+            .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
+            .take(limit)
+            .collect::<Vec<_>>();
+
+        self.lookup_txns(&txs_conf)
+            .expect("failed looking up txs in history index")
+            .into_iter()
+            .zip(txs_conf)
+            .map(|(tx, (_, blockid))| (tx, blockid))
+            .collect()
+    }
+
     // TODO: avoid duplication with stats/stats_delta?
     pub fn utxo(&self, scripthash: &[u8], limit: usize, flush: DBFlush) -> Result<Vec<Utxo>> {
         let _timer = self.start_timer("utxo");
diff --git a/src/rest.rs b/src/rest.rs
index df085441..49900b36 100644
--- a/src/rest.rs
+++ b/src/rest.rs
@@ -925,6 +925,80 @@ fn handle_request(
             json_response(prepare_txs(txs, query, config), TTL_SHORT)
         }
 
+        (&Method::GET, Some(script_types @ &"addresses"), Some(&"txs"), None, None, None)
+        | (&Method::GET, Some(script_types @ &"scripthashes"), Some(&"txs"), None, None, None) => {
+            let script_type = match *script_types {
+                "addresses" => "address",
+                "scripthashes" => "scripthash",
+                _ => "",
+            };
+            let script_hashes: Vec<[u8; 32]> = query_params
+                .get(&script_types.to_string())
+                .ok_or(HttpError::from(format!("No {} specified", script_types)))?
+                .as_str()
+                .split(',')
+                .map(|script_str| to_scripthash(script_type, script_str, config.network_type))
+                .filter_map(|s| s.ok())
+                .collect();
+
+            let max_txs = query_params
+                .get("max_txs")
+                .and_then(|s| s.parse::<usize>().ok())
+                .unwrap_or(config.rest_default_max_mempool_txs);
+            let after_txid = query_params
+                .get("after_txid")
+                .and_then(|s| s.parse::<Txid>().ok());
+
+            let mut txs = vec![];
+
+            if let Some(given_txid) = &after_txid {
+                let is_mempool = query
+                    .mempool()
+                    .history_txids_iter_group(script_hashes.clone())
+                    .any(|txid| given_txid == &txid);
+                let is_confirmed = if is_mempool {
+                    false
+                } else {
+                    query
+                        .chain()
+                        .history_txids_iter_group(script_hashes.clone())
+                        .any(|txid| given_txid == &txid)
+                };
+                if !is_mempool && !is_confirmed {
+                    return Err(HttpError(
+                        StatusCode::UNPROCESSABLE_ENTITY,
+                        String::from("after_txid not found"),
+                    ));
+                }
+            }
+            txs.extend(
+                query
+                    .mempool()
+                    .history_group(script_hashes.clone(), after_txid.as_ref(), max_txs)
+                    .into_iter()
+                    .map(|tx| (tx, None)),
+            );
+
+            if txs.len() < max_txs {
+                let after_txid_ref = if !txs.is_empty() {
+                    // If there are any txs, we know mempool found the
+                    // after_txid IF it exists... so always return None.
+                    None
+                } else {
+                    after_txid.as_ref()
+                };
+                txs.extend(
+                    query
+                        .chain()
+                        .history_group(script_hashes, after_txid_ref, max_txs - txs.len())
+                        .into_iter()
+                        .map(|(tx, blockid)| (tx, Some(blockid))),
+                );
+            }
+
+            json_response(prepare_txs(txs, query, config), TTL_SHORT)
+        }
+
         (
             &Method::GET,
             Some(script_type @ &"address"),
@@ -989,6 +1063,52 @@ fn handle_request(
             json_response(summary, TTL_SHORT)
         }
 
+        (
+            &Method::GET,
+            Some(script_types @ &"addresses"),
+            Some(&"txs"),
+            Some(&"summary"),
+            last_seen_txid,
+            None,
+        )
+        | (
+            &Method::GET,
+            Some(script_types @ &"scripthashes"),
+            Some(&"txs"),
+            Some(&"summary"),
+            last_seen_txid,
+            None,
+        ) => {
+            let script_type = match *script_types {
+                "addresses" => "address",
+                "scripthashes" => "scripthash",
+                _ => "",
+            };
+            let script_hashes: Vec<[u8; 32]> = query_params
+                .get(&script_types.to_string())
+                .ok_or(HttpError::from(format!("No {} specified", script_types)))?
+                .as_str()
+                .split(',')
+                .map(|script_str| to_scripthash(script_type, script_str, config.network_type))
+                .filter_map(|s| s.ok())
+                .collect();
+
+            let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok());
+            let max_txs = cmp::min(
+                config.rest_default_max_address_summary_txs,
+                query_params
+                    .get("max_txs")
+                    .and_then(|s| s.parse::<usize>().ok())
+                    .unwrap_or(config.rest_default_max_address_summary_txs),
+            );
+            let summary =
+                query
+                    .chain()
+                    .summary_group(script_hashes, last_seen_txid.as_ref(), max_txs);
+
+            json_response(summary, TTL_SHORT)
+        }
+
         (
             &Method::GET,
             Some(script_type @ &"address"),
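
The grouped routes added in src/rest.rs mirror the existing single-address endpoints. Illustrative requests (host, addresses, and txids below are placeholders, not taken from the patch):

    GET /addresses/txs?addresses=<addr1>,<addr2>&max_txs=50&after_txid=<txid>
    GET /scripthashes/txs?scripthashes=<scripthash1>,<scripthash2>
    GET /addresses/txs/summary/<last_seen_txid>?max_txs=50

Per the handlers above, max_txs defaults to rest_default_max_mempool_txs on /txs and is capped by rest_default_max_address_summary_txs on /txs/summary; an after_txid found in neither the mempool nor the confirmed history of the given scripts is rejected with 422 Unprocessable Entity, and confirmed history is only consulted once the mempool results fall short of max_txs.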
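ReverseScanGroupIterator in src/new_index/db.rs is a k-way merge over several already-descending prefix scans: it buffers the head row of each scan and repeatedly emits the one whose key suffix (past value_offset) compares largest, refilling that slot from its source iterator. A minimal self-contained sketch of the same idea over plain descending integer iterators (hypothetical names, not part of the patch):

    // k-way descending merge, analogous to ReverseScanGroupIterator but over
    // Vec iterators instead of RocksDB prefix scans.
    struct MergeDesc<I: Iterator<Item = u64>> {
        iters: Vec<I>,
        heads: Vec<Option<u64>>, // buffered head of each input, like next_rows
    }

    impl<I: Iterator<Item = u64>> MergeDesc<I> {
        fn new(mut iters: Vec<I>) -> Self {
            // prime one buffered head per input
            let heads = iters.iter_mut().map(|it| it.next()).collect();
            MergeDesc { iters, heads }
        }
    }

    impl<I: Iterator<Item = u64>> Iterator for MergeDesc<I> {
        type Item = u64;

        fn next(&mut self) -> Option<u64> {
            // Pick the input whose buffered head is largest; None sorts below
            // any Some, so exhausted inputs lose while others remain.
            let best = self
                .heads
                .iter()
                .enumerate()
                .max_by_key(|(_, head)| **head)?
                .0;
            let out = self.heads[best].take(); // None here means all inputs are done
            self.heads[best] = self.iters[best].next(); // refill the emptied slot
            out
        }
    }

    fn main() {
        // Each input is already sorted descending, as the reverse DB scans are.
        let merged: Vec<u64> = MergeDesc::new(vec![
            vec![9, 4, 1].into_iter(),
            vec![8, 5, 2].into_iter(),
        ])
        .collect();
        assert_eq!(merged, vec![9, 8, 5, 4, 2, 1]);
    }

Buffering a single head row per input keeps the merge at O(k) work per emitted row for k scripthashes and never materializes a full scan, which is what lets the paginated group endpoints stop after limit rows.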