Skip to content

Commit 55966e7

Browse files
committed
Improve peer info persistence
1 parent 873652d commit 55966e7

File tree

3 files changed

+226
-103
lines changed

3 files changed

+226
-103
lines changed

src/io.rs

Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::error::LdkLiteError as Error;
22

3-
use crate::hex;
4-
use crate::{FilesystemLogger, LdkLiteConfig, NetworkGraph, Scorer};
3+
use crate::{FilesystemLogger, LdkLiteConfig, NetworkGraph, PeerInfo, Scorer};
54

65
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
76
use lightning::util::ser::ReadableArgs;
@@ -11,9 +10,10 @@ use bitcoin::secp256k1::PublicKey;
1110
use rand::{thread_rng, RngCore};
1211

1312
use std::collections::HashMap;
13+
use std::convert::TryFrom;
1414
use std::fs;
1515
use std::io::{BufRead, BufReader, Write};
16-
use std::net::{SocketAddr, ToSocketAddrs};
16+
use std::net::SocketAddr;
1717
use std::sync::Arc;
1818

1919
pub(crate) fn read_or_generate_seed_file(config: Arc<LdkLiteConfig>) -> Result<[u8; 32], Error> {
@@ -68,59 +68,3 @@ pub(crate) fn read_scorer(
6868
}
6969
ProbabilisticScorer::new(params, network_graph, logger)
7070
}
71-
72-
pub(crate) fn read_channel_peer_data(
73-
config: Arc<LdkLiteConfig>,
74-
) -> Result<HashMap<PublicKey, SocketAddr>, Error> {
75-
let ldk_data_dir = format!("{}/ldk", &config.storage_dir_path.clone());
76-
let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir.clone());
77-
let mut peer_data = HashMap::new();
78-
79-
if let Ok(file) = fs::File::open(peer_data_path) {
80-
let reader = BufReader::new(file);
81-
for line in reader.lines() {
82-
match parse_peer_info(line.unwrap()) {
83-
Ok((pubkey, socket_addr)) => {
84-
peer_data.insert(pubkey, socket_addr);
85-
}
86-
Err(e) => return Err(e),
87-
}
88-
}
89-
}
90-
Ok(peer_data)
91-
}
92-
93-
pub(crate) fn persist_channel_peer(
94-
config: Arc<LdkLiteConfig>, peer_info: &str,
95-
) -> std::io::Result<()> {
96-
let ldk_data_dir = format!("{}/ldk", &config.storage_dir_path.clone());
97-
let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir.clone());
98-
let mut file = fs::OpenOptions::new().create(true).append(true).open(peer_data_path)?;
99-
file.write_all(format!("{}\n", peer_info).as_bytes())
100-
}
101-
102-
// TODO: handle different kinds of NetAddress, e.g., the Hostname field.
103-
pub(crate) fn parse_peer_info(
104-
peer_pubkey_and_ip_addr: String,
105-
) -> Result<(PublicKey, SocketAddr), Error> {
106-
let mut pubkey_and_addr = peer_pubkey_and_ip_addr.split("@");
107-
let pubkey = pubkey_and_addr.next();
108-
let peer_addr_str = pubkey_and_addr.next();
109-
if peer_addr_str.is_none() || peer_addr_str.is_none() {
110-
return Err(Error::PeerInfoParse(
111-
"Incorrect format. Should be formatted as: `pubkey@host:port`.",
112-
));
113-
}
114-
115-
let peer_addr = peer_addr_str.unwrap().to_socket_addrs().map(|mut r| r.next());
116-
if peer_addr.is_err() || peer_addr.as_ref().unwrap().is_none() {
117-
return Err(Error::PeerInfoParse("Couldn't parse pubkey@host:port into a socket address."));
118-
}
119-
120-
let pubkey = hex::to_compressed_pubkey(pubkey.unwrap());
121-
if pubkey.is_none() {
122-
return Err(Error::PeerInfoParse("Unable to parse pubkey for node."));
123-
}
124-
125-
Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap()))
126-
}

