Skip to content

Commit 44217eb

Browse files
committed
Adopt event handling logic from ldk-sample
1 parent ec8e03f commit 44217eb

File tree

1 file changed

+189
-19
lines changed

1 file changed

+189
-19
lines changed

src/event.rs

Lines changed: 189 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
ChannelManager, Error, FilesystemPersister, LdkLiteChainAccess, LdkLiteConfig, NetworkGraph,
3-
PaymentInfoStorage,
2+
hex, ChannelManager, Error, FilesystemPersister, LdkLiteChainAccess, LdkLiteConfig,
3+
NetworkGraph, PaymentInfo, PaymentInfoStorage, PaymentStatus,
44
};
55

66
#[allow(unused_imports)]
@@ -11,15 +11,19 @@ use crate::logger::{
1111

1212
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
1313
use lightning::chain::keysinterface::KeysManager;
14+
use lightning::routing::gossip::NodeId;
1415
use lightning::util::events as ldk_events;
1516
use lightning::util::persist::KVStorePersister;
1617
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
1718

1819
use bdk::database::MemoryDatabase;
1920

2021
use bitcoin::secp256k1::Secp256k1;
21-
use std::collections::VecDeque;
22+
use rand::{thread_rng, Rng};
23+
use std::collections::{hash_map, VecDeque};
2224
use std::sync::{Arc, Condvar, Mutex};
25+
use std::thread;
26+
use std::time::Duration;
2327

2428
/// The event queue will be persisted under this key.
2529
pub(crate) const EVENTS_PERSISTENCE_KEY: &str = "events";
@@ -134,10 +138,10 @@ pub(crate) struct LdkLiteEventHandler {
134138
chain_access: Arc<LdkLiteChainAccess<MemoryDatabase>>,
135139
event_queue: Arc<LdkLiteEventQueue<FilesystemPersister>>,
136140
channel_manager: Arc<ChannelManager>,
137-
_network_graph: Arc<NetworkGraph>,
141+
network_graph: Arc<NetworkGraph>,
138142
keys_manager: Arc<KeysManager>,
139-
_inbound_payments: Arc<PaymentInfoStorage>,
140-
_outbound_payments: Arc<PaymentInfoStorage>,
143+
inbound_payments: Arc<PaymentInfoStorage>,
144+
outbound_payments: Arc<PaymentInfoStorage>,
141145
logger: Arc<FilesystemLogger>,
142146
_config: Arc<LdkLiteConfig>,
143147
}
@@ -146,19 +150,19 @@ impl LdkLiteEventHandler {
146150
pub fn new(
147151
chain_access: Arc<LdkLiteChainAccess<MemoryDatabase>>,
148152
event_queue: Arc<LdkLiteEventQueue<FilesystemPersister>>,
149-
channel_manager: Arc<ChannelManager>, _network_graph: Arc<NetworkGraph>,
150-
keys_manager: Arc<KeysManager>, _inbound_payments: Arc<PaymentInfoStorage>,
151-
_outbound_payments: Arc<PaymentInfoStorage>, logger: Arc<FilesystemLogger>,
153+
channel_manager: Arc<ChannelManager>, network_graph: Arc<NetworkGraph>,
154+
keys_manager: Arc<KeysManager>, inbound_payments: Arc<PaymentInfoStorage>,
155+
outbound_payments: Arc<PaymentInfoStorage>, logger: Arc<FilesystemLogger>,
152156
_config: Arc<LdkLiteConfig>,
153157
) -> Self {
154158
Self {
155159
event_queue,
156160
chain_access,
157161
channel_manager,
158-
_network_graph,
162+
network_graph,
159163
keys_manager,
160-
_inbound_payments,
161-
_outbound_payments,
164+
inbound_payments,
165+
outbound_payments,
162166
logger,
163167
_config,
164168
}
@@ -207,16 +211,113 @@ impl ldk_events::EventHandler for LdkLiteEventHandler {
207211
}
208212
}
209213
}
210-
ldk_events::Event::PaymentReceived { .. } => {}
211-
ldk_events::Event::PaymentClaimed { .. } => {}
212-
ldk_events::Event::PaymentSent { .. } => {}
213-
ldk_events::Event::PaymentFailed { .. } => {}
214+
ldk_events::Event::PaymentReceived { payment_hash, purpose, amount_msat } => {
215+
log_info!(
216+
self.logger,
217+
"Received payment from payment hash {} of {} millisatoshis",
218+
hex::to_string(&payment_hash.0),
219+
amount_msat,
220+
);
221+
let payment_preimage = match purpose {
222+
ldk_events::PaymentPurpose::InvoicePayment { payment_preimage, .. } => {
223+
*payment_preimage
224+
}
225+
ldk_events::PaymentPurpose::SpontaneousPayment(preimage) => Some(*preimage),
226+
};
227+
self.channel_manager.claim_funds(payment_preimage.unwrap());
228+
}
229+
ldk_events::Event::PaymentClaimed { payment_hash, purpose, amount_msat } => {
230+
log_info!(
231+
self.logger,
232+
"Claimed payment from payment hash {} of {} millisatoshis",
233+
hex::to_string(&payment_hash.0),
234+
amount_msat,
235+
);
236+
let (payment_preimage, payment_secret) = match purpose {
237+
ldk_events::PaymentPurpose::InvoicePayment {
238+
payment_preimage,
239+
payment_secret,
240+
..
241+
} => (*payment_preimage, Some(*payment_secret)),
242+
ldk_events::PaymentPurpose::SpontaneousPayment(preimage) => {
243+
(Some(*preimage), None)
244+
}
245+
};
246+
let mut payments = self.inbound_payments.lock().unwrap();
247+
match payments.entry(*payment_hash) {
248+
hash_map::Entry::Occupied(mut e) => {
249+
let payment = e.get_mut();
250+
payment.status = PaymentStatus::Succeeded;
251+
payment.preimage = payment_preimage;
252+
payment.secret = payment_secret;
253+
}
254+
hash_map::Entry::Vacant(e) => {
255+
e.insert(PaymentInfo {
256+
preimage: payment_preimage,
257+
secret: payment_secret,
258+
status: PaymentStatus::Succeeded,
259+
amount_msat: Some(*amount_msat),
260+
});
261+
}
262+
}
263+
}
264+
ldk_events::Event::PaymentSent {
265+
payment_preimage,
266+
payment_hash,
267+
fee_paid_msat,
268+
..
269+
} => {
270+
let mut payments = self.outbound_payments.lock().unwrap();
271+
for (hash, payment) in payments.iter_mut() {
272+
if *hash == *payment_hash {
273+
payment.preimage = Some(*payment_preimage);
274+
payment.status = PaymentStatus::Succeeded;
275+
log_info!(
276+
self.logger,
277+
"Successfully sent payment of {} millisatoshis{} from \
278+
payment hash {:?} with preimage {:?}",
279+
payment.amount_msat.unwrap(),
280+
if let Some(fee) = fee_paid_msat {
281+
format!(" (fee {} msat)", fee)
282+
} else {
283+
"".to_string()
284+
},
285+
hex::to_string(&payment_hash.0),
286+
hex::to_string(&payment_preimage.0)
287+
);
288+
}
289+
}
290+
}
291+
ldk_events::Event::PaymentFailed { payment_hash, .. } => {
292+
log_info!(
293+
self.logger,
294+
"Failed to send payment to payment hash {:?}: exhausted payment retry attempts",
295+
hex::to_string(&payment_hash.0)
296+
);
297+
298+
let mut payments = self.outbound_payments.lock().unwrap();
299+
if payments.contains_key(&payment_hash) {
300+
let payment = payments.get_mut(&payment_hash).unwrap();
301+
payment.status = PaymentStatus::Failed;
302+
}
303+
}
304+
214305
ldk_events::Event::PaymentPathSuccessful { .. } => {}
215306
ldk_events::Event::PaymentPathFailed { .. } => {}
216307
ldk_events::Event::ProbeSuccessful { .. } => {}
217308
ldk_events::Event::ProbeFailed { .. } => {}
218309
ldk_events::Event::HTLCHandlingFailed { .. } => {}
219-
ldk_events::Event::PendingHTLCsForwardable { .. } => {}
310+
ldk_events::Event::PendingHTLCsForwardable { time_forwardable } => {
311+
let forwarding_channel_manager = self.channel_manager.clone();
312+
let min = time_forwardable.as_millis() as u64;
313+
314+
// TODO: any way we still can use tokio here?
315+
thread::spawn(move || {
316+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
317+
thread::sleep(Duration::from_millis(millis_to_sleep));
318+
forwarding_channel_manager.process_pending_htlc_forwards();
319+
});
320+
}
220321
ldk_events::Event::SpendableOutputs { outputs } => {
221322
let destination_address = self.chain_access.get_new_address().unwrap();
222323
let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
@@ -235,8 +336,77 @@ impl ldk_events::EventHandler for LdkLiteEventHandler {
235336
self.chain_access.broadcast_transaction(&spending_tx);
236337
}
237338
ldk_events::Event::OpenChannelRequest { .. } => {}
238-
ldk_events::Event::PaymentForwarded { .. } => {}
239-
ldk_events::Event::ChannelClosed { .. } => {}
339+
ldk_events::Event::PaymentForwarded {
340+
prev_channel_id,
341+
next_channel_id,
342+
fee_earned_msat,
343+
claim_from_onchain_tx,
344+
} => {
345+
let read_only_network_graph = self.network_graph.read_only();
346+
let nodes = read_only_network_graph.nodes();
347+
let channels = self.channel_manager.list_channels();
348+
349+
let node_str = |channel_id: &Option<[u8; 32]>| match channel_id {
350+
None => String::new(),
351+
Some(channel_id) => match channels.iter().find(|c| c.channel_id == *channel_id)
352+
{
353+
None => String::new(),
354+
Some(channel) => {
355+
match nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) {
356+
None => "private node".to_string(),
357+
Some(node) => match &node.announcement_info {
358+
None => "unnamed node".to_string(),
359+
Some(announcement) => {
360+
format!("node {}", announcement.alias)
361+
}
362+
},
363+
}
364+
}
365+
},
366+
};
367+
let channel_str = |channel_id: &Option<[u8; 32]>| {
368+
channel_id
369+
.map(|channel_id| format!(" with channel {}", hex::to_string(&channel_id)))
370+
.unwrap_or_default()
371+
};
372+
let from_prev_str =
373+
format!(" from {}{}", node_str(prev_channel_id), channel_str(prev_channel_id));
374+
let to_next_str =
375+
format!(" to {}{}", node_str(next_channel_id), channel_str(next_channel_id));
376+
377+
let from_onchain_str = if *claim_from_onchain_tx {
378+
"from onchain downstream claim"
379+
} else {
380+
"from HTLC fulfill message"
381+
};
382+
if let Some(fee_earned) = fee_earned_msat {
383+
log_info!(
384+
self.logger,
385+
"Forwarded payment{}{}, earning {} msat {}",
386+
from_prev_str,
387+
to_next_str,
388+
fee_earned,
389+
from_onchain_str
390+
);
391+
} else {
392+
log_info!(
393+
self.logger,
394+
"Forwarded payment{}{}, claiming onchain {}",
395+
from_prev_str,
396+
to_next_str,
397+
from_onchain_str
398+
);
399+
}
400+
}
401+
402+
ldk_events::Event::ChannelClosed { channel_id, reason, user_channel_id: _ } => {
403+
log_info!(
404+
self.logger,
405+
"Channel {} closed due to: {:?}",
406+
hex::to_string(channel_id),
407+
reason
408+
);
409+
}
240410
ldk_events::Event::DiscardFunding { .. } => {}
241411
}
242412
}

0 commit comments

Comments
 (0)