Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MonitorUpdatingPersister #456

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::peer_store::PeerStore;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
OnionMessenger, PeerManager,
OnionMessenger, PeerManager, Persister,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
Expand All @@ -46,8 +46,8 @@ use lightning::routing::scoring::{
use lightning::sign::EntropySource;

use lightning::util::persist::{
read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
use lightning::util::sweep::OutputSweeper;
Expand Down Expand Up @@ -908,15 +908,6 @@ fn build_with_store_internal(

let runtime = Arc::new(RwLock::new(None));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&fee_estimator),
Arc::clone(&kv_store),
));

// Initialize the KeysManager
let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
log_error!(logger, "Failed to get current time: {}", e);
Expand All @@ -932,6 +923,38 @@ fn build_with_store_internal(
Arc::clone(&logger),
));

let persister = Arc::new(Persister::new(
Arc::clone(&kv_store),
Arc::clone(&logger),
10, // (?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't know what value to recommend here, but pending when @tnull takes a look, you could define a constant to hold this value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is as good as any, as we don't have super good data on this parameter so far. I think we could bump it to 100 to start with and then see if we run into any issue with it. I agree that we should introduce a const for it though.

Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
));

// Read ChannelMonitor state from store
let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
Ok(monitors) => monitors,
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound {
Vec::new()
} else {
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
return Err(BuildError::ReadFailed);
}
},
};

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&fee_estimator),
Arc::clone(&persister),
));

// Initialize the network graph, scorer, and router
let network_graph =
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
Expand Down Expand Up @@ -974,23 +997,6 @@ fn build_with_store_internal(
scoring_fee_params,
));

// Read ChannelMonitor state from store
let channel_monitors = match read_channel_monitors(
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
) {
Ok(monitors) => monitors,
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound {
Vec::new()
} else {
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
return Err(BuildError::ReadFailed);
}
},
};

let mut user_config = default_user_config(&config);
if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
// Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll
Expand Down
163 changes: 109 additions & 54 deletions src/io/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@
// accordance with one or both of these licenses.

use lightning::ln::functional_test_utils::{
connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
check_closed_event, connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs,
create_dummy_block, create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
};
use lightning::util::persist::{
KVStore, MonitorName, MonitorUpdatingPersister,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, KVSTORE_NAMESPACE_KEY_MAX_LEN,
};
use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};

use lightning::events::ClosureReason;
use lightning::util::test_utils;
use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Not quite sure why the import was changed, but if we already do so, why not move the others up, too?

use lightning::{check_added_monitors, check_closed_broadcast};

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

use std::panic::RefUnwindSafe;
use std::path::PathBuf;

const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

pub(crate) fn random_storage_path() -> PathBuf {
let mut temp_path = std::env::temp_dir();
let mut rng = thread_rng();
Expand Down Expand Up @@ -81,54 +86,104 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_s
// Integration-test the given KVStore implementation. Test relaying a few payments and check that
// the persisted data is updated the appropriate number of times.
pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
// This value is used later to limit how many iterations we perform.
let persister_0_max_pending_updates = 7;
// Intentionally set this to a smaller value to test a different alignment.
let persister_1_max_pending_updates = 3;

let chanmon_cfgs = create_chanmon_cfgs(2);

let persister_0 = MonitorUpdatingPersister::new(
store_0,
&chanmon_cfgs[0].logger,
persister_0_max_pending_updates,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);

let persister_1 = MonitorUpdatingPersister::new(
store_1,
&chanmon_cfgs[1].logger,
persister_1_max_pending_updates,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the duplication, consider extracting this into a helper function.


let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);

let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
store_0,
node_cfgs[0].keys_manager,
&persister_0,
&chanmon_cfgs[0].keys_manager,
);

let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
store_1,
node_cfgs[1].keys_manager,
&persister_1,
&chanmon_cfgs[1].keys_manager,
);

