Add recent UTXOs REST API endpoint #97

Draft. Wants to merge 1 commit into base: mempool.
src/config.rs (8 additions, 0 deletions)
@@ -53,6 +53,7 @@ pub struct Config {
pub precache_scripts: Option<String>,
pub precache_threads: usize,
pub utxos_limit: usize,
pub utxos_history_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
pub mempool_backlog_stats_ttl: u64,
@@ -218,6 +219,12 @@ impl Config {
.help("Maximum number of utxos to process per address. Lookups for addresses with more utxos will fail. Applies to the Electrum and HTTP APIs.")
.default_value("500")
)
.arg(
Arg::with_name("utxos_history_limit")
.long("utxos-history-limit")
.help("Maximum number of history entries to process per address when looking up recent utxos.")
.default_value("20000")
)
.arg(
Arg::with_name("mempool_backlog_stats_ttl")
.long("mempool-backlog-stats-ttl")
@@ -514,6 +521,7 @@ impl Config {
daemon_rpc_addr,
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
utxos_history_limit: value_t_or_exit!(m, "utxos_history_limit", usize),
electrum_rpc_addr,
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
electrum_banner,
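Note: the two limits bound different resources. utxos-limit caps how many unspent outputs a lookup may return, while the new utxos-history-limit caps how many history rows the reverse scan may process before bailing out with a partial, most-recent-first result. A hypothetical invocation raising both caps (the binary name electrs is an assumption, not shown in this diff):

    electrs --utxos-limit 1000 --utxos-history-limit 50000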
src/new_index/query.rs (25 additions, 0 deletions)
@@ -107,6 +107,31 @@ impl Query {
Ok(utxos)
}

pub fn recent_utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> {
let mut utxos = self.chain.recent_utxo(
scripthash,
self.config.utxos_limit,
self.config.utxos_history_limit,
super::db::DBFlush::Enable,
)?;
let mempool = self.mempool();
utxos.retain(|utxo| !mempool.has_spend(&OutPoint::from(utxo)));
utxos.extend(mempool.utxo(scripthash));
utxos.sort_by(|a, b| match (&a.confirmed, &b.confirmed) {
(Some(block_a), Some(block_b)) => {
if block_a.height == block_b.height {
a.txid.cmp(&b.txid)
} else {
block_b.height.cmp(&block_a.height)
}
}
(Some(_), None) => std::cmp::Ordering::Greater,
(None, Some(_)) => std::cmp::Ordering::Less,
(None, None) => a.txid.cmp(&b.txid),
});
Ok(utxos)
}

pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, Option<BlockId>)> {
let confirmed_txids = self.chain.history_txids(scripthash, limit);
let confirmed_len = confirmed_txids.len();
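Note: the comparator above orders mempool (unconfirmed) utxos first, then confirmed utxos by descending block height, with txid as the tie-breaker. A self-contained sketch with simplified stand-in types (u32 ids instead of the crate's Txid and BlockId; not the real types) demonstrating the same ordering:

    // Simplified stand-ins; the real code sorts the crate's Utxo structs.
    #[derive(Debug)]
    struct Utxo {
        txid: u32,              // stand-in for a real Txid
        confirmed: Option<u32>, // stand-in for Option<BlockId>, holding just the height
    }

    fn main() {
        let mut utxos = vec![
            Utxo { txid: 3, confirmed: Some(100) },
            Utxo { txid: 1, confirmed: None }, // mempool
            Utxo { txid: 2, confirmed: Some(200) },
            Utxo { txid: 4, confirmed: Some(200) },
        ];
        // Same rule as Query::recent_utxo: mempool first, then newest
        // blocks, ties broken by txid.
        utxos.sort_by(|a, b| match (&a.confirmed, &b.confirmed) {
            (Some(ha), Some(hb)) if ha == hb => a.txid.cmp(&b.txid),
            (Some(ha), Some(hb)) => hb.cmp(ha),
            (Some(_), None) => std::cmp::Ordering::Greater,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (None, None) => a.txid.cmp(&b.txid),
        });
        // Prints [1, 2, 4, 3]: mempool, then height 200 (by txid), then height 100.
        println!("{:?}", utxos.iter().map(|u| u.txid).collect::<Vec<_>>());
    }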
src/new_index/schema.rs (219 additions, 0 deletions)
@@ -796,6 +796,181 @@ impl ChainQuery {
Ok((utxos, lastblock, processed_items))
}

// get the *most recent* `limit` utxos
pub fn recent_utxo(
&self,
scripthash: &[u8],
limit: usize,
entries_limit: usize,
flush: DBFlush,
) -> Result<Vec<Utxo>> {
let _timer = self.start_timer("recent_utxo");

// get the last known utxo set and the blockhash it was updated for.
// invalidates the cache if the block was orphaned.
let cache: Option<(UtxoMap, usize, bool, usize, usize)> = self
.store
.cache_db
.get(&RecentUtxoCacheRow::key(scripthash))
.map(|c| bincode_util::deserialize_little(&c).unwrap())
.and_then(|(utxos_cache, blockhash, limited, limit, entries_limit)| {
self.height_by_hash(&blockhash)
.map(|height| (utxos_cache, height, limited, limit, entries_limit))
})
.map(|(utxos_cache, height, limited, limit, entries_limit)| {
(
from_utxo_cache(utxos_cache, self),
height,
limited,
limit,
entries_limit,
)
});
let had_cache = cache.is_some();

// get utxos set for new transactions
let (newutxos, lastblock, processed_items, limited) = cache.map_or_else(
|| self.recent_utxo_delta(scripthash, HashMap::new(), None, limit, entries_limit),
|(oldutxos, blockheight, limited, cache_limit, cache_entries_limit)| {
// invalidate the cache if it was constructed with a lower resource limit
let start_height =
if limited && (cache_limit < limit || cache_entries_limit < entries_limit) {
None
} else {
Some(blockheight as u32)
};
self.recent_utxo_delta(scripthash, oldutxos, start_height, limit, entries_limit)
},
)?;

// save updated utxo set to cache
if let Some(lastblock) = lastblock {
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write(
vec![RecentUtxoCacheRow::new(
scripthash,
&newutxos,
&lastblock,
limited,
limit,
entries_limit,
)
.into_row()],
flush,
);
}
}

// format as Utxo objects
Ok(newutxos
.into_iter()
.map(|(outpoint, (blockid, value))| {
// in elements/liquid chains, we have to lookup the txo in order to get its
// associated asset. the asset information could be kept in the db history rows
// alongside the value to avoid this.
#[cfg(feature = "liquid")]
let txo = self.lookup_txo(&outpoint).expect("missing utxo");

Utxo {
txid: outpoint.txid,
vout: outpoint.vout,
value,
confirmed: Some(blockid),

#[cfg(feature = "liquid")]
asset: txo.asset,
#[cfg(feature = "liquid")]
nonce: txo.nonce,
#[cfg(feature = "liquid")]
witness: txo.witness,
}
})
.collect())
}

fn recent_utxo_delta(
&self,
scripthash: &[u8],
init_utxos: UtxoMap,
start_height: Option<u32>,
limit: usize,
entries_limit: usize,
) -> Result<(UtxoMap, Option<BlockHash>, usize, bool)> {
// iterate over history in reverse until we reach the utxo limit or meet the last known utxo
let _timer = self.start_timer("recent_utxo_delta");
let history_iter = self
.history_iter_scan_reverse(b'H', scripthash)
.map(TxHistoryRow::from_row)
.take_while(|row| {
if let Some(height) = start_height {
row.key.confirmed_height > height
} else {
true
}
})
.filter_map(|history| {
self.tx_confirming_block(&history.get_txid())
// drop history entries that were previously confirmed in a re-orged block and later
// confirmed again at a different height
.filter(|blockid| blockid.height == history.key.confirmed_height as usize)
.map(|b| (history, b))
});

let mut utxos = UtxoMap::new();
let mut spent: HashSet<OutPoint> = HashSet::new();
let mut processed_items = 0;
let mut lastblock = None;
let mut limited = false;

for (history, blockid) in history_iter {
processed_items += 1;
if lastblock.is_none() {
lastblock = Some(blockid.hash);
}

match history.key.txinfo {
TxHistoryInfo::Funding(ref info) => {
if !spent.contains(&history.get_funded_outpoint()) {
utxos.insert(history.get_funded_outpoint(), (blockid, info.value));
}
}
TxHistoryInfo::Spending(_) => {
utxos.remove(&history.get_funded_outpoint());
spent.insert(history.get_funded_outpoint());
}
#[cfg(feature = "liquid")]
TxHistoryInfo::Issuing(_)
| TxHistoryInfo::Burning(_)
| TxHistoryInfo::Pegin(_)
| TxHistoryInfo::Pegout(_) => unreachable!(),
};

// stop as soon as the utxo set reaches the limit or the history-entries budget is exhausted
if utxos.len() >= limit || processed_items >= entries_limit {
limited = true;
break;
}
}

// copy across unspent txos from cache
let mut utxo_entries: Vec<(&OutPoint, &(BlockId, Value))> = init_utxos.iter().collect();
utxo_entries.sort_by(|a, b| a.1 .0.height.cmp(&b.1 .0.height));
for (&outpoint, &(ref blockid, value)) in utxo_entries {
if lastblock.is_none() {
lastblock = Some(blockid.hash);
}
if !spent.contains(&outpoint) {
utxos.insert(outpoint, (blockid.clone(), value));
}
if utxos.len() >= limit {
limited = true;
break;
}
}

Ok((utxos, lastblock, processed_items, limited))
}
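Note: the reverse scan relies on one invariant: walking history newest-first, a Spending entry for an outpoint is always seen before (or without) its Funding entry, so recording spends in a set is enough to suppress the matching fundings later in the walk. A self-contained sketch of that core loop with simplified types (plain u32 outpoint ids, no block data; not the crate's real history rows):

    use std::collections::{HashMap, HashSet};

    // Simplified stand-ins for TxHistoryInfo::Funding / Spending.
    enum Event {
        Funding(u32, u64), // (outpoint id, value)
        Spending(u32),     // outpoint id being spent
    }

    // Walk history newest-first, mirroring recent_utxo_delta's core loop.
    fn recent_utxos(
        history_newest_first: &[Event],
        limit: usize,
        entries_limit: usize,
    ) -> (HashMap<u32, u64>, bool) {
        let mut utxos = HashMap::new();
        let mut spent = HashSet::new();
        let mut processed = 0;
        let mut limited = false;
        for event in history_newest_first {
            processed += 1;
            match event {
                // a funding is a utxo only if we haven't already seen it spent
                Event::Funding(op, value) => {
                    if !spent.contains(op) {
                        utxos.insert(*op, *value);
                    }
                }
                // spends show up before their fundings in a reverse scan
                Event::Spending(op) => {
                    utxos.remove(op);
                    spent.insert(*op);
                }
            }
            if utxos.len() >= limit || processed >= entries_limit {
                limited = true; // recorded so the cache can be invalidated later
                break;
            }
        }
        (utxos, limited)
    }

    fn main() {
        // Newest first: outpoint 1 was funded and later spent; outpoint 2 is unspent.
        let history = [Event::Spending(1), Event::Funding(2, 5000), Event::Funding(1, 9000)];
        let (utxos, limited) = recent_utxos(&history, 10, 100);
        assert_eq!(utxos.get(&2), Some(&5000));
        assert!(!utxos.contains_key(&1) && !limited);
        println!("{:?}", utxos); // {2: 5000}
    }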

pub fn stats(&self, scripthash: &[u8], flush: DBFlush) -> ScriptStats {
let _timer = self.start_timer("stats");

@@ -1830,6 +2005,50 @@ impl UtxoCacheRow {
}
}

struct RecentUtxoCacheRow {
key: ScriptCacheKey,
value: Bytes,
}

impl RecentUtxoCacheRow {
fn new(
scripthash: &[u8],
utxos: &UtxoMap,
blockhash: &BlockHash,
limited: bool,
limit: usize,
entries_limit: usize,
) -> Self {
let utxos_cache = make_utxo_cache(utxos);

RecentUtxoCacheRow {
key: ScriptCacheKey {
code: b'R',
scripthash: full_hash(scripthash),
},
value: bincode_util::serialize_little(&(
utxos_cache,
blockhash,
limited,
limit,
entries_limit,
))
.unwrap(),
}
}

pub fn key(scripthash: &[u8]) -> Bytes {
[b"R", scripthash].concat()
}

fn into_row(self) -> DBRow {
DBRow {
key: bincode_util::serialize_little(&self.key).unwrap(),
value: self.value,
}
}
}
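Note: the cache value is just a bincode-encoded 5-tuple, so the invalidation metadata (limited plus the two limits) travels with the utxo snapshot itself. A minimal round-trip sketch using the plain bincode 1.x API with simplified stand-in types (the crate's bincode_util wrapper is assumed to behave like a little-endian-configured bincode; requires the bincode crate as a dependency):

    use std::collections::HashMap;

    fn main() {
        // Stand-in for (CachedUtxoMap, BlockHash, limited, limit, entries_limit),
        // simplified to serializable primitives for illustration.
        let utxos: HashMap<(u32, u32), (u32, u64)> = HashMap::new();
        let cached = (utxos, [0u8; 32], true, 500usize, 20_000usize);
        let bytes = bincode::serialize(&cached).unwrap();
        let back: (HashMap<(u32, u32), (u32, u64)>, [u8; 32], bool, usize, usize) =
            bincode::deserialize(&bytes).unwrap();
        assert!(back.2); // the `limited` flag survives the round trip
    }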

// keep utxo cache with just the block height (the hash/timestamp are read later from the headers to reconstruct BlockId)
// and use a (txid,vout) tuple instead of OutPoints (they don't play nicely with bincode serialization)
fn make_utxo_cache(utxos: &UtxoMap) -> CachedUtxoMap {
src/rest.rs (25 additions, 0 deletions)
@@ -1046,6 +1046,31 @@ fn handle_request(
// XXX paging?
json_response(utxos, TTL_SHORT)
}
(
&Method::GET,
Some(script_type @ &"address"),
Some(script_str),
Some(&"utxo"),
Some(&"recent"),
None,
)
| (
&Method::GET,
Some(script_type @ &"scripthash"),
Some(script_str),
Some(&"utxo"),
Some(&"recent"),
None,
) => {
let script_hash = to_scripthash(script_type, script_str, config.network_type)?;
let utxos: Vec<UtxoValue> = query
.recent_utxo(&script_hash[..])?
.into_iter()
.map(UtxoValue::from)
.collect();
// XXX paging?
json_response(utxos, TTL_SHORT)
}
(&Method::GET, Some(&"address-prefix"), Some(prefix), None, None, None) => {
if !config.address_search {
return Err(HttpError::from("address search disabled".to_string()));
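Note: the match arms above add two routes, GET /address/:address/utxo/recent and GET /scripthash/:scripthash/utxo/recent. Since entries are serialized through the existing UtxoValue, a response should look roughly like the plain /utxo endpoint's output. The exact field set is an assumption based on the Esplora-style UtxoValue, and the values below are purely illustrative:

    [
      {
        "txid": "f4184fc596403b9d638783cf57adfe4c75c605f6356fbc91338530e9831e9e16",
        "vout": 0,
        "status": {
          "confirmed": true,
          "block_height": 170,
          "block_hash": "00000000d1145790a8694403d4063f323d499e655c83426834d4ce2f8dd4a2ee",
          "block_time": 1231731025
        },
        "value": 1000000000
      }
    ]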