Use MonitorUpdatingPersister #456

Open · wants to merge 4 commits into base: main

64 changes: 35 additions & 29 deletions src/builder.rs
@@ -26,7 +26,7 @@ use crate::peer_store::PeerStore;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
OnionMessenger, PeerManager,
OnionMessenger, PeerManager, Persister,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
@@ -46,8 +46,8 @@ use lightning::routing::scoring::{
use lightning::sign::EntropySource;

use lightning::util::persist::{
read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
use lightning::util::sweep::OutputSweeper;
@@ -955,15 +955,6 @@ fn build_with_store_internal(

let runtime = Arc::new(RwLock::new(None));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&fee_estimator),
Arc::clone(&kv_store),
));

// Initialize the KeysManager
let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
log_error!(logger, "Failed to get current time: {}", e);
@@ -979,6 +970,38 @@ fn build_with_store_internal(
Arc::clone(&logger),
));

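// Note: the third argument (`100`) is `maximum_pending_updates`: the
// persister stores individual `ChannelMonitorUpdate`s and defers rewriting
// the full `ChannelMonitor` until an update id hits a multiple of this value.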
let persister = Arc::new(Persister::new(
Arc::clone(&kv_store),
Arc::clone(&logger),
100,
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
));

// Read ChannelMonitor state from store
let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
Ok(monitors) => monitors,
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound {
Vec::new()
} else {
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
return Err(BuildError::ReadFailed);
}
},
};
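// The ChainMonitor below now persists through the MonitorUpdatingPersister,
// where the removed code above handed it the raw KVStore (which rewrote the
// full ChannelMonitor on every update).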

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&fee_estimator),
Arc::clone(&persister),
));

// Initialize the network graph, scorer, and router
let network_graph =
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
@@ -1021,23 +1044,6 @@
scoring_fee_params,
));

// Read ChannelMonitor state from store
let channel_monitors = match read_channel_monitors(
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
) {
Ok(monitors) => monitors,
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound {
Vec::new()
} else {
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
return Err(BuildError::ReadFailed);
}
},
};

let mut user_config = default_user_config(&config);
if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
// Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll
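For reviewers unfamiliar with `MonitorUpdatingPersister`, here is a minimal sketch of the write policy the new builder code relies on. The function below is illustrative only (its name is made up); the modulo rule is the behavior the tests later assert:

```rust
// Sketch: which kind of entry a MonitorUpdatingPersister-style store writes
// for a ChannelMonitorUpdate with the given update id.
fn kind_of_write(update_id: u64, maximum_pending_updates: u64) -> &'static str {
	if update_id % maximum_pending_updates == 0 {
		// Every `maximum_pending_updates`-th update rewrites the full
		// `ChannelMonitor`; the delta entries written since the previous
		// full persist become stale and can be cleaned up.
		"full ChannelMonitor"
	} else {
		// Everything in between is stored as a small `ChannelMonitorUpdate`
		// delta, keyed by its update id under the monitor's namespace.
		"ChannelMonitorUpdate delta"
	}
}

fn main() {
	// With `maximum_pending_updates = 100`, as the builder passes above:
	assert_eq!(kind_of_write(57, 100), "ChannelMonitorUpdate delta");
	assert_eq!(kind_of_write(100, 100), "full ChannelMonitor");
}
```

`read_all_channel_monitors_with_updates` inverts this at startup: it reads each persisted monitor and replays any newer delta entries on top before the reconstructed monitors are handed to the `ChainMonitor`.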
193 changes: 129 additions & 64 deletions src/io/test_utils.rs
@@ -7,9 +7,12 @@

use lightning::ln::functional_test_utils::{
connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg,
};
use lightning::util::persist::{
KVStore, MonitorName, MonitorUpdatingPersister,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, KVSTORE_NAMESPACE_KEY_MAX_LEN,
};
use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};

use lightning::events::ClosureReason;
use lightning::util::test_utils;
@@ -21,6 +24,17 @@ use rand::{thread_rng, Rng};
use std::panic::RefUnwindSafe;
use std::path::PathBuf;

type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
&'a K,
&'a test_utils::TestLogger,
&'a test_utils::TestKeysInterface,
&'a test_utils::TestKeysInterface,
&'a test_utils::TestBroadcaster,
&'a test_utils::TestFeeEstimator,
>;

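// Expected number of monitor updates per node for each full payment
// (send plus claim) in the functional-test harness.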
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

pub(crate) fn random_storage_path() -> PathBuf {
let mut temp_path = std::env::temp_dir();
let mut rng = thread_rng();
@@ -78,57 +92,105 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_s
assert_eq!(listed_keys.len(), 0);
}

pub(crate) fn create_persister<'a, K: KVStore>(
store: &'a K, chanmon_cfg: &'a TestChanMonCfg, max_pending_updates: u64,
) -> TestMonitorUpdatePersister<'a, K> {
MonitorUpdatingPersister::new(
store,
&chanmon_cfg.logger,
max_pending_updates,
&chanmon_cfg.keys_manager,
&chanmon_cfg.keys_manager,
&chanmon_cfg.tx_broadcaster,
&chanmon_cfg.fee_estimator,
)
}

pub(crate) fn create_chain_monitor<'a, K: KVStore>(
chanmon_cfg: &'a TestChanMonCfg, persister: &'a TestMonitorUpdatePersister<'a, K>,
) -> test_utils::TestChainMonitor<'a> {
test_utils::TestChainMonitor::new(
Some(&chanmon_cfg.chain_source),
&chanmon_cfg.tx_broadcaster,
&chanmon_cfg.logger,
&chanmon_cfg.fee_estimator,
persister,
&chanmon_cfg.keys_manager,
)
}

// Integration-test the given KVStore implementation: relay a few payments and
// check that the persisted data is updated the appropriate number of times.
pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
// This value is used later to limit how many iterations we perform.
let persister_0_max_pending_updates = 7;
// Intentionally set this to a smaller value to test a different alignment.
let persister_1_max_pending_updates = 3;

let chanmon_cfgs = create_chanmon_cfgs(2);

let persister_0 = create_persister(store_0, &chanmon_cfgs[0], persister_0_max_pending_updates);
let persister_1 = create_persister(store_1, &chanmon_cfgs[1], persister_1_max_pending_updates);

let chain_mon_0 = create_chain_monitor(&chanmon_cfgs[0], &persister_0);
let chain_mon_1 = create_chain_monitor(&chanmon_cfgs[1], &persister_1);

let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
store_0,
node_cfgs[0].keys_manager,
);
let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
store_1,
node_cfgs[1].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Check that the persisted channel data is empty before any channels are
// open.
let mut persisted_chan_data_0 =
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 =
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 =
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
.unwrap();
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
// Check that we stored only one monitor.
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let store_0_updates = store_0
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
)
.unwrap()
.len() as u64;
assert_eq!(
store_0_updates,
mon.get_latest_update_id() % persister_0_max_pending_updates,
"Wrong number of updates stored in persister 0",
);
}
persisted_chan_data_1 =
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
.unwrap();
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let store_1_updates = store_1
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
)
.unwrap()
.len() as u64;
assert_eq!(
store_1_updates,
mon.get_latest_update_id() % persister_1_max_pending_updates,
"Wrong number of updates stored in persister 1",
);
}
};
}
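The modulo assertion above is easier to see with concrete numbers. A self-contained check using persister 0's configuration and the five-updates-per-payment constant:

```rust
fn main() {
	let maximum_pending_updates = 7u64; // persister_0_max_pending_updates
	// After the second payment the latest update id is 2 * 5 = 10. The full
	// monitor was last rewritten at id 7 (a multiple of 7), so only the
	// deltas for ids 8, 9, and 10 remain in the store: 10 % 7 = 3 entries.
	let latest_update_id = 10u64;
	assert_eq!(latest_update_id % maximum_pending_updates, 3);
}
```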
@@ -138,52 +200,55 @@ pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
check_persisted_data!(0);

// Send a few payments and make sure the monitors are updated to the latest.
send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
check_persisted_data!(5);
send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
check_persisted_data!(10);
let expected_route = &[&nodes[1]][..];
send_payment(&nodes[0], expected_route, 8_000_000);
check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
let expected_route = &[&nodes[0]][..];
send_payment(&nodes[1], expected_route, 4_000_000);
check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

// Send a few more payments to try all the alignments of max pending updates with
// updates for a payment sent and received.
let mut sender = 0;
for i in 3..=persister_0_max_pending_updates * 2 {
let receiver;
if sender == 0 {
sender = 1;
receiver = 0;
} else {
sender = 0;
receiver = 1;
}
let expected_route = &[&nodes[receiver]][..];
send_payment(&nodes[sender], expected_route, 21_000);
check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
}

// Force close because cooperative close doesn't result in any persisted
// updates.
nodes[0]
.node
.force_close_broadcasting_latest_txn(
&nodes[0].node.list_channels()[0].channel_id,
&nodes[1].node.get_our_node_id(),
"whoops".to_string(),
)
.unwrap();
check_closed_event!(
nodes[0],
1,
ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) },
[nodes[1].node.get_our_node_id()],
100000
);

let node_id_1 = nodes[1].node.get_our_node_id();
let chan_id = nodes[0].node.list_channels()[0].channel_id;
let err_msg = "Channel force-closed".to_string();
nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap();

let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
check_closed_event!(nodes[0], 1, reason, false, [node_id_1], 100000);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);

let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
assert_eq!(node_txn.len(), 1);
let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
connect_block(&nodes[1], &dummy_block);

connect_block(
&nodes[1],
&create_dummy_block(
nodes[0].best_block_hash(),
42,
vec![node_txn[0].clone(), node_txn[0].clone()],
),
);
check_closed_broadcast!(nodes[1], true);
check_closed_event!(
nodes[1],
1,
ClosureReason::CommitmentTxConfirmed,
[nodes[0].node.get_our_node_id()],
100000
);
let reason = ClosureReason::CommitmentTxConfirmed;
let node_id_0 = nodes[0].node.get_our_node_id();
check_closed_event!(nodes[1], 1, reason, false, [node_id_0], 100000);
check_added_monitors!(nodes[1], 1);

// Make sure everything is persisted as expected after close.
check_persisted_data!(11);
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
}
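For the final assertion: the loop's last iteration is `i = persister_0_max_pending_updates * 2 = 14`, so the latest update id after the loop is `14 * 5 = 70`, and the force-close contributes one more update, giving `7 * 2 * 5 + 1 = 71`.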
13 changes: 11 additions & 2 deletions src/types.rs
@@ -23,7 +23,7 @@ use lightning::routing::gossip;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use lightning::sign::InMemorySigner;
use lightning::util::persist::KVStore;
use lightning::util::persist::{KVStore, MonitorUpdatingPersister};
use lightning::util::ser::{Readable, Writeable, Writer};
use lightning::util::sweep::OutputSweeper;

@@ -38,13 +38,22 @@ use std::sync::{Arc, Mutex};

pub(crate) type DynStore = dyn KVStore + Sync + Send;

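/// The concrete `MonitorUpdatingPersister` used for `ChannelMonitor`
/// persistence, parameterized over the node's store, logger, signers,
/// broadcaster, and fee estimator.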
pub type Persister = MonitorUpdatingPersister<
Arc<DynStore>,
Arc<Logger>,
Arc<KeysManager>,
Arc<KeysManager>,
Arc<Broadcaster>,
Arc<OnchainFeeEstimator>,
>;

pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
InMemorySigner,
Arc<ChainSource>,
Arc<Broadcaster>,
Arc<OnchainFeeEstimator>,
Arc<Logger>,
Arc<DynStore>,
Arc<Persister>,
>;

pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager<
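Since `Persister` is now a public alias, code that embeds these types can name the persister directly when it needs to load monitors outside the builder. A minimal sketch, assuming the alias is in scope; the helper name is hypothetical:

```rust
use std::sync::Arc;

// Hypothetical helper: read every persisted ChannelMonitor, replaying any
// pending ChannelMonitorUpdate deltas on top, and report how many were found.
fn count_persisted_monitors(persister: &Arc<Persister>) -> Result<usize, lightning::io::Error> {
	let monitors = persister.read_all_channel_monitors_with_updates()?;
	Ok(monitors.len())
}
```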