Skip to content

Commit 53e0ca5

Browse files
committed
Implement KVStoreUnpersister
1 parent b5b4a08 commit 53e0ca5

File tree

5 files changed

+178
-31
lines changed

5 files changed

+178
-31
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ futures = "0.3"
6060
serde_json = { version = "1.0" }
6161
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] }
6262
esplora-client = { version = "=0.3", default-features = false }
63+
libc = "0.2"
6364

6465
[dev-dependencies]
6566
electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] }

src/event.rs

+20-20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
PaymentInfo, PaymentInfoStorage, PaymentStatus, Wallet,
44
};
55

6+
use crate::io_utils::KVStoreUnpersister;
67
use crate::logger::{log_error, log_info, Logger};
78

89
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
@@ -181,9 +182,9 @@ impl Writeable for EventQueueSerWrapper<'_> {
181182
}
182183
}
183184

184-
pub(crate) struct EventHandler<K: Deref, L: Deref>
185+
pub(crate) struct EventHandler<K: Deref + Clone, L: Deref>
185186
where
186-
K::Target: KVStorePersister,
187+
K::Target: KVStorePersister + KVStoreUnpersister,
187188
L::Target: Logger,
188189
{
189190
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
@@ -197,9 +198,9 @@ where
197198
_config: Arc<Config>,
198199
}
199200

200-
impl<K: Deref, L: Deref> EventHandler<K, L>
201+
impl<K: Deref + Clone, L: Deref> EventHandler<K, L>
201202
where
202-
K::Target: KVStorePersister,
203+
K::Target: KVStorePersister + KVStoreUnpersister,
203204
L::Target: Logger,
204205
{
205206
pub fn new(
@@ -222,9 +223,9 @@ where
222223
}
223224
}
224225

225-
impl<K: Deref, L: Deref> LdkEventHandler for EventHandler<K, L>
226+
impl<K: Deref + Clone, L: Deref> LdkEventHandler for EventHandler<K, L>
226227
where
227-
K::Target: KVStorePersister,
228+
K::Target: KVStorePersister + KVStoreUnpersister,
228229
L::Target: Logger,
229230
{
230231
fn handle_event(&self, event: LdkEvent) {
@@ -363,25 +364,24 @@ where
363364
PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None),
364365
};
365366

366-
let payment_info =
367-
if let Some(mut payment_info) = self.payment_store.get(&payment_hash) {
367+
let mut locked_store = self.payment_store.lock().unwrap();
368+
locked_store
369+
.entry(payment_hash)
370+
.and_modify(|payment_info| {
368371
payment_info.status = PaymentStatus::Succeeded;
369372
payment_info.preimage = payment_preimage;
370373
payment_info.secret = payment_secret;
371374
payment_info.amount_msat = Some(amount_msat);
372-
payment_info
373-
} else {
374-
PaymentInfo {
375-
preimage: payment_preimage,
376-
payment_hash,
377-
secret: payment_secret,
378-
amount_msat: Some(amount_msat),
379-
direction: PaymentDirection::Inbound,
380-
status: PaymentStatus::Succeeded,
381-
}
382-
};
375+
})
376+
.or_insert(PaymentInfo {
377+
preimage: payment_preimage,
378+
payment_hash,
379+
secret: payment_secret,
380+
amount_msat: Some(amount_msat),
381+
direction: PaymentDirection::Inbound,
382+
status: PaymentStatus::Succeeded,
383+
});
383384

384-
self.payment_store.insert(payment_info).expect("Failed to access payment store");
385385
self.event_queue
386386
.add_event(Event::PaymentReceived { payment_hash, amount_msat })
387387
.expect("Failed to push to event queue");

src/io_utils.rs

