Skip to content

Commit

Permalink
Merge pull request #87 from mempool/junderw/lock-mempool-less
Browse files Browse the repository at this point in the history
Shorten mempool lock holding for update
  • Loading branch information
wiz authored May 17, 2024
2 parents 90200c8 + 9e0ecad commit 62863af
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 39 deletions.
21 changes: 19 additions & 2 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,18 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
mempool.write().unwrap().update(&daemon)?;
loop {
match Mempool::update(&mempool, &daemon) {
Ok(_) => break,
Err(e) => {
warn!(
"Error performing initial mempool update, trying again in 5 seconds: {}",
e.display_chain()
);
signal.wait(Duration::from_secs(5), false)?;
}
}
}

#[cfg(feature = "liquid")]
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
Expand Down Expand Up @@ -137,7 +148,13 @@ fn run_server(config: Arc<Config>) -> Result<()> {
};

// Update mempool
mempool.write().unwrap().update(&daemon)?;
if let Err(e) = Mempool::update(&mempool, &daemon) {
// Log the error if the result is an Err
warn!(
"Error updating mempool, skipping mempool update: {}",
e.display_chain()
);
}

// Update subscribed clients
electrum_server.notify();
Expand Down
99 changes: 62 additions & 37 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use elements::{encode::serialize, AssetId};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::FromIterator;
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
Expand Down Expand Up @@ -343,46 +343,67 @@ impl Mempool {
&self.backlog_stats.0
}

pub fn update(&mut self, daemon: &Daemon) -> Result<()> {
let _timer = self.latency.with_label_values(&["update"]).start_timer();
let new_txids = daemon
pub fn unique_txids(&self) -> HashSet<Txid> {
return HashSet::from_iter(self.txstore.keys().cloned());
}

pub fn update(mempool: &RwLock<Mempool>, daemon: &Daemon) -> Result<()> {
// 1. Start the metrics timer and get the current mempool txids
// [LOCK] Takes read lock for whole scope.
let (_timer, old_txids) = {
let mempool = mempool.read().unwrap();
(
mempool.latency.with_label_values(&["update"]).start_timer(),
mempool.unique_txids(),
)
};

// 2. Get all the mempool txids from the RPC.
// [LOCK] No lock taken. Wait for RPC request. Get lists of remove/add txes.
let all_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;
let old_txids = HashSet::from_iter(self.txstore.keys().cloned());
let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect();

// Download and add new transactions from bitcoind's mempool
let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect();
let to_add = match daemon.gettransactions(&txids) {
Ok(txs) => txs,
Err(err) => {
warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF
return Ok(()); // keep the mempool until next update()
let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect();
let txids_to_add: Vec<&Txid> = all_txids.difference(&old_txids).collect();

// 3. Remove missing transactions. Even if we are unable to download new transactions from
// the daemon, we still want to remove the transactions that are no longer in the mempool.
// [LOCK] Write lock is released at the end of the call to remove().
mempool.write().unwrap().remove(txids_to_remove);

// 4. Download the new transactions from the daemon's mempool
// [LOCK] No lock taken, waiting for RPC response.
let txs_to_add = daemon
.gettransactions(&txids_to_add)
.chain_err(|| format!("failed to get {} transactions", txids_to_add.len()))?;

// 4. Update local mempool to match daemon's state
// [LOCK] Takes Write lock for whole scope.
{
let mut mempool = mempool.write().unwrap();
// Add new transactions
if txs_to_add.len() > mempool.add(txs_to_add) {
debug!("Mempool update added less transactions than expected");
}
};
// Add new transactions
if to_add.len() > self.add(to_add) {
debug!("Mempool update added less transactions than expected");
}
// Remove missing transactions
self.remove(to_remove);

self.count
.with_label_values(&["txs"])
.set(self.txstore.len() as f64);
mempool
.count
.with_label_values(&["txs"])
.set(mempool.txstore.len() as f64);

// Update cached backlog stats (if expired)
if mempool.backlog_stats.1.elapsed()
> Duration::from_secs(mempool.config.mempool_backlog_stats_ttl)
{
let _timer = mempool
.latency
.with_label_values(&["update_backlog_stats"])
.start_timer();
mempool.backlog_stats = (BacklogStats::new(&mempool.feeinfo), Instant::now());
}

// Update cached backlog stats (if expired)
if self.backlog_stats.1.elapsed()
> Duration::from_secs(self.config.mempool_backlog_stats_ttl)
{
let _timer = self
.latency
.with_label_values(&["update_backlog_stats"])
.start_timer();
self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now());
Ok(())
}

Ok(())
}

pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
Expand Down Expand Up @@ -418,8 +439,12 @@ impl Mempool {
// Phase 1: add to txstore
for tx in txs {
let txid = tx.txid();
txids.push(txid);
self.txstore.insert(txid, tx);
// Only push if it doesn't already exist.
// This is important now that update doesn't lock during
// the entire function body.
if self.txstore.insert(txid, tx).is_none() {
txids.push(txid);
}
}

// Phase 2: index history and spend edges (some txos can be missing)
Expand Down

0 comments on commit 62863af

Please sign in to comment.