node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Check that the persisted channel data is empty before any channels are
// open.
let mut persisted_chan_data_0 =
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 =
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 =
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
.unwrap();
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
// check that we stored only one monitor
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

let monitor_name = MonitorName::from(mon.get_funding_txo().0);
assert_eq!(
store_0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would prefer if we could move this to a variable to avoid the additional indentation and verticality in the assert_eq (same below).

.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_0_max_pending_updates,
"Wrong number of updates stored in persister 0",
);
}
persisted_chan_data_1 =
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
.unwrap();
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

let monitor_name = MonitorName::from(mon.get_funding_txo().0);
assert_eq!(
store_1
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_1_max_pending_updates,
"Wrong number of updates stored in persister 1",
);
}
};
}
Expand All @@ -138,52 +193,52 @@ pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
check_persisted_data!(0);

// Send a few payments and make sure the monitors are updated to the latest.
send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
check_persisted_data!(5);
send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
check_persisted_data!(10);
send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

// Send a few more payments to try all the alignments of max pending updates with
// updates for a payment sent and received.
let mut sender = 0;
for i in 3..=persister_0_max_pending_updates * 2 {
let receiver;
if sender == 0 {
sender = 1;
receiver = 0;
} else {
sender = 0;
receiver = 1;
}
send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can avoid the unnecessary allocations here and below by simply dropping vec!, given that you're making it a slice anyways.

Suggested change
send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
send_payment(&nodes[sender], &[&nodes[receiver]][..], 21_000);

also, consider moving these to appropriately named variables for clarity.

check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
}

// Force close because cooperative close doesn't result in any persisted
// updates.
nodes[0]
.node
.force_close_broadcasting_latest_txn(
&nodes[0].node.list_channels()[0].channel_id,
&nodes[1].node.get_our_node_id(),
"whoops".to_string(),
)
.unwrap();
check_closed_event!(
nodes[0],
1,
ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) },
[nodes[1].node.get_our_node_id()],
100000
);

let node_id_1 = nodes[1].node.get_our_node_id();
let chan_id = nodes[0].node.list_channels()[0].channel_id;
let err_msg = "Channel force-closed".to_string();
nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap();

let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);

let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
assert_eq!(node_txn.len(), 1);
let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
connect_block(&nodes[1], &dummy_block);

connect_block(
&nodes[1],
&create_dummy_block(
nodes[0].best_block_hash(),
42,
vec![node_txn[0].clone(), node_txn[0].clone()],
),
);
check_closed_broadcast!(nodes[1], true);
check_closed_event!(
nodes[1],
1,
ClosureReason::CommitmentTxConfirmed,
[nodes[0].node.get_our_node_id()],
100000
);
let reason = ClosureReason::CommitmentTxConfirmed;
let node_id_0 = nodes[0].node.get_our_node_id();
check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
check_added_monitors!(nodes[1], 1);

// Make sure everything is persisted as expected after close.
check_persisted_data!(11);
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
}
13 changes: 11 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lightning::routing::gossip;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use lightning::sign::InMemorySigner;
use lightning::util::persist::KVStore;
use lightning::util::persist::{KVStore, MonitorUpdatingPersister};
use lightning::util::ser::{Readable, Writeable, Writer};
use lightning::util::sweep::OutputSweeper;

Expand All @@ -38,13 +38,22 @@ use std::sync::{Arc, Mutex};

pub(crate) type DynStore = dyn KVStore + Sync + Send;

pub type Persister = MonitorUpdatingPersister<
Arc<DynStore>,
Arc<Logger>,
Arc<KeysManager>,
Arc<KeysManager>,
Arc<Broadcaster>,
Arc<OnchainFeeEstimator>,
>;

pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
InMemorySigner,
Arc<ChainSource>,
Arc<Broadcaster>,
Arc<OnchainFeeEstimator>,
Arc<Logger>,
Arc<DynStore>,
Arc<Persister>,
>;

pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager<
Expand Down
Loading