Skip to content

Commit 55b714c

Browse files
committed
Implement async versions of process_pending_events
1 parent 05cb467 commit 55b714c

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

lightning/src/chain/chainmonitor.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter;
3636
use crate::util::logger::Logger;
3737
use crate::util::errors::APIError;
3838
use crate::util::events;
39-
use crate::util::events::EventHandler;
39+
use crate::util::events::{Event, EventHandler};
4040
use crate::ln::channelmanager::ChannelDetails;
4141

4242
use crate::prelude::*;
@@ -496,6 +496,24 @@ where C::Target: chain::Filter,
496496
self.process_pending_events(&event_handler);
497497
events.into_inner()
498498
}
499+
500+
/// Processes any events asynchronously in the order they were generated since the last call
501+
/// using the given event handler.
502+
///
503+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
504+
///
505+
/// [`EventsProvider`]: crate::util::events::EventsProvider
506+
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
507+
&self, handler: H
508+
) {
509+
let mut pending_events = Vec::new();
510+
for monitor_state in self.monitors.read().unwrap().values() {
511+
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
512+
}
513+
for event in pending_events {
514+
handler(event).await;
515+
}
516+
}
499517
}
500518

501519
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

lightning/src/ln/channelmanager.rs

+34-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
5353
use crate::ln::wire::Encode;
5454
use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
5555
use crate::util::config::{UserConfig, ChannelConfig};
56-
use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
56+
use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5757
use crate::util::{byte_utils, events};
5858
use crate::util::wakers::{Future, Notifier};
5959
use crate::util::scid_utils::fake_scid;
@@ -5728,6 +5728,39 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
57285728
pub fn clear_pending_payments(&self) {
57295729
self.pending_outbound_payments.lock().unwrap().clear()
57305730
}
5731+
5732+
/// Processes any events asynchronously in the order they were generated since the last call
5733+
/// using the given event handler.
5734+
///
5735+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
5736+
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
5737+
&self, handler: H
5738+
) {
5739+
// We'll acquire our total consistency lock until the returned future completes so that
5740+
// we can be sure no other persists happen while processing events.
5741+
let _read_guard = self.total_consistency_lock.read().unwrap();
5742+
5743+
let mut result = NotifyOption::SkipPersist;
5744+
5745+
// TODO: This behavior should be documented. It's unintuitive that we query
5746+
// ChannelMonitors when clearing other events.
5747+
if self.process_pending_monitor_events() {
5748+
result = NotifyOption::DoPersist;
5749+
}
5750+
5751+
let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
5752+
if !pending_events.is_empty() {
5753+
result = NotifyOption::DoPersist;
5754+
}
5755+
5756+
for event in pending_events {
5757+
handler(event).await;
5758+
}
5759+
5760+
if result == NotifyOption::DoPersist {
5761+
self.persistence_notifier.notify();
5762+
}
5763+
}
57315764
}
57325765

57335766
impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>

0 commit comments

Comments
 (0)