src/lib.rs

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ mod event;
3030
mod hex;
3131
mod io;
3232
mod logger;
33+
mod peer_store;
3334

3435
use access::LdkLiteChainAccess;
3536
pub use error::LdkLiteError as Error;
3637
pub use event::LdkLiteEvent;
3738
use event::{LdkLiteEventHandler, LdkLiteEventQueue};
39+
use peer_store::{PeerInfo, PeerInfoStorage};
3840

3941
#[allow(unused_imports)]
4042
use logger::{
@@ -78,6 +80,7 @@ use bitcoin::BlockHash;
7880
use rand::Rng;
7981

8082
use std::collections::HashMap;
83+
use std::convert::TryFrom;
8184
use std::fs;
8285
use std::net::SocketAddr;
8386
use std::sync::atomic::{AtomicBool, Ordering};
@@ -361,6 +364,16 @@ impl LdkLiteBuilder {
361364
payment::Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT),
362365
));
363366

367+
let peer_store = if let Ok(mut f) = fs::File::open(format!(
368+
"{}/{}",
369+
ldk_data_dir.clone(),
370+
peer_store::PEER_INFO_PERSISTENCE_KEY
371+
)) {
372+
Arc::new(PeerInfoStorage::read(&mut f, Arc::clone(&persister))?)
373+
} else {
374+
Arc::new(PeerInfoStorage::new(Arc::clone(&persister)))
375+
};
376+
364377
let running = RwLock::new(None);
365378

366379
Ok(LdkLite {
@@ -380,6 +393,7 @@ impl LdkLiteBuilder {
380393
invoice_payer,
381394
inbound_payments,
382395
outbound_payments,
396+
peer_store,
383397
})
384398
}
385399
}
@@ -413,6 +427,7 @@ pub struct LdkLite {
413427
invoice_payer: Arc<InvoicePayer<LdkLiteEventHandler>>,
414428
inbound_payments: Arc<PaymentInfoStorage>,
415429
outbound_payments: Arc<PaymentInfoStorage>,
430+
peer_store: Arc<PeerInfoStorage<FilesystemPersister>>,
416431
}
417432

