From a2f291c1b6855dae872617510215bcd3bc2a55a5 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 12:10:21 +0200 Subject: [PATCH 01/17] async kv store --- lightning-background-processor/src/lib.rs | 7 +- lightning-persister/src/fs_store.rs | 9 + lightning/src/chain/chainmonitor.rs | 173 ++++++++---- lightning/src/sign/ecdsa.rs | 2 +- lightning/src/util/anchor_channel_reserves.rs | 6 +- lightning/src/util/async_poll.rs | 33 +++ lightning/src/util/mod.rs | 2 +- lightning/src/util/persist.rs | 258 ++++++++++++------ 8 files changed, 353 insertions(+), 137 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 239ff3f0c98..daa686c9453 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -38,6 +38,7 @@ use lightning::sign::ChangeDestinationSource; #[cfg(feature = "std")] use lightning::sign::ChangeDestinationSourceSync; use lightning::sign::OutputSpender; +use lightning::util::async_poll::FutureSpawner; use lightning::util::logger::Logger; use lightning::util::persist::{KVStore, Persister}; use lightning::util::sweep::OutputSweeper; @@ -780,8 +781,9 @@ pub async fn process_events_async< EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, + FS: FutureSpawner, M: 'static - + Deref::Signer, CF, T, F, L, P>> + + Deref::Signer, CF, T, F, L, P, FS>> + Send + Sync, CM: 'static + Deref, @@ -977,7 +979,7 @@ impl BackgroundProcessor { EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, M: 'static - + Deref::Signer, CF, T, F, L, P>> + + Deref::Signer, CF, T, F, L, P, FS>> + Send + Sync, CM: 'static + Deref + Send, @@ -992,6 +994,7 @@ impl BackgroundProcessor { O: 'static + Deref, K: 'static + Deref, OS: 'static + Deref> + Send, + FS: FutureSpawner >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 850a0786671..0cd0047562b 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -3,6 +3,7 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::util::persist::{KVStore, MigratableKVStore}; use lightning::util::string::PrintableString; +use lightning::util::async_poll::AsyncResult; use std::collections::HashMap; use std::fs; @@ -11,6 +12,8 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; + + #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -329,6 +332,12 @@ impl KVStore for FilesystemStore { Ok(keys) } + + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> AsyncResult<'static, ()> { + todo!() + } } fn dir_entry_is_key(p: &Path) -> Result { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 09d87b775be..18f15fd7522 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -36,11 +36,13 @@ use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; use crate::events::{self, Event, EventHandler, ReplayEvent}; +use crate::util::async_poll::{poll_or_spawn, AsyncResult, FutureSpawner}; use crate::util::logger::{Logger, WithContext}; use 
crate::util::errors::APIError; use crate::util::persist::MonitorName; use crate::util::wakers::{Future, Notifier}; use crate::ln::channel_state::ChannelDetails; +use crate::sync::{Arc}; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; @@ -122,7 +124,7 @@ pub trait Persist { /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor) -> ChannelMonitorUpdateStatus; + fn persist_new_channel(&self, monitor_name: MonitorName, monitor: &ChannelMonitor) -> AsyncResult<'static, ()>; /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given /// update. @@ -161,7 +163,7 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor) -> AsyncResult<'static, ()>; /// Prevents the channel monitor from being loaded on startup. /// /// Archiving the data in a backup location (rather than deleting it fully) is useful for @@ -233,14 +235,14 @@ impl Deref for LockedChannelMonitor<'_, Chann /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [module-level documentation]: crate::chain::chainmonitor /// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims -pub struct ChainMonitor +pub struct ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, P::Target: Persist, { - monitors: RwLock>>, + monitors: Arc>>>, chain_source: Option, broadcaster: T, logger: L, @@ -248,16 +250,18 @@ pub struct ChainMonitor, PublicKey)>>, + pending_monitor_events: Arc, PublicKey)>>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for /// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process). - event_notifier: Notifier, + event_notifier: Arc, + + future_spawner: Arc, } -impl ChainMonitor +impl ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -347,18 +351,31 @@ where C::Target: chain::Filter, // `ChannelMonitorUpdate` after a channel persist for a channel with the same // `latest_update_id`. 
let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor) { - ChannelMonitorUpdateStatus::Completed => - log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data", - log_funding_info!(monitor) - ), - ChannelMonitorUpdateStatus::InProgress => { - log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor)); - } - ChannelMonitorUpdateStatus::UnrecoverableError => { - return Err(()); + let max_update_id = _pending_monitor_updates.iter().copied().max().unwrap_or(0); + + let persist_res = self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor); + + let monitors = self.monitors.clone(); + let pending_monitor_updates_cb = self.pending_monitor_events.clone(); + let event_notifier = self.event_notifier.clone(); + let future_spawner = self.future_spawner.clone(); + let channel_id = *channel_id; + + match poll_or_spawn(persist_res, move || { + // TODO: Log error if the monitor is not persisted. + let _ = ChainMonitor::::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier, + channel_id, max_update_id); + }, future_spawner.deref()) { + Ok(true) => { + // log + }, + Ok(false) => { + // log + } + Err(_) => { + return Err(()); + }, } - } } // Register any new outputs with the chain source for filtering, storing any dependent @@ -388,17 +405,18 @@ where C::Target: chain::Filter, /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may /// always need to fetch full blocks absent another means for determining which blocks contain /// transactions relevant to the watched channels. - pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { + pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, future_spawner: FS) -> Self { Self { - monitors: RwLock::new(new_hash_map()), + monitors: Arc::new(RwLock::new(new_hash_map())), chain_source, broadcaster, logger, fee_estimator: feeest, persister, - pending_monitor_events: Mutex::new(Vec::new()), + pending_monitor_events: Arc::new(Mutex::new(Vec::new())), highest_chain_height: AtomicUsize::new(0), - event_notifier: Notifier::new(), + event_notifier: Arc::new(Notifier::new()), + future_spawner: Arc::new(future_spawner), } } @@ -531,6 +549,40 @@ where C::Target: chain::Filter, Ok(()) } + fn channel_monitor_updated_internal( + monitors: &RwLock>>, + pending_monitor_events: &Mutex, PublicKey)>>, + event_notifier: &Notifier, + channel_id: ChannelId, completed_update_id: u64) -> Result<(), APIError> { + let monitors = monitors.read().unwrap(); + let monitor_data = if let Some(mon) = monitors.get(&channel_id) { mon } else { + return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching channel ID {} found", channel_id) }); + }; + let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap(); + pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); + + // Note that we only check for pending non-chainsync monitor updates and we don't track monitor + // updates resulting from chainsync in `pending_monitor_updates`. + let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates); + + // TODO: Add logger + + if monitor_is_pending_updates { + // If there are still monitor updates pending, we cannot yet construct a + // Completed event. 
+ return Ok(()); + } + let funding_txo = monitor_data.monitor.get_funding_txo(); + pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { + funding_txo, + channel_id, + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }], monitor_data.monitor.get_counterparty_node_id())); + + event_notifier.notify(); + Ok(()) + } + /// This wrapper avoids having to update some of our tests for now as they assume the direct /// chain::Watch API wherein we mark a monitor fully-updated by just calling /// channel_monitor_updated once with the highest ID. @@ -669,8 +721,8 @@ where C::Target: chain::Filter, } } -impl -chain::Listen for ChainMonitor +impl +chain::Listen for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, @@ -698,8 +750,8 @@ where } } -impl -chain::Confirm for ChainMonitor +impl +chain::Confirm for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, @@ -752,8 +804,8 @@ where } } -impl -chain::Watch for ChainMonitor +impl +chain::Watch for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -774,15 +826,28 @@ where C::Target: chain::Filter, let update_id = monitor.get_latest_update_id(); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { + + let update_status; + let monitors = self.monitors.clone(); + let pending_monitor_updates_cb = self.pending_monitor_events.clone(); + let event_notifier = self.event_notifier.clone(); + let future_spawner = self.future_spawner.clone(); + + match poll_or_spawn(persist_res, move || { + // TODO: Log error if the monitor is not persisted. + let _ = ChainMonitor::::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier, + channel_id, update_id); + }, future_spawner.deref()) { + Ok(true) => { log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); + update_status = ChannelMonitorUpdateStatus::Completed; }, - ChannelMonitorUpdateStatus::UnrecoverableError => { + Ok(false) => { + log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + update_status = ChannelMonitorUpdateStatus::InProgress; + } + Err(_) => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; log_error!(logger, "{}", err_str); panic!("{}", err_str); @@ -795,7 +860,7 @@ where C::Target: chain::Filter, monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), }); - Ok(persist_res) + Ok(update_status) } fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { @@ -840,27 +905,40 @@ where C::Target: chain::Filter, } else { self.persister.update_persisted_channel(monitor.persistence_key(), Some(update), monitor) }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); + + let monitors = self.monitors.clone(); + let pending_monitor_updates_cb = self.pending_monitor_events.clone(); + let event_notifier = self.event_notifier.clone(); + let future_spawner = self.future_spawner.clone(); + + let update_status; + match poll_or_spawn(persist_res, move || { + // TODO: Log error if the monitor is not persisted. + let _ = ChainMonitor::::channel_monitor_updated_internal(&monitors, &pending_monitor_updates_cb, &event_notifier, + channel_id, update_id); + }, future_spawner.deref()) { + Ok(true) => { log_debug!(logger, - "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress", + "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed", update_id, log_funding_info!(monitor) ); + update_status = ChannelMonitorUpdateStatus::Completed; }, - ChannelMonitorUpdateStatus::Completed => { + Ok(false) => { log_debug!(logger, - "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed", + "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress", update_id, log_funding_info!(monitor) ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { + pending_monitor_updates.push(update_id); + update_status = ChannelMonitorUpdateStatus::InProgress; + } + Err(_) => { // Take the monitors lock for writing so that we poison it and any future // operations going forward fail immediately. core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); + // core::mem::drop(monitors); let _poison = self.monitors.write().unwrap(); let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; log_error!(logger, "{}", err_str); @@ -870,7 +948,7 @@ where C::Target: chain::Filter, if update_res.is_err() { ChannelMonitorUpdateStatus::InProgress } else { - persist_res + update_status } } } @@ -891,7 +969,7 @@ where C::Target: chain::Filter, } } -impl events::EventsProvider for ChainMonitor +impl events::EventsProvider for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -1138,4 +1216,3 @@ mod tests { }).is_err()); } } - diff --git a/lightning/src/sign/ecdsa.rs b/lightning/src/sign/ecdsa.rs index f9c330bbc4c..52c388bd511 100644 --- a/lightning/src/sign/ecdsa.rs +++ b/lightning/src/sign/ecdsa.rs @@ -33,7 +33,7 @@ use crate::sign::{ChannelSigner, ChannelTransactionParameters, HTLCDescriptor}; /// /// [`ChannelManager::signer_unblocked`]: crate::ln::channelmanager::ChannelManager::signer_unblocked /// [`ChainMonitor::signer_unblocked`]: crate::chain::chainmonitor::ChainMonitor::signer_unblocked -pub trait EcdsaChannelSigner: ChannelSigner { +pub trait EcdsaChannelSigner: ChannelSigner + Send { /// Create a signature for a counterparty's commitment transaction and associated HTLC transactions. 
/// /// Policy checks should be implemented in this function, including checking the amount diff --git a/lightning/src/util/anchor_channel_reserves.rs b/lightning/src/util/anchor_channel_reserves.rs index 968a60ada0b..aa910fda840 100644 --- a/lightning/src/util/anchor_channel_reserves.rs +++ b/lightning/src/util/anchor_channel_reserves.rs @@ -38,6 +38,8 @@ use bitcoin::Weight; use core::cmp::min; use core::ops::Deref; +use super::async_poll::FutureSpawner; + // Transaction weights based on: // https://github.com/lightning/bolts/blob/master/03-transactions.md#appendix-a-expected-weights const COMMITMENT_TRANSACTION_BASE_WEIGHT: u64 = 900 + 224; @@ -270,12 +272,13 @@ pub fn get_supportable_anchor_channels( /// [Event::OpenChannelRequest]: crate::events::Event::OpenChannelRequest pub fn can_support_additional_anchor_channel< AChannelManagerRef: Deref, - ChannelSigner: EcdsaChannelSigner, + ChannelSigner: EcdsaChannelSigner + Send + Sync + 'static, FilterRef: Deref, BroadcasterRef: Deref, EstimatorRef: Deref, LoggerRef: Deref, PersistRef: Deref, + FS: FutureSpawner, ChainMonitorRef: Deref< Target = ChainMonitor< ChannelSigner, @@ -284,6 +287,7 @@ pub fn can_support_additional_anchor_channel< EstimatorRef, LoggerRef, PersistRef, + FS, >, >, >( diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index a0034a6caae..eee96ca85de 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -98,3 +98,36 @@ pub(crate) fn dummy_waker() -> Waker { /// A type alias for a future that returns a result of type T. pub type AsyncResult<'a, T> = Pin> + 'a + Send>>; + +/// A type alias for a future that returns a result of type T. +pub trait FutureSpawner: Send + Sync + 'static { + /// Spawns a future on a runtime. + fn spawn + Send + 'static>(&self, future: T); +} + +/// Polls a future and either returns true if it is ready or spawns it on the tokio runtime if it is not. 
+pub fn poll_or_spawn(mut fut: Pin>, callback: C, future_spawner: &S) -> Result +where + F: Future> + Send + 'static + ?Sized, + C: FnOnce() + Send + 'static, + S: FutureSpawner, +{ + let waker = dummy_waker(); + let mut cx = Context::from_waker(&waker); + + match fut.as_mut().poll(&mut cx) { + Poll::Ready(Ok(())) => Ok(true), + Poll::Ready(Err(_)) => Err(()), + Poll::Pending => { + println!("Future not ready, using tokio runtime"); + + let callback = Box::new(callback); + future_spawner.spawn(async move { + fut.await; + callback(); + }); + + Ok(false) + } + } +} \ No newline at end of file diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index e4b8b0b4429..9f685785c94 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -32,7 +32,7 @@ pub mod ser; pub mod sweep; pub mod wakers; -pub(crate) mod async_poll; +pub mod async_poll; pub(crate) mod atomic_counter; pub(crate) mod byte_utils; pub mod hash_tables; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 6f1f9d0862a..dc2303a86cc 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -19,6 +19,7 @@ use core::str::FromStr; use crate::prelude::*; use crate::{io, log_error}; +use crate::sync::{Arc}; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; @@ -32,6 +33,8 @@ use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; +use super::async_poll::AsyncResult; + /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"; @@ -137,6 +140,11 @@ pub trait KVStore { fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> Result<(), io::Error>; + + /// Asynchronously persists the given data under the given `key`. + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> AsyncResult<'static, ()>; /// Removes any data that had previously been persisted under the given `key`. /// /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily @@ -254,7 +262,9 @@ where } } -impl Persist for K { +impl + Persist for Arc +{ // TODO: We really need a way for the persister to inform the user that its time to crash/shut // down once these start returning failure. 
// Then we should return InProgress rather than UnrecoverableError, implying we should probably @@ -262,31 +272,37 @@ impl Persist, - ) -> chain::ChannelMonitorUpdateStatus { - match self.write( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &monitor_name.to_string(), - &monitor.encode(), - ) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError, - } + ) -> AsyncResult<'static, ()> { + let encoded = monitor.encode(); + let kv_store = self.clone(); + + Box::pin(async move { + kv_store.write_async( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &monitor_name.to_string(), + &encoded, + ) + .await + }) } fn update_persisted_channel( &self, monitor_name: MonitorName, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> chain::ChannelMonitorUpdateStatus { - match self.write( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &monitor_name.to_string(), - &monitor.encode(), - ) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError, - } + ) -> AsyncResult<'static, ()> { + let encoded = monitor.encode(); + let kv_store = self.clone(); + + Box::pin(async move { + kv_store.write_async( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &monitor_name.to_string(), + &encoded, + ) + .await + }) } fn archive_persisted_channel(&self, monitor_name: MonitorName) { @@ -447,6 +463,18 @@ where /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister +where + K::Target: KVStore, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + state: Arc>, +} + +struct MonitorUpdatingPersisterState where K::Target: KVStore, L::Target: Logger, @@ -464,9 +492,10 @@ where fee_estimator: FE, } + #[allow(dead_code)] impl - MonitorUpdatingPersister + MonitorUpdatingPersisterState where K::Target: KVStore, L::Target: Logger, @@ -495,7 +524,7 @@ where kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, ) -> Self { - MonitorUpdatingPersister { + MonitorUpdatingPersisterState { kv_store, logger, maximum_pending_updates, @@ -687,19 +716,19 @@ where } impl< - ChannelSigner: EcdsaChannelSigner, - K: Deref, - L: Deref, - ES: Deref, - SP: Deref, - BI: Deref, - FE: Deref, + ChannelSigner: EcdsaChannelSigner + Send + Sync, + K: Deref + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + ES: Deref + Send + Sync + 'static, + SP: Deref + Send + Sync + 'static, + BI: Deref + Send + Sync + 'static, + FE: Deref + Send + Sync + 'static, > Persist for MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStore + Sync, L::Target: Logger, ES::Target: EntropySource + Sized, - SP::Target: SignerProvider + Sized, + SP::Target: SignerProvider + Send + Sync + Sized, BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { @@ -707,34 +736,107 @@ where /// parametrized [`KVStore`]. 
fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, - ) -> chain::ChannelMonitorUpdateStatus { + ) -> AsyncResult<'static, ()> { + let state = self.state.clone(); + + let encoded_monitor = Self::encode_monitor(monitor); + + Box::pin(async move { + state.persist_new_channel(monitor_name, &encoded_monitor).await + }) + } + + /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. + /// + /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: + /// + /// - No full monitor is found in [`KVStore`] + /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] + /// - LDK commands re-persisting the entire monitor through this function, specifically when + /// `update` is `None`. + /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. + fn update_persisted_channel( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor, + ) -> AsyncResult<'static, ()> { + let state = self.state.clone(); + + let encoded_monitor = Self::encode_monitor(monitor); + let encoded_update = update.map(|update| (update.update_id, update.encode())); + let monitor_latest_update_id = monitor.get_latest_update_id(); + + Box::pin(async move { + state.update_persisted_channel(monitor_name, encoded_update, &encoded_monitor, monitor_latest_update_id).await + }) + } + + fn archive_persisted_channel(&self, monitor_name: MonitorName) { + self.state.archive_persisted_channel(monitor_name); + } +} + + +impl< + K: Deref + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + ES: Deref + Send + Sync + 'static, + SP: Deref + Send + Sync + 'static, + BI: Deref + Send + Sync + 'static, + FE: Deref + Send + Sync + 'static, + > MonitorUpdatingPersister +where + K::Target: KVStore + Sync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sync + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + fn encode_monitor( + monitor: &ChannelMonitor, + ) -> Vec { + // Serialize and write the new monitor + let mut monitor_bytes = Vec::with_capacity( + MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), + ); + monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); + monitor.write(&mut monitor_bytes).unwrap(); + + monitor_bytes + } + } + +impl< + K: Deref + Send + Sync + 'static, + L: Deref + Send + Sync + 'static, + ES: Deref + Send + Sync + 'static, + SP: Deref + Send + Sync + 'static, + BI: Deref + Send + Sync + 'static, + FE: Deref + Send + Sync + 'static, + > MonitorUpdatingPersisterState +where + K::Target: KVStore + Sync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sync + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + /// Persists a new channel. This means writing the entire monitor to the + /// parametrized [`KVStore`]. 
+ async fn persist_new_channel( + self: Arc, monitor_name: MonitorName, monitor_bytes: &[u8], + ) -> Result<(), ()> { // Determine the proper key for this monitor let monitor_key = monitor_name.to_string(); + // Serialize and write the new monitor - let mut monitor_bytes = Vec::with_capacity( - MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), - ); - monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); - monitor.write(&mut monitor_bytes).unwrap(); - match self.kv_store.write( + self.kv_store.write_async( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), &monitor_bytes, - ) { - Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.logger, - "Failed to write ChannelMonitor {}/{}/{} reason: {}", - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + ).await } /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. @@ -746,40 +848,27 @@ where /// - LDK commands re-persisting the entire monitor through this function, specifically when /// `update` is `None`. /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. - fn update_persisted_channel( - &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, - monitor: &ChannelMonitor, - ) -> chain::ChannelMonitorUpdateStatus { + async fn update_persisted_channel( + self: Arc, monitor_name: MonitorName, update: Option<(u64, Vec)>, + monitor: &[u8], monitor_latest_update_id: u64, + ) -> Result<(), ()> { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; - if let Some(update) = update { - let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID - && update.update_id % self.maximum_pending_updates != 0; + if let Some((update_id, update)) = update { + let persist_update = update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID + && update_id % self.maximum_pending_updates != 0; if persist_update { let monitor_key = monitor_name.to_string(); - let update_name = UpdateName::from(update.update_id); - match self.kv_store.write( + let update_name = UpdateName::from(update_id); + self.kv_store.write_async( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str(), update_name.as_str(), - &update.encode(), - ) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.logger, - "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}", - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + &update, + ).await } else { // In case of channel-close monitor update, we need to read old monitor before persisting // the new one in order to determine the cleanup range. - let maybe_old_monitor = match monitor.get_latest_update_id() { + let maybe_old_monitor = match monitor_latest_update_id { LEGACY_CLOSED_CHANNEL_UPDATE_ID => { let monitor_key = monitor_name.to_string(); self.read_monitor(&monitor_name, &monitor_key).ok() @@ -788,11 +877,11 @@ where }; // We could write this update, but it meets criteria of our design that calls for a full monitor write. 
- let monitor_update_status = self.persist_new_channel(monitor_name, monitor); + let monitor_update_status = self.clone().persist_new_channel(monitor_name, &monitor).await; - if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status { + if monitor_update_status.is_ok() { let channel_closed_legacy = - monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID; + monitor_latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID; let cleanup_range = if channel_closed_legacy { // If there is an error while reading old monitor, we skip clean up. maybe_old_monitor.map(|(_, ref old_monitor)| { @@ -805,7 +894,7 @@ where (start, end) }) } else { - let end = monitor.get_latest_update_id(); + let end = monitor_latest_update_id; let start = end.saturating_sub(self.maximum_pending_updates); Some((start, end)) }; @@ -819,7 +908,7 @@ where } } else { // There is no update given, so we must persist a new monitor. - self.persist_new_channel(monitor_name, monitor) + self.persist_new_channel(monitor_name, &monitor).await } } @@ -847,8 +936,9 @@ where } } + impl - MonitorUpdatingPersister + MonitorUpdatingPersisterState where ES::Target: EntropySource + Sized, K::Target: KVStore, From 7b0dcf37facb055216f5e24fae451b56023a48e8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 12:15:02 +0200 Subject: [PATCH 02/17] removed constraints --- lightning-background-processor/src/lib.rs | 6 +- lightning-persister/src/fs_store.rs | 6 +- lightning/src/chain/chainmonitor.rs | 6 +- lightning/src/util/async_poll.rs | 42 +++++---- lightning/src/util/persist.rs | 108 ++++++++++++---------- 5 files changed, 89 insertions(+), 79 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index daa686c9453..1d26bce7f48 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -979,7 +979,9 @@ impl BackgroundProcessor { EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, M: 'static - + Deref::Signer, CF, T, F, L, P, FS>> + + Deref< + Target = ChainMonitor<::Signer, CF, T, F, L, P, FS>, + > + Send + Sync, CM: 'static + Deref + Send, @@ -994,7 +996,7 @@ impl BackgroundProcessor { O: 'static + Deref, K: 'static + Deref, OS: 'static + Deref> + Send, - FS: FutureSpawner + FS: FutureSpawner, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 0cd0047562b..2039758245b 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -1,9 +1,9 @@ //! Objects related to [`FilesystemStore`] live here. 
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; +use lightning::util::async_poll::AsyncResult; use lightning::util::persist::{KVStore, MigratableKVStore}; use lightning::util::string::PrintableString; -use lightning::util::async_poll::AsyncResult; use std::collections::HashMap; use std::fs; @@ -12,8 +12,6 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; - - #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -334,7 +332,7 @@ impl KVStore for FilesystemStore { } fn write_async( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> AsyncResult<'static, ()> { todo!() } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 18f15fd7522..3e88c9ae89c 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -721,7 +721,7 @@ where C::Target: chain::Filter, } } -impl +impl chain::Listen for ChainMonitor where C::Target: chain::Filter, @@ -750,7 +750,7 @@ where } } -impl +impl chain::Confirm for ChainMonitor where C::Target: chain::Filter, @@ -804,7 +804,7 @@ where } } -impl +impl chain::Watch for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index eee96ca85de..9fca6b2b49f 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -106,28 +106,30 @@ pub trait FutureSpawner: Send + Sync + 'static { } /// Polls a future and either returns true if it is ready or spawns it on the tokio runtime if it is not. 
-pub fn poll_or_spawn(mut fut: Pin>, callback: C, future_spawner: &S) -> Result +pub fn poll_or_spawn( + mut fut: Pin>, callback: C, future_spawner: &S, +) -> Result where - F: Future> + Send + 'static + ?Sized, - C: FnOnce() + Send + 'static, + F: Future> + Send + 'static + ?Sized, + C: FnOnce() + Send + 'static, S: FutureSpawner, { - let waker = dummy_waker(); - let mut cx = Context::from_waker(&waker); + let waker = dummy_waker(); + let mut cx = Context::from_waker(&waker); - match fut.as_mut().poll(&mut cx) { - Poll::Ready(Ok(())) => Ok(true), + match fut.as_mut().poll(&mut cx) { + Poll::Ready(Ok(())) => Ok(true), Poll::Ready(Err(_)) => Err(()), - Poll::Pending => { - println!("Future not ready, using tokio runtime"); - - let callback = Box::new(callback); - future_spawner.spawn(async move { - fut.await; - callback(); - }); - - Ok(false) - } - } -} \ No newline at end of file + Poll::Pending => { + println!("Future not ready, using tokio runtime"); + + let callback = Box::new(callback); + future_spawner.spawn(async move { + fut.await; + callback(); + }); + + Ok(false) + }, + } +} diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index dc2303a86cc..5d1103bde5f 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -19,8 +19,6 @@ use core::str::FromStr; use crate::prelude::*; use crate::{io, log_error}; -use crate::sync::{Arc}; -use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; @@ -30,6 +28,7 @@ use crate::ln::types::ChannelId; use crate::routing::gossip::NetworkGraph; use crate::routing::scoring::WriteableScore; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; +use crate::sync::Arc; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -262,7 +261,7 @@ where } } -impl +impl Persist for Arc { // TODO: We really need a way for the persister to inform the user that its time to crash/shut @@ -277,13 +276,14 @@ impl MonitorUpdatingPersisterState @@ -728,7 +728,7 @@ where K::Target: KVStore + Sync, L::Target: Logger, ES::Target: EntropySource + Sized, - SP::Target: SignerProvider + Send + Sync + Sized, + SP::Target: SignerProvider + Sync + Sized, BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { @@ -741,9 +741,7 @@ where let encoded_monitor = Self::encode_monitor(monitor); - Box::pin(async move { - state.persist_new_channel(monitor_name, &encoded_monitor).await - }) + Box::pin(async move { state.persist_new_channel(monitor_name, &encoded_monitor).await }) } /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. 
@@ -766,7 +764,14 @@ where let monitor_latest_update_id = monitor.get_latest_update_id(); Box::pin(async move { - state.update_persisted_channel(monitor_name, encoded_update, &encoded_monitor, monitor_latest_update_id).await + state + .update_persisted_channel( + monitor_name, + encoded_update, + &encoded_monitor, + monitor_latest_update_id, + ) + .await }) } @@ -775,7 +780,6 @@ where } } - impl< K: Deref + Send + Sync + 'static, L: Deref + Send + Sync + 'static, @@ -793,18 +797,18 @@ where FE::Target: FeeEstimator, { fn encode_monitor( - monitor: &ChannelMonitor, - ) -> Vec { - // Serialize and write the new monitor - let mut monitor_bytes = Vec::with_capacity( - MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), - ); - monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); - monitor.write(&mut monitor_bytes).unwrap(); + monitor: &ChannelMonitor, + ) -> Vec { + // Serialize and write the new monitor + let mut monitor_bytes = Vec::with_capacity( + MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), + ); + monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); + monitor.write(&mut monitor_bytes).unwrap(); - monitor_bytes - } + monitor_bytes } +} impl< K: Deref + Send + Sync + 'static, @@ -831,12 +835,14 @@ where let monitor_key = monitor_name.to_string(); // Serialize and write the new monitor - self.kv_store.write_async( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - &monitor_bytes, - ).await + self.kv_store + .write_async( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.as_str(), + &monitor_bytes, + ) + .await } /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. @@ -849,8 +855,8 @@ where /// `update` is `None`. /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. async fn update_persisted_channel( - self: Arc, monitor_name: MonitorName, update: Option<(u64, Vec)>, - monitor: &[u8], monitor_latest_update_id: u64, + self: Arc, monitor_name: MonitorName, update: Option<(u64, Vec)>, monitor: &[u8], + monitor_latest_update_id: u64, ) -> Result<(), ()> { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; if let Some((update_id, update)) = update { @@ -859,12 +865,14 @@ where if persist_update { let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update_id); - self.kv_store.write_async( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - &update, - ).await + self.kv_store + .write_async( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_key.as_str(), + update_name.as_str(), + &update, + ) + .await } else { // In case of channel-close monitor update, we need to read old monitor before persisting // the new one in order to determine the cleanup range. @@ -877,7 +885,8 @@ where }; // We could write this update, but it meets criteria of our design that calls for a full monitor write. 
- let monitor_update_status = self.clone().persist_new_channel(monitor_name, &monitor).await; + let monitor_update_status = + self.clone().persist_new_channel(monitor_name, &monitor).await; if monitor_update_status.is_ok() { let channel_closed_legacy = @@ -936,7 +945,6 @@ where } } - impl MonitorUpdatingPersisterState where From 9452e13fc35e485ff028be37950036e7ae2fc1f7 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 13:22:16 +0200 Subject: [PATCH 03/17] return io::error --- lightning-persister/src/fs_store.rs | 4 ++-- lightning/src/util/async_poll.rs | 3 +++ lightning/src/util/persist.rs | 8 ++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 2039758245b..c97f8f68a61 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -1,7 +1,7 @@ //! Objects related to [`FilesystemStore`] live here. use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; -use lightning::util::async_poll::AsyncResult; +use lightning::util::async_poll::{AsyncResult, AsyncResultType}; use lightning::util::persist::{KVStore, MigratableKVStore}; use lightning::util::string::PrintableString; @@ -333,7 +333,7 @@ impl KVStore for FilesystemStore { fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> AsyncResult<'static, ()> { + ) -> AsyncResultType<'static, (), lightning::io::Error> { todo!() } } diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 9fca6b2b49f..c54f7c85b8e 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -99,6 +99,9 @@ pub(crate) fn dummy_waker() -> Waker { /// A type alias for a future that returns a result of type T. pub type AsyncResult<'a, T> = Pin> + 'a + Send>>; +/// A type alias for a future that returns a result of type T with error type V. +pub type AsyncResultType<'a, T, V> = Pin> + 'a + Send>>; + /// A type alias for a future that returns a result of type T. pub trait FutureSpawner: Send + Sync + 'static { /// Spawns a future on a runtime. diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 5d1103bde5f..70dc8280d49 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -32,7 +32,7 @@ use crate::sync::Arc; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; -use super::async_poll::AsyncResult; +use super::async_poll::{AsyncResult, AsyncResultType}; /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -143,7 +143,7 @@ pub trait KVStore { /// Asynchronously persists the given data under the given `key`. fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> AsyncResult<'static, ()>; + ) -> AsyncResultType<'static, (), io::Error>; /// Removes any data that had previously been persisted under the given `key`. 
/// /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily @@ -284,6 +284,7 @@ impl Date: Wed, 28 May 2025 13:38:46 +0200 Subject: [PATCH 04/17] async write wip --- lightning/src/chain/chainmonitor.rs | 4 +- lightning/src/util/async_poll.rs | 3 + lightning/src/util/persist.rs | 145 +++++++++++++++++----------- 3 files changed, 93 insertions(+), 59 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 3e88c9ae89c..a97ef8c6b5b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -36,7 +36,7 @@ use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; use crate::events::{self, Event, EventHandler, ReplayEvent}; -use crate::util::async_poll::{poll_or_spawn, AsyncResult, FutureSpawner}; +use crate::util::async_poll::{poll_or_spawn, AsyncResult, AsyncVoid, FutureSpawner}; use crate::util::logger::{Logger, WithContext}; use crate::util::errors::APIError; use crate::util::persist::MonitorName; @@ -175,7 +175,7 @@ pub trait Persist { /// the archive process. Additionally, because the archive operation could be retried on /// restart, this method must in that case be idempotent, ensuring it can handle scenarios where /// the monitor already exists in the archive. - fn archive_persisted_channel(&self, monitor_name: MonitorName); + fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid; } struct MonitorHolder { diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index c54f7c85b8e..9eaed995c26 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -96,6 +96,9 @@ pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } +/// A type alias for a future that returns nothing. +pub type AsyncVoid = Pin + 'static + Send>>; + /// A type alias for a future that returns a result of type T. pub type AsyncResult<'a, T> = Pin> + 'a + Send>>; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 70dc8280d49..c6cec0c0bba 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -32,7 +32,7 @@ use crate::sync::Arc; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; -use super::async_poll::{AsyncResult, AsyncResultType}; +use super::async_poll::{AsyncResult, AsyncResultType, AsyncVoid}; /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -136,11 +136,6 @@ pub trait KVStore { /// /// Will create the given `primary_namespace` and `secondary_namespace` if not already present /// in the store. - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> Result<(), io::Error>; - - /// Asynchronously persists the given data under the given `key`. fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> AsyncResultType<'static, (), io::Error>; @@ -193,14 +188,22 @@ pub trait MigratableKVStore: KVStore { /// /// Will abort and return an error if any IO operation fails. Note that in this case the /// `target_store` might get left in an intermediate state. 
-pub fn migrate_kv_store_data( +pub async fn migrate_kv_store_data( source_store: &mut S, target_store: &mut T, ) -> Result<(), io::Error> { let keys_to_migrate = source_store.list_all_keys()?; for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { let data = source_store.read(primary_namespace, secondary_namespace, key)?; - target_store.write(primary_namespace, secondary_namespace, key, &data)?; + target_store + .write_async(primary_namespace, secondary_namespace, key, &data) + .await + .map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "Failed to write data to target store during migration", + ) + })?; } Ok(()) @@ -218,32 +221,44 @@ where /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>; + fn persist_manager(&self, channel_manager: &CM) -> AsyncResultType<'static, (), io::Error>; /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; + fn persist_graph( + &self, network_graph: &NetworkGraph, + ) -> AsyncResultType<'static, (), io::Error>; /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed. - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; + fn persist_scorer(&self, scorer: &S) -> AsyncResultType<'static, (), io::Error>; } -impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A +impl<'a, A: KVStore + ?Sized + Send + Sync + 'static, CM: Deref, L: Deref, S: Deref> + Persister<'a, CM, L, S> for Arc where CM::Target: 'static + AChannelManager, L::Target: 'static + Logger, S::Target: WriteableScore<'a>, { - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { - self.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - ) + fn persist_manager(&self, channel_manager: &CM) -> AsyncResultType<'static, (), io::Error> { + let encoded = channel_manager.get_cm().encode(); + let kv_store = self.clone(); + + Box::pin(async move { + kv_store + .write_async( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &encoded, + ) + .await + }) } - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { - self.write( + fn persist_graph( + &self, network_graph: &NetworkGraph, + ) -> AsyncResultType<'static, (), io::Error> { + self.write_async( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, @@ -251,8 +266,8 @@ where ) } - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { - self.write( + fn persist_scorer(&self, scorer: &S) -> AsyncResultType<'static, (), io::Error> { + self.write_async( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, @@ -308,31 +323,38 @@ impl monitor, - Err(_) => return, - }; - match self.write( - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - &monitor, - ) { - Ok(()) => {}, - Err(_e) => return, - }; - let _ = self.remove( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - 
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - true, - ); + fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid { + let kv_store = self.clone(); + + Box::pin(async move { + let monitor_key = monitor_name.to_string(); + let monitor = match kv_store.read( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.as_str(), + ) { + Ok(monitor) => monitor, + Err(_) => return, + }; + match kv_store + .write_async( + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.as_str(), + &monitor, + ) + .await + { + Ok(()) => {}, + Err(_e) => return, + }; + let _ = kv_store.remove( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.as_str(), + true, + ); + }) } } @@ -777,8 +799,13 @@ where }) } - fn archive_persisted_channel(&self, monitor_name: MonitorName) { - self.state.archive_persisted_channel(monitor_name); + fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid { + let monitor_name = monitor_name; + let state = self.state.clone(); + + Box::pin(async move { + state.archive_persisted_channel(monitor_name).await; + }) } } @@ -925,18 +952,22 @@ where } } - fn archive_persisted_channel(&self, monitor_name: MonitorName) { + async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); let monitor = match self.read_channel_monitor_with_updates(&monitor_key) { Ok((_block_hash, monitor)) => monitor, Err(_) => return, }; - match self.kv_store.write( - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - &monitor.encode(), - ) { + match self + .kv_store + .write_async( + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.as_str(), + &monitor.encode(), + ) + .await + { Ok(()) => {}, Err(_e) => return, }; From 436e5266b9407e0828d4c4ef09468e712347490b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 13:43:58 +0200 Subject: [PATCH 05/17] sweeper async wip --- lightning/src/util/sweep.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 0fae91bebc2..e07bf93697f 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -411,7 +411,7 @@ where /// Returns `Err` on persistence failure, in which case the call may be safely retried. 
/// /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs - pub fn track_spendable_outputs( + pub async fn track_spendable_outputs( &self, output_descriptors: Vec, channel_id: Option, exclude_static_outputs: bool, delay_until_height: Option, ) -> Result<(), ()> { @@ -444,7 +444,7 @@ where state_lock.outputs.push(output_info); } - self.persist_state(&*state_lock).map_err(|e| { + self.persist_state(&*state_lock).await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }) } @@ -560,7 +560,7 @@ where output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); } - self.persist_state(&sweeper_state).map_err(|e| { + self.persist_state(&sweeper_state).await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); })?; @@ -590,14 +590,15 @@ where }); } - fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { + async fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { self.kv_store - .write( + .write_async( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, &sweeper_state.encode(), ) + .await .map_err(|e| { log_error!( self.logger, @@ -970,16 +971,18 @@ where } /// Tells the sweeper to track the given outputs descriptors. Wraps [`OutputSweeper::track_spendable_outputs`]. - pub fn track_spendable_outputs( + pub async fn track_spendable_outputs( &self, output_descriptors: Vec, channel_id: Option, exclude_static_outputs: bool, delay_until_height: Option, ) -> Result<(), ()> { - self.sweeper.track_spendable_outputs( - output_descriptors, - channel_id, - exclude_static_outputs, - delay_until_height, - ) + self.sweeper + .track_spendable_outputs( + output_descriptors, + channel_id, + exclude_static_outputs, + delay_until_height, + ) + .await } /// Returns a list of the currently tracked spendable outputs. Wraps [`OutputSweeper::tracked_spendable_outputs`]. From 1127ab0442b1bd27b80c85abe7bc7f814661e6da Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 15:25:57 +0200 Subject: [PATCH 06/17] sweeper/bg proc fixes wip --- lightning-background-processor/src/lib.rs | 16 +-- lightning-persister/src/fs_store.rs | 147 +++++++++++----------- lightning/src/util/sweep.rs | 30 ++--- 3 files changed, 99 insertions(+), 94 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 1d26bce7f48..1ebc0791a3d 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -375,7 +375,7 @@ macro_rules! define_run_body { if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&$channel_manager)?; + $persister.persist_manager(&$channel_manager).await?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { @@ -436,7 +436,7 @@ macro_rules! define_run_body { log_trace!($logger, "Persisting network graph."); } - if let Err(e) = $persister.persist_graph(network_graph) { + if let Err(e) = $persister.persist_graph(network_graph).await { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } @@ -464,7 +464,7 @@ macro_rules! 
define_run_body { } else { log_trace!($logger, "Persisting scorer"); } - if let Err(e) = $persister.persist_scorer(&scorer) { + if let Err(e) = $persister.persist_scorer(&scorer).await { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -487,16 +487,16 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&$channel_manager)?; + $persister.persist_manager(&$channel_manager).await?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { - $persister.persist_scorer(&scorer)?; + $persister.persist_scorer(&scorer).await?; } // Persist NetworkGraph on exit if let Some(network_graph) = $gossip_sync.network_graph() { - $persister.persist_graph(network_graph)?; + $persister.persist_graph(network_graph).await?; } Ok(()) @@ -840,7 +840,7 @@ where if let Some(duration_since_epoch) = fetch_time() { if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&*scorer) { + if let Err(e) = persister.persist_scorer(&*scorer).await { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); // We opt not to abort early on persistence failure here as persisting // the scorer is non-critical and we still hope that it will have @@ -1033,7 +1033,7 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"); if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { + if let Err(e) = persister.persist_scorer(&scorer).await { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index c97f8f68a61..ade12ed25e7 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -119,56 +119,64 @@ impl KVStore for FilesystemStore { Ok(buf) } - fn write( + fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> lightning::io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; - - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - fs::create_dir_all(&parent_directory)?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. 
- // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file_path = dest_file_path.clone(); - let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - tmp_file_path.set_extension(tmp_file_ext); - - { - let mut tmp_file = fs::File::create(&tmp_file_path)?; - tmp_file.write_all(&buf)?; - tmp_file.sync_all()?; - } - - let res = { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.write().unwrap(); + ) -> AsyncResultType<'static, (), lightning::io::Error> { + Box::pin(async move { + check_namespace_key_validity( + primary_namespace, + secondary_namespace, + Some(key), + "write", + )?; + + let mut dest_file_path = + self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + dest_file_path.push(key); + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + fs::create_dir_all(&parent_directory)?; + + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. + // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let mut tmp_file_path = dest_file_path.clone(); + let tmp_file_ext = + format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + tmp_file_path.set_extension(tmp_file_ext); - #[cfg(not(target_os = "windows"))] { - fs::rename(&tmp_file_path, &dest_file_path)?; - let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; - dir_file.sync_all()?; - Ok(()) + let mut tmp_file = fs::File::create(&tmp_file_path)?; + tmp_file.write_all(&buf)?; + tmp_file.sync_all()?; } - #[cfg(target_os = "windows")] - { - let res = if dest_file_path.exists() { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::ReplaceFileW( + let res = { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.write().unwrap(); + + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_file_path, &dest_file_path)?; + let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; + dir_file.sync_all()?; + Ok(()) + } + + #[cfg(target_os = "windows")] + { + let res = if dest_file_path.exists() { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::ReplaceFileW( path_to_windows_str(&dest_file_path).as_ptr(), path_to_windows_str(&tmp_file_path).as_ptr(), std::ptr::null(), @@ -176,34 +184,37 @@ impl KVStore for FilesystemStore { std::ptr::null_mut() as *const core::ffi::c_void, std::ptr::null_mut() as *const core::ffi::c_void, ) - }) - } else { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( + }) + } else { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( path_to_windows_str(&tmp_file_path).as_ptr(), path_to_windows_str(&dest_file_path).as_ptr(), windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, ) - }) - }; - - match res { - 
Ok(()) => { - // We fsync the dest file in hopes this will also flush the metadata to disk. - let dest_file = - fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?; - dest_file.sync_all()?; - Ok(()) - }, - Err(e) => Err(e.into()), + }) + }; + + match res { + Ok(()) => { + // We fsync the dest file in hopes this will also flush the metadata to disk. + let dest_file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(&dest_file_path)?; + dest_file.sync_all()?; + Ok(()) + }, + Err(e) => Err(e.into()), + } } - } - }; + }; - self.garbage_collect_locks(); + self.garbage_collect_locks(); - res + res + }) } fn remove( @@ -330,12 +341,6 @@ impl KVStore for FilesystemStore { Ok(keys) } - - fn write_async( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> AsyncResultType<'static, (), lightning::io::Error> { - todo!() - } } fn dir_entry_is_key(p: &Path) -> Result { diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index e07bf93697f..a8e4a0a1ded 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -675,9 +675,9 @@ where self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&*state_lock).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + // let _ = self.persist_state(&*state_lock).map_err(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }); } fn block_disconnected(&self, header: &Header, height: u32) { @@ -699,9 +699,9 @@ where } } - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + // self.persist_state(&*state_lock).unwrap_or_else(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }); } } @@ -721,9 +721,9 @@ where ) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + // self.persist_state(&*state_lock).unwrap_or_else(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }); } fn transaction_unconfirmed(&self, txid: &Txid) { @@ -744,18 +744,18 @@ where .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + // self.persist_state(&*state_lock).unwrap_or_else(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }); } } fn best_block_updated(&self, header: &Header, height: u32) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&*state_lock).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + // let _ = self.persist_state(&*state_lock).map_err(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }); } fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { From f2dc7abc36c6e84150ebece8b8814fae8797f64b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 19:39:08 +0200 Subject: [PATCH 07/17] bg proc fixes wip --- 
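
Note on the store changes in the previous patch: `FilesystemStore::write_async` is implemented by wrapping the existing blocking body in `Box::pin(async move { .. })`; subsequent patches in this series settle on running the blocking code first and boxing an already-resolved future. A minimal, self-contained sketch of that eventual shape follows. The `AsyncResultType` alias and the `SyncBackedStore` type are local stand-ins (the real alias lives in `lightning::util::async_poll`, the real implementation is `FilesystemStore`), so treat this as an illustration rather than the actual definitions.

    use std::future::Future;
    use std::pin::Pin;

    // Assumed local stand-in for the alias from lightning::util::async_poll;
    // the real definition may differ.
    type AsyncResultType<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;

    struct SyncBackedStore; // hypothetical stand-in for FilesystemStore

    impl SyncBackedStore {
        // Blocking write, standing in for the inherent FilesystemStore::write.
        fn write_sync(&self, _key: &str, _buf: &[u8]) -> Result<(), std::io::Error> {
            Ok(())
        }

        // Async-trait-shaped entry point: run the blocking write, then hand
        // back an already-resolved future so callers can simply `.await` it.
        fn write(&self, key: &str, buf: &[u8]) -> AsyncResultType<'static, (), std::io::Error> {
            let res = self.write_sync(key, buf);
            Box::pin(async move { res })
        }
    }

This keeps existing synchronous backends usable unchanged while the trait itself moves to futures.
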
lightning-background-processor/src/lib.rs | 120 +++++++++++----------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 1ebc0791a3d..8d64b6c20f8 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1033,70 +1033,70 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"); if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer).await { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } + // if let Err(e) = persister.persist_scorer(&scorer).await { + // log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + // } } } event_handler.handle_event(event) }; - define_run_body!( - persister, - chain_monitor, - chain_monitor.process_pending_events(&event_handler), - channel_manager, - channel_manager.get_cm().process_pending_events(&event_handler), - onion_messenger, - if let Some(om) = &onion_messenger { - om.get_om().process_pending_events(&event_handler) - }, - peer_manager, - gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); - } - }, - logger, - scorer, - stop_thread.load(Ordering::Acquire), - { - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; - sleeper.wait_timeout(Duration::from_millis(100)); - }, - |_| Instant::now(), - |time: &Instant, dur| time.elapsed().as_secs() > dur, - false, - || { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), - ) - }, - ) + // define_run_body!( + // persister, + // chain_monitor, + // chain_monitor.process_pending_events(&event_handler), + // channel_manager, + // channel_manager.get_cm().process_pending_events(&event_handler), + // onion_messenger, + // if let Some(om) = &onion_messenger { + // om.get_om().process_pending_events(&event_handler) + // }, + // peer_manager, + // gossip_sync, + // { + // if let Some(ref sweeper) = sweeper { + // let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); + // } + // }, + // logger, + // scorer, + // stop_thread.load(Ordering::Acquire), + // { + // let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { + // (Some(om), Some(lm)) => Sleeper::from_four_futures( + // &channel_manager.get_cm().get_event_or_persistence_needed_future(), + // &chain_monitor.get_update_future(), + // &om.get_om().get_update_future(), + // 
&lm.get_lm().get_pending_msgs_future(), + // ), + // (Some(om), None) => Sleeper::from_three_futures( + // &channel_manager.get_cm().get_event_or_persistence_needed_future(), + // &chain_monitor.get_update_future(), + // &om.get_om().get_update_future(), + // ), + // (None, Some(lm)) => Sleeper::from_three_futures( + // &channel_manager.get_cm().get_event_or_persistence_needed_future(), + // &chain_monitor.get_update_future(), + // &lm.get_lm().get_pending_msgs_future(), + // ), + // (None, None) => Sleeper::from_two_futures( + // &channel_manager.get_cm().get_event_or_persistence_needed_future(), + // &chain_monitor.get_update_future(), + // ), + // }; + // sleeper.wait_timeout(Duration::from_millis(100)); + // }, + // |_| Instant::now(), + // |time: &Instant, dur| time.elapsed().as_secs() > dur, + // false, + // || { + // use std::time::SystemTime; + // Some( + // SystemTime::now() + // .duration_since(SystemTime::UNIX_EPOCH) + // .expect("Time should be sometime after 1970"), + // ) + // }, + // ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } From aba5d24876668980ddbe6395b760dba1cf2e1794 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 28 May 2025 20:23:18 +0200 Subject: [PATCH 08/17] fs fixes --- lightning-background-processor/src/lib.rs | 2 + lightning-persister/src/fs_store.rs | 183 +++++++++++----------- 2 files changed, 93 insertions(+), 92 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 8d64b6c20f8..ce3608bdbe8 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1097,6 +1097,8 @@ impl BackgroundProcessor { // ) // }, // ) + + Ok(()) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index ade12ed25e7..ea00615457d 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -93,6 +93,95 @@ impl FilesystemStore { } } +impl FilesystemStore { + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Result<(), lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + + let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + dest_file_path.push(key); + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + fs::create_dir_all(&parent_directory)?; + + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. 
+ // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let mut tmp_file_path = dest_file_path.clone(); + let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + tmp_file_path.set_extension(tmp_file_ext); + + { + let mut tmp_file = fs::File::create(&tmp_file_path)?; + tmp_file.write_all(&buf)?; + tmp_file.sync_all()?; + } + + let res = { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.write().unwrap(); + + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_file_path, &dest_file_path)?; + let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; + dir_file.sync_all()?; + Ok(()) + } + + #[cfg(target_os = "windows")] + { + let res = if dest_file_path.exists() { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::ReplaceFileW( + path_to_windows_str(&dest_file_path).as_ptr(), + path_to_windows_str(&tmp_file_path).as_ptr(), + std::ptr::null(), + windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *const core::ffi::c_void, + std::ptr::null_mut() as *const core::ffi::c_void, + ) + }) + } else { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(&tmp_file_path).as_ptr(), + path_to_windows_str(&dest_file_path).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + }) + }; + + match res { + Ok(()) => { + // We fsync the dest file in hopes this will also flush the metadata to disk. + let dest_file = + fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?; + dest_file.sync_all()?; + Ok(()) + }, + Err(e) => Err(e.into()), + } + } + }; + + self.garbage_collect_locks(); + + res + } +} + impl KVStore for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, @@ -122,99 +211,9 @@ impl KVStore for FilesystemStore { fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> AsyncResultType<'static, (), lightning::io::Error> { - Box::pin(async move { - check_namespace_key_validity( - primary_namespace, - secondary_namespace, - Some(key), - "write", - )?; - - let mut dest_file_path = - self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - fs::create_dir_all(&parent_directory)?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. 
- // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file_path = dest_file_path.clone(); - let tmp_file_ext = - format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - tmp_file_path.set_extension(tmp_file_ext); - - { - let mut tmp_file = fs::File::create(&tmp_file_path)?; - tmp_file.write_all(&buf)?; - tmp_file.sync_all()?; - } - - let res = { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.write().unwrap(); - - #[cfg(not(target_os = "windows"))] - { - fs::rename(&tmp_file_path, &dest_file_path)?; - let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; - dir_file.sync_all()?; - Ok(()) - } - - #[cfg(target_os = "windows")] - { - let res = if dest_file_path.exists() { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::ReplaceFileW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&tmp_file_path).as_ptr(), - std::ptr::null(), - windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, - std::ptr::null_mut() as *const core::ffi::c_void, - std::ptr::null_mut() as *const core::ffi::c_void, - ) - }) - } else { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&tmp_file_path).as_ptr(), - path_to_windows_str(&dest_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH - | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - }) - }; - - match res { - Ok(()) => { - // We fsync the dest file in hopes this will also flush the metadata to disk. - let dest_file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(&dest_file_path)?; - dest_file.sync_all()?; - Ok(()) - }, - Err(e) => Err(e.into()), - } - } - }; - - self.garbage_collect_locks(); + let res = self.write(primary_namespace, secondary_namespace, key, buf); - res - }) + Box::pin(async move { res }) } fn remove( From c10d7c324df742e35231c1cdfe215dd8be8c73ed Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 2 Jun 2025 16:34:13 +0200 Subject: [PATCH 09/17] rename write_async to write --- lightning-persister/src/fs_store.rs | 2 +- lightning/src/util/persist.rs | 29 ++++++++++++++--------------- lightning/src/util/sweep.rs | 2 +- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index ea00615457d..90bef4387c1 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -208,7 +208,7 @@ impl KVStore for FilesystemStore { Ok(buf) } - fn write_async( + fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> AsyncResultType<'static, (), lightning::io::Error> { let res = self.write(primary_namespace, secondary_namespace, key, buf); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index c6cec0c0bba..930d02030fa 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -136,7 +136,7 @@ pub trait KVStore { /// /// Will create the given `primary_namespace` and `secondary_namespace` if not already present /// in the store. 
- fn write_async( + fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> AsyncResultType<'static, (), io::Error>; /// Removes any data that had previously been persisted under the given `key`. @@ -195,15 +195,14 @@ pub async fn migrate_kv_store_data( for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { let data = source_store.read(primary_namespace, secondary_namespace, key)?; - target_store - .write_async(primary_namespace, secondary_namespace, key, &data) - .await - .map_err(|_| { + target_store.write(primary_namespace, secondary_namespace, key, &data).await.map_err( + |_| { io::Error::new( io::ErrorKind::Other, "Failed to write data to target store during migration", ) - })?; + }, + )?; } Ok(()) @@ -245,7 +244,7 @@ where Box::pin(async move { kv_store - .write_async( + .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -258,7 +257,7 @@ where fn persist_graph( &self, network_graph: &NetworkGraph, ) -> AsyncResultType<'static, (), io::Error> { - self.write_async( + self.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, @@ -267,7 +266,7 @@ where } fn persist_scorer(&self, scorer: &S) -> AsyncResultType<'static, (), io::Error> { - self.write_async( + self.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, @@ -292,7 +291,7 @@ impl return, }; match kv_store - .write_async( + .write( ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), @@ -865,7 +864,7 @@ where // Serialize and write the new monitor self.kv_store - .write_async( + .write( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), @@ -896,7 +895,7 @@ where let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update_id); self.kv_store - .write_async( + .write( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str(), update_name.as_str(), @@ -960,7 +959,7 @@ where }; match self .kv_store - .write_async( + .write( ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index a8e4a0a1ded..dcb6c9e4240 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -592,7 +592,7 @@ where async fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { self.kv_store - .write_async( + .write( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, From e551842705793aecf8f8ace47437c5cd5a1b9e2b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 2 Jun 2025 16:41:05 +0200 Subject: [PATCH 10/17] async read --- lightning-persister/src/fs_store.rs | 49 +++++++++-------- lightning/src/util/persist.rs | 82 +++++++++++++++++------------ 2 files changed, 76 insertions(+), 55 deletions(-) diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 90bef4387c1..919dc97f691 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -94,6 +94,31 @@ impl FilesystemStore { } impl FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: 
&str, key: &str, + ) -> lightning::io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + + let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + dest_file_path.push(key); + + let mut buf = Vec::new(); + { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.read().unwrap(); + + let mut f = fs::File::open(dest_file_path)?; + f.read_to_end(&mut buf)?; + } + + self.garbage_collect_locks(); + + Ok(buf) + } + fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], ) -> Result<(), lightning::io::Error> { @@ -185,27 +210,9 @@ impl FilesystemStore { impl KVStore for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> lightning::io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; - - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - - let mut buf = Vec::new(); - { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.read().unwrap(); - - let mut f = fs::File::open(dest_file_path)?; - f.read_to_end(&mut buf)?; - } - - self.garbage_collect_locks(); - - Ok(buf) + ) -> AsyncResultType<'static, Vec, lightning::io::Error> { + let res = self.read(primary_namespace, secondary_namespace, key); + Box::pin(async move { res }) } fn write( diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 930d02030fa..bff065e58a8 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -131,7 +131,7 @@ pub trait KVStore { /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, io::Error>; + ) -> AsyncResultType<'static, Vec, io::Error>; /// Persists the given data under the given `key`. /// /// Will create the given `primary_namespace` and `secondary_namespace` if not already present @@ -194,7 +194,7 @@ pub async fn migrate_kv_store_data( let keys_to_migrate = source_store.list_all_keys()?; for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { - let data = source_store.read(primary_namespace, secondary_namespace, key)?; + let data = source_store.read(primary_namespace, secondary_namespace, key).await?; target_store.write(primary_namespace, secondary_namespace, key, &data).await.map_err( |_| { io::Error::new( @@ -327,11 +327,14 @@ impl monitor, Err(_) => return, }; @@ -358,7 +361,7 @@ impl( +pub async fn read_channel_monitors( kv_store: K, entropy_source: ES, signer_provider: SP, ) -> Result::EcdsaSigner>)>, io::Error> where @@ -373,11 +376,15 @@ where CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )? 
{ match <(BlockHash, ChannelMonitor<::EcdsaSigner>)>::read( - &mut io::Cursor::new(kv_store.read( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - )?), + &mut io::Cursor::new( + kv_store + .read( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + ) + .await?, + ), (&*entropy_source, &*signer_provider), ) { Ok((block_hash, channel_monitor)) => { @@ -563,7 +570,7 @@ where /// It is extremely important that your [`KVStore::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. - pub fn read_all_channel_monitors_with_updates( + pub async fn read_all_channel_monitors_with_updates( &self, ) -> Result< Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, @@ -575,7 +582,7 @@ where )?; let mut res = Vec::with_capacity(monitor_list.len()); for monitor_key in monitor_list { - res.push(self.read_channel_monitor_with_updates(monitor_key.as_str())?) + res.push(self.read_channel_monitor_with_updates(monitor_key.as_str()).await?) } Ok(res) } @@ -599,12 +606,12 @@ where /// /// Loading a large number of monitors will be faster if done in parallel. You can use this /// function to accomplish this. Take care to limit the number of parallel readers. - pub fn read_channel_monitor_with_updates( + pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { let monitor_name = MonitorName::from_str(monitor_key)?; - let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key)?; + let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key).await?; let mut current_update_id = monitor.get_latest_update_id(); loop { current_update_id = match current_update_id.checked_add(1) { @@ -612,7 +619,7 @@ where None => break, }; let update_name = UpdateName::from(current_update_id); - let update = match self.read_monitor_update(monitor_key, &update_name) { + let update = match self.read_monitor_update(monitor_key, &update_name).await { Ok(update) => update, Err(err) if err.kind() == io::ErrorKind::NotFound => { // We can't find any more updates, so we are done. @@ -638,15 +645,19 @@ where } /// Read a channel monitor. - fn read_monitor( + async fn read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { - let mut monitor_cursor = io::Cursor::new(self.kv_store.read( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key, - )?); + let mut monitor_cursor = io::Cursor::new( + self.kv_store + .read( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key, + ) + .await?, + ); // Discard the sentinel bytes if found. if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); @@ -683,14 +694,17 @@ where } /// Read a channel monitor update. 
- fn read_monitor_update( + async fn read_monitor_update( &self, monitor_key: &str, update_name: &UpdateName, ) -> Result { - let update_bytes = self.kv_store.read( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key, - update_name.as_str(), - )?; + let update_bytes = self + .kv_store + .read( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_key, + update_name.as_str(), + ) + .await?; ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| { log_error!( self.logger, @@ -710,14 +724,14 @@ where /// updates. The updates that have an `update_id` less than or equal to than the stored monitor /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will /// be passed to [`KVStore::remove`]. - pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { + pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { let monitor_keys = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )?; for monitor_key in monitor_keys { let monitor_name = MonitorName::from_str(&monitor_key)?; - let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key)?; + let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; let updates = self .kv_store .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str())?; @@ -909,7 +923,7 @@ where let maybe_old_monitor = match monitor_latest_update_id { LEGACY_CLOSED_CHANNEL_UPDATE_ID => { let monitor_key = monitor_name.to_string(); - self.read_monitor(&monitor_name, &monitor_key).ok() + self.read_monitor(&monitor_name, &monitor_key).await.ok() }, _ => None, }; @@ -953,7 +967,7 @@ where async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); - let monitor = match self.read_channel_monitor_with_updates(&monitor_key) { + let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { Ok((_block_hash, monitor)) => monitor, Err(_) => return, }; From 28eedfa299834119b58d11a76730ed2e508a9cfc Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 5 Jun 2025 16:18:24 +0200 Subject: [PATCH 11/17] sync wrapper wip --- lightning-background-processor/src/lib.rs | 18 +-- lightning/src/util/persist.rs | 140 ++++++++++++++++++++++ 2 files changed, 151 insertions(+), 7 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index ce3608bdbe8..f65a7c8b07c 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync; use lightning::sign::OutputSpender; use lightning::util::async_poll::FutureSpawner; use lightning::util::logger::Logger; -use lightning::util::persist::{KVStore, Persister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, KVStoreSyncWrapper, Persister, PersisterSync, +}; use lightning::util::sweep::OutputSweeper; #[cfg(feature = "std")] use lightning::util::sweep::OutputSweeperSync; @@ -995,7 +997,9 @@ impl BackgroundProcessor { D: 'static + Deref, O: 'static + Deref, K: 'static + Deref, - OS: 'static + Deref> + Send, + OS: 'static + + Deref, L, O>> + + Send, FS: FutureSpawner, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, @@ -1009,14 +1013,14 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, P::Target: 'static + 
Persist<::Signer>, - PS::Target: 'static + Persister<'a, CM, L, S>, + PS::Target: 'static + PersisterSync<'a, CM, L, S>, CM::Target: AChannelManager, OM::Target: AOnionMessenger, PM::Target: APeerManager, LM::Target: ALiquidityManager, D::Target: ChangeDestinationSourceSync, O::Target: 'static + OutputSpender, - K::Target: 'static + KVStore, + K::Target: 'static + KVStoreSync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -1033,9 +1037,9 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"); if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - // if let Err(e) = persister.persist_scorer(&scorer).await { - // log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - // } + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } } } event_handler.handle_event(event) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index bff065e58a8..e68d474629f 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -167,6 +167,91 @@ pub trait KVStore { ) -> Result, io::Error>; } +/// Provides a synchronous interface to the [`KVStore`] trait. +pub trait KVStoreSync { + /// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and + /// `key`. + /// + /// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given + /// `primary_namespace` and `secondary_namespace`. + /// + /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error>; + /// Persists the given data under the given `key`. + /// + /// Will create the given `primary_namespace` and `secondary_namespace` if not already present + /// in the store. + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Result<(), io::Error>; + /// Removes any data that had previously been persisted under the given `key`. + /// + /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily + /// remove the given `key` at some point in time after the method returns, e.g., as part of an + /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to + /// [`KVStore::list`] might include the removed key until the changes are actually persisted. + /// + /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent + /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could + /// potentially get lost on crash after the method returns. Therefore, this flag should only be + /// set for `remove` operations that can be safely replayed at a later time. + /// + /// Returns successfully if no data will be stored for the given `primary_namespace`, + /// `secondary_namespace`, and `key`, independently of whether it was present before its + /// invokation or not. + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), io::Error>; + /// Returns a list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`. + /// + /// Returns the keys in arbitrary order, so users requiring a particular order need to sort the + /// returned keys. 
Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown. + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error>; +} + +/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. +pub struct KVStoreSyncWrapper(Arc); + +impl KVStoreSyncWrapper { + /// Constructs a new [`KVStoreSyncWrapper`]. + pub fn new(kv_store: Arc) -> Self { + Self(kv_store) + } +} + +impl KVStore for KVStoreSyncWrapper { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> AsyncResultType<'static, Vec, io::Error> { + let res = self.0.read(primary_namespace, secondary_namespace, key); + + Box::pin(async move { res }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> AsyncResultType<'static, (), io::Error> { + todo!() + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), io::Error> { + todo!() + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + todo!() + } +} + /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] /// data migration. pub trait MigratableKVStore: KVStore { @@ -275,6 +360,61 @@ where } } +/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +pub trait PersisterSync<'a, CM: Deref, L: Deref, S: Deref> +where + CM::Target: 'static + AChannelManager, + L::Target: 'static + Logger, + S::Target: WriteableScore<'a>, +{ + /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>; + + /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. + fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; + + /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed. 
+ fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; +} + +impl<'a, A: KVStoreSync + ?Sized, CM: Deref, L: Deref, S: Deref> PersisterSync<'a, CM, L, S> for A +where + CM::Target: 'static + AChannelManager, + L::Target: 'static + Logger, + S::Target: WriteableScore<'a>, +{ + fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { + self.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) + } + + fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { + self.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + } + + fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { + self.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + } +} + impl Persist for Arc { From ba946aba393a019efb3625962cc0c6d77d65fab2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 15:49:16 +0200 Subject: [PATCH 12/17] wip --- lightning-background-processor/src/lib.rs | 2 +- lightning/src/util/persist.rs | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f65a7c8b07c..2c38e91a4a7 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -998,7 +998,7 @@ impl BackgroundProcessor { O: 'static + Deref, K: 'static + Deref, OS: 'static - + Deref, L, O>> + + Deref>, L, O>> + Send, FS: FutureSpawner, >( diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index e68d474629f..5c723195542 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -215,16 +215,24 @@ pub trait KVStoreSync { } /// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. -pub struct KVStoreSyncWrapper(Arc); +pub struct KVStoreSyncWrapper(Arc) +where + K::Target: KVStoreSync; -impl KVStoreSyncWrapper { +impl KVStoreSyncWrapper +where + K::Target: KVStoreSync, +{ /// Constructs a new [`KVStoreSyncWrapper`]. 
pub fn new(kv_store: Arc) -> Self { Self(kv_store) } } -impl KVStore for KVStoreSyncWrapper { +impl KVStore for KVStoreSyncWrapper +where + K::Target: KVStoreSync, +{ fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> AsyncResultType<'static, Vec, io::Error> { From f357700c93491d72cc7f03c0ec3d378babe5a9ee Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 17:10:45 +0200 Subject: [PATCH 13/17] fix sweeper kvstore sync --- lightning-background-processor/src/lib.rs | 4 +--- lightning/src/util/persist.rs | 4 ++-- lightning/src/util/sweep.rs | 24 +++++++++++++++++------ 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 2c38e91a4a7..4319b8617af 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -997,9 +997,7 @@ impl BackgroundProcessor { D: 'static + Deref, O: 'static + Deref, K: 'static + Deref, - OS: 'static - + Deref>, L, O>> - + Send, + OS: 'static + Deref> + Send, FS: FutureSpawner, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 5c723195542..dd78ed15858 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -215,7 +215,7 @@ pub trait KVStoreSync { } /// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. -pub struct KVStoreSyncWrapper(Arc) +pub struct KVStoreSyncWrapper(K) where K::Target: KVStoreSync; @@ -224,7 +224,7 @@ where K::Target: KVStoreSync, { /// Constructs a new [`KVStoreSyncWrapper`]. - pub fn new(kv_store: Arc) -> Self { + pub fn new(kv_store: K) -> Self { Self(kv_store) } } diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index dcb6c9e4240..f3f528cf96c 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -23,8 +23,8 @@ use crate::sync::Arc; use crate::sync::Mutex; use crate::util::logger::Logger; use crate::util::persist::{ - KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::{impl_writeable_tlv_based, log_debug, log_error}; @@ -916,11 +916,21 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { - sweeper: Arc>, E, F, K, L, O>>, + sweeper: Arc< + OutputSweeper< + B, + Arc>, + E, + F, + Arc>, + L, + O, + >, + >, } impl @@ -930,7 +940,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -942,6 +952,8 @@ where let change_destination_source = Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source)); + let kv_store = Arc::new(KVStoreSyncWrapper::new(kv_store)); + let sweeper = OutputSweeper::new( best_block, broadcaster, @@ -1006,7 +1018,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { From 
55426d7da4dedb7e1dd94fb0881a158285a8ecd0 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 10 Jun 2025 12:21:58 +0200 Subject: [PATCH 14/17] re-enable sync bg proc --- lightning-background-processor/src/lib.rs | 141 ++++++++++++---------- 1 file changed, 75 insertions(+), 66 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 4319b8617af..33afb3b2a0f 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -313,6 +313,15 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } +macro_rules! maybe_await { + (true, $e:expr) => { + $e.await + }; + (false, $e:expr) => { + $e + }; +} + macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, @@ -321,7 +330,7 @@ macro_rules! define_run_body { $peer_manager: ident, $gossip_sync: ident, $process_sweeper: expr, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, + $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.get_cm().timer_tick_occurred(); @@ -377,7 +386,7 @@ macro_rules! define_run_body { if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&$channel_manager).await?; + maybe_await!($async, $persister.persist_manager(&$channel_manager))?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { @@ -438,7 +447,7 @@ macro_rules! define_run_body { log_trace!($logger, "Persisting network graph."); } - if let Err(e) = $persister.persist_graph(network_graph).await { + if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } @@ -466,7 +475,7 @@ macro_rules! define_run_body { } else { log_trace!($logger, "Persisting scorer"); } - if let Err(e) = $persister.persist_scorer(&scorer).await { + if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -489,16 +498,16 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. 
- $persister.persist_manager(&$channel_manager).await?; + maybe_await!($async, $persister.persist_manager(&$channel_manager))?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { - $persister.persist_scorer(&scorer).await?; + maybe_await!($async, $persister.persist_scorer(&scorer))?; } // Persist NetworkGraph on exit if let Some(network_graph) = $gossip_sync.network_graph() { - $persister.persist_graph(network_graph).await?; + maybe_await!($async, $persister.persist_graph(network_graph))?; } Ok(()) @@ -920,6 +929,7 @@ where }, mobile_interruptable_platform, fetch_time, + true, ) } @@ -1042,65 +1052,64 @@ impl BackgroundProcessor { } event_handler.handle_event(event) }; - // define_run_body!( - // persister, - // chain_monitor, - // chain_monitor.process_pending_events(&event_handler), - // channel_manager, - // channel_manager.get_cm().process_pending_events(&event_handler), - // onion_messenger, - // if let Some(om) = &onion_messenger { - // om.get_om().process_pending_events(&event_handler) - // }, - // peer_manager, - // gossip_sync, - // { - // if let Some(ref sweeper) = sweeper { - // let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); - // } - // }, - // logger, - // scorer, - // stop_thread.load(Ordering::Acquire), - // { - // let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - // (Some(om), Some(lm)) => Sleeper::from_four_futures( - // &channel_manager.get_cm().get_event_or_persistence_needed_future(), - // &chain_monitor.get_update_future(), - // &om.get_om().get_update_future(), - // &lm.get_lm().get_pending_msgs_future(), - // ), - // (Some(om), None) => Sleeper::from_three_futures( - // &channel_manager.get_cm().get_event_or_persistence_needed_future(), - // &chain_monitor.get_update_future(), - // &om.get_om().get_update_future(), - // ), - // (None, Some(lm)) => Sleeper::from_three_futures( - // &channel_manager.get_cm().get_event_or_persistence_needed_future(), - // &chain_monitor.get_update_future(), - // &lm.get_lm().get_pending_msgs_future(), - // ), - // (None, None) => Sleeper::from_two_futures( - // &channel_manager.get_cm().get_event_or_persistence_needed_future(), - // &chain_monitor.get_update_future(), - // ), - // }; - // sleeper.wait_timeout(Duration::from_millis(100)); - // }, - // |_| Instant::now(), - // |time: &Instant, dur| time.elapsed().as_secs() > dur, - // false, - // || { - // use std::time::SystemTime; - // Some( - // SystemTime::now() - // .duration_since(SystemTime::UNIX_EPOCH) - // .expect("Time should be sometime after 1970"), - // ) - // }, - // ) - - Ok(()) + define_run_body!( + persister, + chain_monitor, + chain_monitor.process_pending_events(&event_handler), + channel_manager, + channel_manager.get_cm().process_pending_events(&event_handler), + onion_messenger, + if let Some(om) = &onion_messenger { + om.get_om().process_pending_events(&event_handler) + }, + peer_manager, + gossip_sync, + { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); + } + }, + logger, + scorer, + stop_thread.load(Ordering::Acquire), + { + let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { + (Some(om), Some(lm)) => Sleeper::from_four_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (Some(om), None) => Sleeper::from_three_futures( + 
&channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + ), + (None, Some(lm)) => Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (None, None) => Sleeper::from_two_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + ), + }; + sleeper.wait_timeout(Duration::from_millis(100)); + }, + |_| Instant::now(), + |time: &Instant, dur| time.elapsed().as_secs() > dur, + false, + || { + use std::time::SystemTime; + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"), + ) + }, + false, + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } From 27b73475835cdc6f8c2c39c7b711e76e327bf73f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 10:14:28 +0200 Subject: [PATCH 15/17] Separate sweeper persistent state Prepare for adding runtime state while avoiding the _unused serialization macro config. --- lightning/src/util/sweep.rs | 75 ++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index f3f528cf96c..bee26c773a7 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -382,7 +382,8 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let outputs = Vec::new(); - let sweeper_state = Mutex::new(SweeperState { outputs, best_block }); + let sweeper_state = + Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } }); Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -437,12 +438,12 @@ where }, }; - if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some() - { + let mut outputs = state_lock.persistent.outputs.iter(); + if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() { continue; } - state_lock.outputs.push(output_info); + state_lock.persistent.outputs.push(output_info); } self.persist_state(&*state_lock).await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); @@ -451,13 +452,13 @@ where /// Returns a list of the currently tracked spendable outputs. pub fn tracked_spendable_outputs(&self) -> Vec { - self.sweeper_state.lock().unwrap().outputs.clone() + self.sweeper_state.lock().unwrap().persistent.outputs.clone() } /// Gets the latest best block which was connected either via the [`Listen`] or /// [`Confirm`] interfaces. pub fn current_best_block(&self) -> BestBlock { - self.sweeper_state.lock().unwrap().best_block + self.sweeper_state.lock().unwrap().persistent.best_block } /// Regenerates and broadcasts the spending transaction for any outputs that are pending. 
This method will be a @@ -505,8 +506,9 @@ where { let sweeper_state = self.sweeper_state.lock().unwrap(); - let cur_height = sweeper_state.best_block.height; - let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height)); + let cur_height = sweeper_state.persistent.best_block.height; + let has_respends = + sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height)); if !has_respends { return Ok(()); } @@ -520,10 +522,11 @@ where { let mut sweeper_state = self.sweeper_state.lock().unwrap(); - let cur_height = sweeper_state.best_block.height; - let cur_hash = sweeper_state.best_block.block_hash; + let cur_height = sweeper_state.persistent.best_block.height; + let cur_hash = sweeper_state.persistent.best_block.block_hash; let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state + .persistent .outputs .iter() .filter(|o| filter_fn(*o, cur_height)) @@ -536,7 +539,11 @@ where } let spending_tx = self - .spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script) + .spend_outputs( + &sweeper_state.persistent, + &respend_descriptors, + change_destination_script, + ) .map_err(|e| { log_error!(self.logger, "Error spending outputs: {:?}", e); })?; @@ -550,7 +557,7 @@ where // As we didn't modify the state so far, the same filter_fn yields the same elements as // above. let respend_outputs = - sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); + sweeper_state.persistent.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); for output_info in respend_outputs { if let Some(filter) = self.chain_data_source.as_ref() { let watched_output = output_info.to_watched_output(cur_hash); @@ -571,10 +578,10 @@ where } fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) { - let cur_height = sweeper_state.best_block.height; + let cur_height = sweeper_state.persistent.best_block.height; // Prune all outputs that have sufficient depth by now. - sweeper_state.outputs.retain(|o| { + sweeper_state.persistent.outputs.retain(|o| { if let Some(confirmation_height) = o.status.confirmation_height() { // We wait at least `PRUNE_DELAY_BLOCKS` as before that // `Event::SpendableOutputs` from lingering monitors might get replayed. 
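
The hunks in this patch route every access through the new `persistent` field; per the commit message, the point of the split is that only the nested struct participates in serialization, so runtime-only fields can later be added to the outer struct without touching the TLV macro. A rough, self-contained illustration of the pattern with made-up field names (the real code applies `impl_writeable_tlv_based!` to `PersistentSweeperState`):

    // Illustrative only: persisted data lives in the nested struct, runtime-only
    // data in the outer one, mirroring SweeperState / PersistentSweeperState.
    #[derive(Debug, Clone)]
    struct PersistentState {
        best_height: u32, // stands in for `outputs` / `best_block`
    }

    #[derive(Debug)]
    struct RuntimeState {
        persistent: PersistentState,
        needs_flush: bool, // runtime-only, never serialized (the next patch adds `dirty` here)
    }

    impl RuntimeState {
        fn record_height(&mut self, height: u32) {
            self.persistent.best_height = height;
            self.needs_flush = true;
        }

        // Stand-in for `sweeper_state.persistent.encode()`: only the nested,
        // persisted part is turned into bytes.
        fn encode_persistent(&self) -> Vec<u8> {
            self.persistent.best_height.to_be_bytes().to_vec()
        }
    }
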
@@ -596,7 +603,7 @@ where OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, - &sweeper_state.encode(), + &sweeper_state.persistent.encode(), ) .await .map_err(|e| { @@ -613,7 +620,7 @@ where } fn spend_outputs( - &self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor], + &self, sweeper_state: &PersistentSweeperState, descriptors: &[&SpendableOutputDescriptor], change_destination_script: ScriptBuf, ) -> Result { let tx_feerate = @@ -636,7 +643,7 @@ where ) { let confirmation_hash = header.block_hash(); for (_, tx) in txdata { - for output_info in sweeper_state.outputs.iter_mut() { + for output_info in sweeper_state.persistent.outputs.iter_mut() { if output_info.is_spent_in(*tx) { output_info.status.confirmed(confirmation_hash, height, (*tx).clone()) } @@ -647,7 +654,7 @@ where fn best_block_updated_internal( &self, sweeper_state: &mut SweeperState, header: &Header, height: u32, ) { - sweeper_state.best_block = BestBlock::new(header.block_hash(), height); + sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height); self.prune_confirmed_outputs(sweeper_state); } } @@ -667,9 +674,9 @@ where &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, ) { let mut state_lock = self.sweeper_state.lock().unwrap(); - assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash, + assert_eq!(state_lock.persistent.best_block.block_hash, header.prev_blockhash, "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(state_lock.best_block.height, height - 1, + assert_eq!(state_lock.persistent.best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); @@ -686,13 +693,13 @@ where let new_height = height - 1; let block_hash = header.block_hash(); - assert_eq!(state_lock.best_block.block_hash, block_hash, + assert_eq!(state_lock.persistent.best_block.block_hash, block_hash, "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - assert_eq!(state_lock.best_block.height, height, + assert_eq!(state_lock.persistent.best_block.height, height, "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); - state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height); + state_lock.persistent.best_block = BestBlock::new(header.prev_blockhash, new_height); - for output_info in state_lock.outputs.iter_mut() { + for output_info in state_lock.persistent.outputs.iter_mut() { if output_info.status.confirmation_hash() == Some(block_hash) { debug_assert_eq!(output_info.status.confirmation_height(), Some(height)); output_info.status.unconfirmed(); @@ -731,6 +738,7 @@ where // Get what height was unconfirmed. let unconf_height = state_lock + .persistent .outputs .iter() .find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid)) @@ -739,6 +747,7 @@ where if let Some(unconf_height) = unconf_height { // Unconfirm all >= this height. 
state_lock + .persistent .outputs .iter_mut() .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); @@ -761,6 +770,7 @@ where fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> { let state_lock = self.sweeper_state.lock().unwrap(); state_lock + .persistent .outputs .iter() .filter_map(|o| match o.status { @@ -780,13 +790,18 @@ where } } -#[derive(Debug, Clone)] +#[derive(Debug)] struct SweeperState { + persistent: PersistentSweeperState, +} + +#[derive(Debug, Clone)] +struct PersistentSweeperState { outputs: Vec<TrackedSpendableOutput>, best_block: BestBlock, } -impl_writeable_tlv_based!(SweeperState, { +impl_writeable_tlv_based!(PersistentSweeperState, { (0, outputs, required_vec), (2, best_block, required), }); @@ -832,7 +847,7 @@ where kv_store, logger, ) = args; - let state = SweeperState::read(reader)?; + let state = PersistentSweeperState::read(reader)?; let best_block = state.best_block; if let Some(filter) = chain_data_source.as_ref() { @@ -842,7 +857,7 @@ where } } - let sweeper_state = Mutex::new(state); + let sweeper_state = Mutex::new(SweeperState { persistent: state }); Ok(Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -881,7 +896,7 @@ where kv_store, logger, ) = args; - let state = SweeperState::read(reader)?; + let state = PersistentSweeperState::read(reader)?; let best_block = state.best_block; if let Some(filter) = chain_data_source.as_ref() { @@ -891,7 +906,7 @@ where } } - let sweeper_state = Mutex::new(state); + let sweeper_state = Mutex::new(SweeperState { persistent: state }); Ok(( best_block, OutputSweeper { From edd5cd5c84863151a784a4feaeae5e131064efca Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 14:54:25 +0200 Subject: [PATCH 16/17] Move persist into async part of the sweeper Prepares for making the kv store async. Otherwise the sweeper might need to call block_on, which would in turn require a runtime. --- lightning/src/util/sweep.rs | 65 ++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index bee26c773a7..c8ba6becd4e 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -382,8 +382,10 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let outputs = Vec::new(); - let sweeper_state = - Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } }); + let sweeper_state = Mutex::new(SweeperState { + persistent: PersistentSweeperState { outputs, best_block }, + dirty: false, + }); Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -445,7 +447,7 @@ where state_lock.persistent.outputs.push(output_info); } - self.persist_state(&*state_lock).await.map_err(|e| { + self.flush_state(&mut state_lock).await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }) } @@ -473,7 +475,19 @@ where return Ok(()); } - let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await; + let result = { + self.regenerate_and_broadcast_spend_if_necessary_internal().await?; + + // If there is still dirty state, we need to persist it. + let mut sweeper_state = self.sweeper_state.lock().unwrap(); + if sweeper_state.dirty { + self.flush_state(&mut sweeper_state).await.map_err(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + }) + } else { + Ok(()) + } + }; // Release the pending sweep flag again, regardless of result.
self.pending_sweep.store(false, Ordering::Release); @@ -567,7 +581,7 @@ where output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); } - self.persist_state(&sweeper_state).await.map_err(|e| { + self.flush_state(&mut sweeper_state).await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); })?; @@ -595,9 +609,12 @@ where } true }); + + sweeper_state.dirty = true; } - async fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { + /// Flushes the current state to the persistence layer and marks the state as clean. + async fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> { self.kv_store .write( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, - &sweeper_state.persistent.encode(), ) .await .map_err(|e| { @@ -617,6 +634,9 @@ where ); e }) + .map(|_| { + sweeper_state.dirty = false; + }) } fn spend_outputs( @@ -649,6 +669,8 @@ where } } } + + sweeper_state.dirty = true; } fn best_block_updated_internal( @@ -656,6 +678,8 @@ where ) { sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height); self.prune_confirmed_outputs(sweeper_state); + + sweeper_state.dirty = true; } } @@ -679,12 +703,8 @@ where assert_eq!(state_lock.persistent.best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); - self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.best_block_updated_internal(&mut *state_lock, header, height); - - // let _ = self.persist_state(&*state_lock).map_err(|e| { - // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - // }); + self.transactions_confirmed_internal(&mut state_lock, header, txdata, height); + self.best_block_updated_internal(&mut state_lock, header, height); } fn block_disconnected(&self, header: &Header, height: u32) { @@ -706,9 +726,7 @@ where } } - // self.persist_state(&*state_lock).unwrap_or_else(|e| { - // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - // }); + state_lock.dirty = true; } } @@ -728,9 +746,6 @@ where ) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - // self.persist_state(&*state_lock).unwrap_or_else(|e| { - // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - // }); } fn transaction_unconfirmed(&self, txid: &Txid) { @@ -753,18 +768,13 @@ where .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); - // self.persist_state(&*state_lock).unwrap_or_else(|e| { - // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - // }); + state_lock.dirty = true; } } fn best_block_updated(&self, header: &Header, height: u32) { let mut state_lock = self.sweeper_state.lock().unwrap(); - self.best_block_updated_internal(&mut *state_lock, header, height); - // let _ = self.persist_state(&*state_lock).map_err(|e| { - // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - // }); + self.best_block_updated_internal(&mut state_lock, header, height); } fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> { let state_lock = self.sweeper_state.lock().unwrap(); state_lock @@ -793,6 +803,7 @@ where #[derive(Debug)] struct SweeperState { persistent: PersistentSweeperState, + dirty: bool, } #[derive(Debug, Clone)] @@ -857,7 +868,7 @@ where } } - let sweeper_state = Mutex::new(SweeperState { persistent: state }); + let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); Ok(Self { sweeper_state, pending_sweep:
AtomicBool::new(false), @@ -906,7 +917,7 @@ where } } - let sweeper_state = Mutex::new(SweeperState { persistent: state }); + let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); Ok(( best_block, OutputSweeper { From 3d49db760872b355cbcd83214335660b364d625c Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 10 Jun 2025 13:28:22 +0200 Subject: [PATCH 17/17] try async kvstore in sweeper --- lightning/src/util/sweep.rs | 70 +++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index c8ba6becd4e..a3e2befd7be 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -430,24 +430,35 @@ where return Ok(()); } - let mut state_lock = self.sweeper_state.lock().unwrap(); - for descriptor in relevant_descriptors { - let output_info = TrackedSpendableOutput { - descriptor, - channel_id, - status: OutputSpendStatus::PendingInitialBroadcast { - delayed_until_height: delay_until_height, - }, - }; - - let mut outputs = state_lock.persistent.outputs.iter(); - if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() { - continue; + let encoded; + let flush_fut; + { + let mut state_lock = self.sweeper_state.lock().unwrap(); + for descriptor in relevant_descriptors { + let output_info = TrackedSpendableOutput { + descriptor, + channel_id, + status: OutputSpendStatus::PendingInitialBroadcast { + delayed_until_height: delay_until_height, + }, + }; + + let mut outputs = state_lock.persistent.outputs.iter(); + if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() { + continue; + } + + state_lock.persistent.outputs.push(output_info); } + encoded = state_lock.persistent.encode(); - state_lock.persistent.outputs.push(output_info); + // Not yet safe: the dirty flag is cleared here before the state has actually been persisted. + state_lock.dirty = false; + + // Creating the flush future while still holding the lock should fix the ordering of the persisted writes, but this needs verification. + flush_fut = self.flush_state(encoded); } - self.flush_state(&mut state_lock).await.map_err(|e| { + flush_fut.await.map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }) } @@ -481,9 +492,12 @@ where // If there is still dirty state, we need to persist it. let mut sweeper_state = self.sweeper_state.lock().unwrap(); if sweeper_state.dirty { - self.flush_state(&mut sweeper_state).await.map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }) + // TODO: Move outside lock. + // self.flush_state(&mut sweeper_state).await.map_err(|e| { + // log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + // }) + + Ok(()) } else { Ok(()) } @@ -533,6 +547,8 @@ where self.change_destination_source.get_change_destination_script().await?; // Sweep the outputs. + let flush_fut; + let encoded; { let mut sweeper_state = self.sweeper_state.lock().unwrap(); @@ -581,13 +597,18 @@ where output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); } - self.flush_state(&mut sweeper_state).await.map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; + encoded = sweeper_state.persistent.encode(); + sweeper_state.dirty = false; + + flush_fut = self.flush_state(encoded); self.broadcaster.broadcast_transactions(&[&spending_tx]); } + flush_fut.await.map_err(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + })?; + Ok(()) } @@ -614,13 +635,13 @@ where } /// Flushes the current state to the persistence layer and marks the state as clean.
- async fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> { + async fn flush_state(&self, sweeper_state_encoded: Vec<u8>) -> Result<(), io::Error> { self.kv_store .write( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, - &sweeper_state.persistent.encode(), + &sweeper_state_encoded, ) .await .map_err(|e| { @@ -634,9 +655,6 @@ where ); e }) - .map(|_| { - sweeper_state.dirty = false; - }) } fn spend_outputs(