+43
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use crate::{Config, FilesystemLogger, NetworkGraph, Scorer, WALLET_KEYS_SEED_LEN
33

44
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
55
use lightning::util::ser::{Readable, ReadableArgs};
6+
use lightning_persister::FilesystemPersister;
67

78
use rand::{thread_rng, RngCore};
89

910
use std::fs;
1011
use std::io::{BufReader, Write};
12+
use std::os::unix::io::AsRawFd;
1113
use std::path::Path;
14+
use std::path::PathBuf;
1215
use std::sync::Arc;
1316

1417
pub(crate) fn read_or_generate_seed_file(keys_seed_path: &str) -> [u8; WALLET_KEYS_SEED_LEN] {
@@ -83,3 +86,43 @@ pub(crate) fn read_payment_info(config: &Config) -> Vec<PaymentInfo> {
8386

8487
payments
8588
}
89+
90+
/// Provides an interface that allows a previously persisted key to be unpersisted.
91+
pub trait KVStoreUnpersister {
92+
/// Unpersist (i.e., remove) the writeable previously persisted under the provided key.
93+
/// Returns `true` if the key was present, and `false` otherwise.
94+
fn unpersist(&self, key: &str) -> std::io::Result<bool>;
95+
}
96+
97+
impl KVStoreUnpersister for FilesystemPersister {
98+
fn unpersist(&self, key: &str) -> std::io::Result<bool> {
99+
let mut dest_file = PathBuf::from(self.get_data_dir());
100+
dest_file.push(key);
101+
102+
if !dest_file.is_file() {
103+
return Ok(false);
104+
}
105+
106+
fs::remove_file(&dest_file)?;
107+
#[cfg(not(target_os = "windows"))]
108+
{
109+
let parent_directory = dest_file.parent().unwrap();
110+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
111+
unsafe {
112+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
113+
// to the inode might get cached (and hence possibly lost on crash), depending on
114+
// the target platform and file system.
115+
//
116+
// In order to assert we permanently removed the file in question we therefore
117+
// call `fsync` on the parent directory on platforms that support it,
118+
libc::fsync(dir_file.as_raw_fd());
119+
}
120+
}
121+
122+
if dest_file.is_file() {
123+
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Unpersisting key failed"));
124+
}
125+
126+
return Ok(true);
127+
}
128+
}

src/payment_store.rs

+105-11
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use crate::hex_utils;
2+
use crate::io_utils::KVStoreUnpersister;
23
use crate::Error;
34

45
use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
56
use lightning::util::persist::KVStorePersister;
67
use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
78

8-
use std::collections::HashMap;
9+
use std::collections::hash_map;
10+
use std::collections::{HashMap, HashSet};
911
use std::iter::FromIterator;
1012
use std::ops::Deref;
11-
use std::sync::Mutex;
13+
use std::sync::{Mutex, MutexGuard};
1214

1315
/// Represents a payment.
1416
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -70,21 +72,26 @@ impl_writeable_tlv_based_enum!(PaymentStatus,
7072
/// The payment information will be persisted under this prefix.
7173
pub(crate) const PAYMENT_INFO_PERSISTENCE_PREFIX: &str = "payments";
7274

73-
pub(crate) struct PaymentInfoStorage<K: Deref>
75+
pub(crate) struct PaymentInfoStorage<K: Deref + Clone>
7476
where
75-
K::Target: KVStorePersister,
77+
K::Target: KVStorePersister + KVStoreUnpersister,
7678
{
7779
payments: Mutex<HashMap<PaymentHash, PaymentInfo>>,
7880
persister: K,
7981
}
8082

81-
impl<K: Deref> PaymentInfoStorage<K>
83+
impl<K: Deref + Clone> PaymentInfoStorage<K>
8284
where
83-
K::Target: KVStorePersister,
85+
K::Target: KVStorePersister + KVStoreUnpersister,
8486
{
85-
pub(crate) fn from_payments(mut payments: Vec<PaymentInfo>, persister: K) -> Self {
87+
pub(crate) fn new(persister: K) -> Self {
88+
let payments = Mutex::new(HashMap::new());
89+
Self { payments, persister }
90+
}
91+
92+
pub(crate) fn from_payments(payments: Vec<PaymentInfo>, persister: K) -> Self {
8693
let payments = Mutex::new(HashMap::from_iter(
87-
payments.drain(..).map(|payment_info| (payment_info.payment_hash, payment_info)),
94+
payments.into_iter().map(|payment_info| (payment_info.payment_hash, payment_info)),
8895
));
8996
Self { payments, persister }
9097
}
@@ -106,9 +113,20 @@ where
106113
return Ok(());
107114
}
108115

109-
// TODO: Need an `unpersist` method for this?
110-
//pub(crate) fn remove_payment(&self, payment_hash: &PaymentHash) -> Result<(), Error> {
111-
//}
116+
pub(crate) fn lock(&self) -> Result<PaymentInfoGuard<K>, ()> {
117+
let locked_store = self.payments.lock().map_err(|_| ())?;
118+
Ok(PaymentInfoGuard::new(locked_store, self.persister.clone()))
119+
}
120+
121+
pub(crate) fn remove(&self, payment_hash: &PaymentHash) -> Result<(), Error> {
122+
let key = format!(
123+
"{}/{}",
124+
PAYMENT_INFO_PERSISTENCE_PREFIX,
125+
hex_utils::to_string(&payment_hash.0)
126+
);
127+
self.persister.unpersist(&key).map_err(|_| Error::PersistenceFailed)?;
128+
Ok(())
129+
}
112130

113131
pub(crate) fn get(&self, payment_hash: &PaymentHash) -> Option<PaymentInfo> {
114132
self.payments.lock().unwrap().get(payment_hash).cloned()
@@ -136,3 +154,79 @@ where
136154
Ok(())
137155
}
138156
}
157+
158+
pub(crate) struct PaymentInfoGuard<'a, K: Deref>
159+
where
160+
K::Target: KVStorePersister + KVStoreUnpersister,
161+
{
162+
inner: MutexGuard<'a, HashMap<PaymentHash, PaymentInfo>>,
163+
touched_keys: HashSet<PaymentHash>,
164+
persister: K,
165+
}
166+
167+
impl<'a, K: Deref> PaymentInfoGuard<'a, K>
168+
where
169+
K::Target: KVStorePersister + KVStoreUnpersister,
170+
{
171+
pub fn new(inner: MutexGuard<'a, HashMap<PaymentHash, PaymentInfo>>, persister: K) -> Self {
172+
let touched_keys = HashSet::new();
173+
Self { inner, touched_keys, persister }
174+
}
175+
176+
pub fn entry(
177+
&mut self, payment_hash: PaymentHash,
178+
) -> hash_map::Entry<PaymentHash, PaymentInfo> {
179+
self.touched_keys.insert(payment_hash);
180+
self.inner.entry(payment_hash)
181+
}
182+
}
183+
184+
impl<'a, K: Deref> Drop for PaymentInfoGuard<'a, K>
185+
where
186+
K::Target: KVStorePersister + KVStoreUnpersister,
187+
{
188+
fn drop(&mut self) {
189+
for key in self.touched_keys.iter() {
190+
let store_key =
191+
format!("{}/{}", PAYMENT_INFO_PERSISTENCE_PREFIX, hex_utils::to_string(&key.0));
192+
193+
match self.inner.entry(*key) {
194+
hash_map::Entry::Vacant(_) => {
195+
self.persister.unpersist(&store_key).expect("Persistence failed");
196+
}
197+
hash_map::Entry::Occupied(e) => {
198+
self.persister.persist(&store_key, e.get()).expect("Persistence failed");
199+
}
200+
};
201+
}
202+
}
203+
}
204+
205+
#[cfg(test)]
206+
mod tests {
207+
use super::*;
208+
use crate::tests::test_utils::TestPersister;
209+
use std::sync::Arc;
210+
211+
#[test]
212+
fn persistence_guard_persists_on_drop() {
213+
let persister = Arc::new(TestPersister::new());
214+
let payment_info_store = PaymentInfoStorage::new(Arc::clone(&persister));
215+
216+
let payment_hash = PaymentHash([42u8; 32]);
217+
assert!(!payment_info_store.contains(&payment_hash));
218+
219+
let payment_info = PaymentInfo {
220+
payment_hash,
221+
preimage: None,
222+
secret: None,
223+
amount_msat: None,
224+
direction: PaymentDirection::Inbound,
225+
status: PaymentStatus::Pending,
226+
};
227+
228+
assert!(!persister.get_and_clear_did_persist());
229+
payment_info_store.lock().unwrap().entry(payment_hash).or_insert(payment_info);
230+
assert!(persister.get_and_clear_did_persist());
231+
}
232+
}

src/tests/test_utils.rs

+9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::io_utils::KVStoreUnpersister;
12
use lightning::util::persist::KVStorePersister;
23
use lightning::util::ser::Writeable;
34

@@ -53,3 +54,11 @@ impl KVStorePersister for TestPersister {
5354
Ok(())
5455
}
5556
}
57+
58+
impl KVStoreUnpersister for TestPersister {
59+
fn unpersist(&self, key: &str) -> std::io::Result<bool> {
60+
let mut persisted_bytes_lock = self.persisted_bytes.lock().unwrap();
61+
self.did_persist.store(true, Ordering::SeqCst);
62+
Ok(persisted_bytes_lock.remove(key).is_some())
63+
}
64+
}

0 commit comments

Comments
 (0)