Skip to content

Commit ec8e03f

Browse files
committed
Impl event queue persistence
1 parent 2409741 commit ec8e03f

File tree

1 file changed

+113
-8
lines changed

1 file changed

+113
-8
lines changed

src/event.rs

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
ChannelManager, EventSender, LdkLiteChainAccess, LdkLiteConfig, NetworkGraph,
2+
ChannelManager, Error, FilesystemPersister, LdkLiteChainAccess, LdkLiteConfig, NetworkGraph,
33
PaymentInfoStorage,
44
};
55

@@ -12,48 +12,153 @@ use crate::logger::{
1212
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
1313
use lightning::chain::keysinterface::KeysManager;
1414
use lightning::util::events as ldk_events;
15+
use lightning::util::persist::KVStorePersister;
16+
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
1517

1618
use bdk::database::MemoryDatabase;
1719

1820
use bitcoin::secp256k1::Secp256k1;
19-
//use std::collections::{hash_map, VecDeque};
20-
//use std::iter::Iterator;
21-
use std::sync::Arc;
21+
use std::collections::VecDeque;
22+
use std::sync::{Arc, Condvar, Mutex};
23+
24+
/// The event queue will be persisted under this key.
25+
pub(crate) const EVENTS_PERSISTENCE_KEY: &str = "events";
2226

2327
/// An LdkLiteEvent that should be handled by the user.
28+
#[derive(Debug, Clone)]
2429
pub enum LdkLiteEvent {
2530
/// asdf
2631
Test,
2732
}
2833

34+
impl Readable for LdkLiteEvent {
35+
fn read<R: lightning::io::Read>(
36+
reader: &mut R,
37+
) -> Result<Self, lightning::ln::msgs::DecodeError> {
38+
match Readable::read(reader)? {
39+
// TODO
40+
0u8 => Ok(LdkLiteEvent::Test),
41+
_ => Ok(LdkLiteEvent::Test),
42+
}
43+
}
44+
}
45+
46+
impl Writeable for LdkLiteEvent {
47+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
48+
match self {
49+
Test => {
50+
// TODO
51+
0u8.write(writer)?;
52+
Ok(())
53+
}
54+
}
55+
}
56+
}
57+
58+
pub(crate) struct LdkLiteEventQueue<K: KVStorePersister> {
59+
queue: Mutex<EventQueueSerWrapper>,
60+
notifier: Condvar,
61+
persister: Arc<K>,
62+
}
63+
64+
impl<K: KVStorePersister> LdkLiteEventQueue<K> {
65+
pub(crate) fn new(persister: Arc<K>) -> Self {
66+
let queue: Mutex<EventQueueSerWrapper> = Mutex::new(EventQueueSerWrapper(VecDeque::new()));
67+
let notifier = Condvar::new();
68+
Self { queue, notifier, persister }
69+
}
70+
pub(crate) fn add_event(&self, event: LdkLiteEvent) -> Result<(), Error> {
71+
let mut locked_queue = self.queue.lock().unwrap();
72+
locked_queue.0.push_back(event);
73+
74+
self.persister.persist(EVENTS_PERSISTENCE_KEY, &*locked_queue)?;
75+
76+
self.notifier.notify_one();
77+
Ok(())
78+
}
79+
80+
pub(crate) fn next_event(&self) -> LdkLiteEvent {
81+
let locked_queue = self
82+
.notifier
83+
.wait_while(self.queue.lock().unwrap(), |queue| queue.0.is_empty())
84+
.unwrap();
85+
locked_queue.0.front().unwrap().clone()
86+
}
87+
88+
pub(crate) fn event_handled(&self) -> Result<(), Error> {
89+
let mut locked_queue = self.queue.lock().unwrap();
90+
locked_queue.0.pop_front();
91+
self.persister.persist(EVENTS_PERSISTENCE_KEY, &*locked_queue)?;
92+
self.notifier.notify_one();
93+
Ok(())
94+
}
95+
}
96+
97+
impl<K: KVStorePersister> ReadableArgs<Arc<K>> for LdkLiteEventQueue<K> {
98+
#[inline]
99+
fn read<R: lightning::io::Read>(
100+
reader: &mut R, persister: Arc<K>,
101+
) -> Result<Self, lightning::ln::msgs::DecodeError> {
102+
let queue: Mutex<EventQueueSerWrapper> = Mutex::new(Readable::read(reader)?);
103+
let notifier = Condvar::new();
104+
Ok(Self { queue, notifier, persister })
105+
}
106+
}
107+
108+
struct EventQueueSerWrapper(VecDeque<LdkLiteEvent>);
109+
110+
impl Readable for EventQueueSerWrapper {
111+
fn read<R: lightning::io::Read>(
112+
reader: &mut R,
113+
) -> Result<Self, lightning::ln::msgs::DecodeError> {
114+
let len: u16 = Readable::read(reader)?;
115+
let mut queue = VecDeque::with_capacity(len as usize);
116+
for _ in 0..len {
117+
queue.push_back(Readable::read(reader)?);
118+
}
119+
Ok(EventQueueSerWrapper(queue))
120+
}
121+
}
122+
123+
impl Writeable for EventQueueSerWrapper {
124+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
125+
(self.0.len() as u16).write(writer)?;
126+
for e in self.0.iter() {
127+
e.write(writer)?;
128+
}
129+
Ok(())
130+
}
131+
}
132+
29133
pub(crate) struct LdkLiteEventHandler {
30134
chain_access: Arc<LdkLiteChainAccess<MemoryDatabase>>,
135+
event_queue: Arc<LdkLiteEventQueue<FilesystemPersister>>,
31136
channel_manager: Arc<ChannelManager>,
32137
_network_graph: Arc<NetworkGraph>,
33138
keys_manager: Arc<KeysManager>,
34139
_inbound_payments: Arc<PaymentInfoStorage>,
35140
_outbound_payments: Arc<PaymentInfoStorage>,
36-
_event_sender: EventSender,
37141
logger: Arc<FilesystemLogger>,
38142
_config: Arc<LdkLiteConfig>,
39143
}
40144

41145
impl LdkLiteEventHandler {
42146
pub fn new(
43147
chain_access: Arc<LdkLiteChainAccess<MemoryDatabase>>,
148+
event_queue: Arc<LdkLiteEventQueue<FilesystemPersister>>,
44149
channel_manager: Arc<ChannelManager>, _network_graph: Arc<NetworkGraph>,
45150
keys_manager: Arc<KeysManager>, _inbound_payments: Arc<PaymentInfoStorage>,
46-
_outbound_payments: Arc<PaymentInfoStorage>, _event_sender: EventSender,
47-
logger: Arc<FilesystemLogger>, _config: Arc<LdkLiteConfig>,
151+
_outbound_payments: Arc<PaymentInfoStorage>, logger: Arc<FilesystemLogger>,
152+
_config: Arc<LdkLiteConfig>,
48153
) -> Self {
49154
Self {
155+
event_queue,
50156
chain_access,
51157
channel_manager,
52158
_network_graph,
53159
keys_manager,
54160
_inbound_payments,
55161
_outbound_payments,
56-
_event_sender,
57162
logger,
58163
_config,
59164
}

0 commit comments

Comments
 (0)