418433
impl LdkLite {
@@ -516,8 +531,8 @@ impl LdkLite {
516531
// Regularly reconnect to channel peers.
517532
let connect_cm = Arc::clone(&self.channel_manager);
518533
let connect_pm = Arc::clone(&self.peer_manager);
519-
let connect_config = Arc::clone(&self.config);
520534
let connect_logger = Arc::clone(&self.logger);
535+
let connect_peer_store = Arc::clone(&self.peer_store);
521536
let stop_connect = Arc::clone(&stop_networking);
522537
tokio_runtime.spawn(async move {
523538
let mut interval = tokio::time::interval(Duration::from_secs(1));
@@ -526,35 +541,24 @@ impl LdkLite {
526541
return;
527542
}
528543
interval.tick().await;
529-
match io::read_channel_peer_data(Arc::clone(&connect_config)) {
530-
Ok(info) => {
531-
let peers = connect_pm.get_peer_node_ids();
532-
for node_id in connect_cm
533-
.list_channels()
534-
.iter()
535-
.map(|chan| chan.counterparty.node_id)
536-
.filter(|id| !peers.contains(id))
537-
{
538-
for (pubkey, peer_addr) in info.iter() {
539-
if *pubkey == node_id {
540-
let _ = do_connect_peer(
541-
*pubkey,
542-
peer_addr.clone(),
543-
Arc::clone(&connect_pm),
544-
Arc::clone(&connect_logger),
545-
)
546-
.await;
547-
}
548-
}
544+
let pm_peers = connect_pm.get_peer_node_ids();
545+
for node_id in connect_cm
546+
.list_channels()
547+
.iter()
548+
.map(|chan| chan.counterparty.node_id)
549+
.filter(|id| !pm_peers.contains(id))
550+
{
551+
for peer_info in connect_peer_store.peers() {
552+
if peer_info.pubkey == node_id {
553+
let _ = do_connect_peer(
554+
peer_info.pubkey,
555+
peer_info.address.clone(),
556+
Arc::clone(&connect_pm),
557+
Arc::clone(&connect_logger),
558+
)
559+
.await;
549560
}
550561
}
551-
Err(e) => {
552-
log_error!(
553-
connect_logger,
554-
"failure reading channel peer info from disk: {:?}",
555-
e
556-
)
557-
}
558562
}
559563
}
560564
});
@@ -618,23 +622,30 @@ impl LdkLite {
618622
/// Returns a temporary channel id
619623
pub fn connect_open_channel(
620624
&self, node_pubkey_and_address: &str, channel_amount_sats: u64, announce_channel: bool,
621-
) -> Result<[u8; 32], Error> {
625+
) -> Result<(), Error> {
622626
let runtime_lock = self.running.read().unwrap();
623627
if runtime_lock.is_none() {
624628
return Err(Error::NotRunning);
625629
}
626630

627-
let (peer_pubkey, peer_addr) = io::parse_peer_info(node_pubkey_and_address.to_string())?;
631+
let peer_info = PeerInfo::try_from(node_pubkey_and_address.to_string())?;
628632

629633
let runtime = runtime_lock.as_ref().unwrap();
630634

635+
let con_peer_info = peer_info.clone();
631636
let con_success = Arc::new(AtomicBool::new(false));
632637
let con_success_cloned = Arc::clone(&con_success);
633638
let con_logger = Arc::clone(&self.logger);
634639
let con_pm = Arc::clone(&self.peer_manager);
635640

636641
runtime.tokio_runtime.block_on(async move {
637-
let res = connect_peer_if_necessary(peer_pubkey, peer_addr, con_pm, con_logger).await;
642+
let res = connect_peer_if_necessary(
643+
con_peer_info.pubkey,
644+
con_peer_info.address,
645+
con_pm,
646+
con_logger,
647+
)
648+
.await;
638649
con_success_cloned.store(res.is_ok(), Ordering::Release);
639650
});
640651

@@ -657,15 +668,16 @@ impl LdkLite {
657668
};
658669

659670
match self.channel_manager.create_channel(
660-
peer_pubkey,
671+
peer_info.pubkey,
661672
channel_amount_sats,
662673
0,
663674
0,
664675
Some(user_config),
665676
) {
666-
Ok(temporary_channel_id) => {
667-
log_info!(self.logger, "Initiated channel with peer {}. ", peer_pubkey);
668-
Ok(temporary_channel_id)
677+
Ok(_) => {
678+
self.peer_store.add_peer(peer_info.clone())?;
679+
log_info!(self.logger, "Initiated channel with peer {}. ", peer_info.pubkey);
680+
Ok(())
669681
}
670682
Err(e) => {
671683
log_error!(self.logger, "failed to open channel: {:?}", e);
@@ -678,6 +690,7 @@ impl LdkLite {
678690
pub fn close_channel(
679691
&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey,
680692
) -> Result<(), Error> {
693+
self.peer_store.remove_peer(counterparty_node_id)?;
681694
Ok(self.channel_manager.close_channel(channel_id, counterparty_node_id)?)
682695
}
683696

@@ -737,7 +750,8 @@ impl LdkLite {
737750
return Err(Error::NotRunning);
738751
}
739752

740-
let pubkey = hex::to_compressed_pubkey(node_id).ok_or(Error::PeerInfoParse("failed to parse node id"))?;
753+
let pubkey = hex::to_compressed_pubkey(node_id)
754+
.ok_or(Error::PeerInfoParse("failed to parse node id"))?;
741755

742756
let payment_preimage = PaymentPreimage(self.keys_manager.get_secure_random_bytes());
743757
let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -841,14 +855,6 @@ impl LdkLite {
841855

842856
None
843857
}
844-
845-
//
846-
// // Query for information about our channels
847-
// pub channel_info(&self) -> ChannelInfo;
848-
//
849-
// // Query for information about our on-chain/funding status.
850-
// pub funding_info(&self) -> FundingInfo;
851-
//}
852858
}
853859

854860
async fn connect_peer_if_necessary(

0 commit comments

Comments
 (0)