Skip to content

Commit 2a3c99c

Browse files
authored
Merge pull request #1787 from wpaulino/async-event-handler
Handle events asynchronously in the BackgroundProcessor's async variant
2 parents 15b79f8 + d3f1e25 commit 2a3c99c

File tree

7 files changed

+278
-236
lines changed

7 files changed

+278
-236
lines changed

lightning-background-processor/src/lib.rs

+47-53
Original file line numberDiff line numberDiff line change
@@ -192,49 +192,22 @@ where
192192
}
193193
}
194194

195-
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196-
struct DecoratingEventHandler<
197-
'a,
198-
E: EventHandler,
199-
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
200-
RGS: Deref<Target = RapidGossipSync<G, L>>,
201-
G: Deref<Target = NetworkGraph<L>>,
202-
A: Deref,
203-
L: Deref,
204-
>
205-
where A::Target: chain::Access, L::Target: Logger {
206-
event_handler: E,
207-
gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
208-
}
209-
210-
impl<
211-
'a,
212-
E: EventHandler,
213-
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214-
RGS: Deref<Target = RapidGossipSync<G, L>>,
215-
G: Deref<Target = NetworkGraph<L>>,
216-
A: Deref,
217-
L: Deref,
218-
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219-
where A::Target: chain::Access, L::Target: Logger {
220-
fn handle_event(&self, event: &Event) {
221-
if let Some(network_graph) = self.gossip_sync.network_graph() {
222-
network_graph.handle_event(event);
195+
fn handle_network_graph_update<L: Deref>(
196+
network_graph: &NetworkGraph<L>, event: &Event
197+
) where L::Target: Logger {
198+
if let Event::PaymentPathFailed { ref network_update, .. } = event {
199+
if let Some(network_update) = network_update {
200+
network_graph.handle_network_update(&network_update);
223201
}
224-
self.event_handler.handle_event(event);
225202
}
226203
}
227204

228205
macro_rules! define_run_body {
229-
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
206+
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
207+
$channel_manager: ident, $process_channel_manager_events: expr,
230208
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231209
$loop_exit_check: expr, $await: expr)
232210
=> { {
233-
let event_handler = DecoratingEventHandler {
234-
event_handler: $event_handler,
235-
gossip_sync: &$gossip_sync,
236-
};
237-
238211
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239212
$channel_manager.timer_tick_occurred();
240213

@@ -245,8 +218,8 @@ macro_rules! define_run_body {
245218
let mut have_pruned = false;
246219

247220
loop {
248-
$channel_manager.process_pending_events(&event_handler);
249-
$chain_monitor.process_pending_events(&event_handler);
221+
$process_channel_manager_events;
222+
$process_chain_monitor_events;
250223

251224
// Note that the PeerManager::process_events may block on ChannelManager's locks,
252225
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -379,7 +352,8 @@ pub async fn process_events_async<
379352
CMH: 'static + Deref + Send + Sync,
380353
RMH: 'static + Deref + Send + Sync,
381354
OMH: 'static + Deref + Send + Sync,
382-
EH: 'static + EventHandler + Send,
355+
EventHandlerFuture: core::future::Future<Output = ()>,
356+
EventHandler: Fn(Event) -> EventHandlerFuture,
383357
PS: 'static + Deref + Send,
384358
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
385359
CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
@@ -392,7 +366,7 @@ pub async fn process_events_async<
392366
SleepFuture: core::future::Future<Output = bool>,
393367
Sleeper: Fn(Duration) -> SleepFuture
394368
>(
395-
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
369+
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
396370
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
397371
sleeper: Sleeper,
398372
) -> Result<(), std::io::Error>
@@ -412,7 +386,19 @@ where
412386
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
413387
{
414388
let mut should_break = true;
415-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
389+
let async_event_handler = |event| {
390+
let network_graph = gossip_sync.network_graph();
391+
let event_handler = &event_handler;
392+
async move {
393+
if let Some(network_graph) = network_graph {
394+
handle_network_graph_update(network_graph, &event)
395+
}
396+
event_handler(event).await;
397+
}
398+
};
399+
define_run_body!(persister,
400+
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
401+
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
416402
gossip_sync, peer_manager, logger, scorer, should_break, {
417403
select_biased! {
418404
_ = channel_manager.get_persistable_update_future().fuse() => true,
@@ -517,7 +503,15 @@ impl BackgroundProcessor {
517503
let stop_thread = Arc::new(AtomicBool::new(false));
518504
let stop_thread_clone = stop_thread.clone();
519505
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
520-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
506+
let event_handler = |event| {
507+
let network_graph = gossip_sync.network_graph();
508+
if let Some(network_graph) = network_graph {
509+
handle_network_graph_update(network_graph, &event)
510+
}
511+
event_handler.handle_event(event);
512+
};
513+
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
514+
channel_manager, channel_manager.process_pending_events(&event_handler),
521515
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
522516
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
523517
});
@@ -769,7 +763,7 @@ mod tests {
769763
begin_open_channel!($node_a, $node_b, $channel_value);
770764
let events = $node_a.node.get_and_clear_pending_events();
771765
assert_eq!(events.len(), 1);
772-
let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
766+
let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
773767
end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
774768
tx
775769
}}
@@ -786,7 +780,7 @@ mod tests {
786780
macro_rules! handle_funding_generation_ready {
787781
($event: expr, $channel_value: expr) => {{
788782
match $event {
789-
&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
783+
Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
790784
assert_eq!(channel_value_satoshis, $channel_value);
791785
assert_eq!(user_channel_id, 42);
792786

@@ -847,7 +841,7 @@ mod tests {
847841
// Initiate the background processors to watch each node.
848842
let data_dir = nodes[0].persister.get_data_dir();
849843
let persister = Arc::new(Persister::new(data_dir));
850-
let event_handler = |_: &_| {};
844+
let event_handler = |_: _| {};
851845
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
852846

853847
macro_rules! check_persisted_data {
@@ -909,7 +903,7 @@ mod tests {
909903
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
910904
let data_dir = nodes[0].persister.get_data_dir();
911905
let persister = Arc::new(Persister::new(data_dir));
912-
let event_handler = |_: &_| {};
906+
let event_handler = |_: _| {};
913907
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
914908
loop {
915909
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -932,7 +926,7 @@ mod tests {
932926

933927
let data_dir = nodes[0].persister.get_data_dir();
934928
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
935-
let event_handler = |_: &_| {};
929+
let event_handler = |_: _| {};
936930
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
937931
match bg_processor.join() {
938932
Ok(_) => panic!("Expected error persisting manager"),
@@ -949,7 +943,7 @@ mod tests {
949943
let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
950944
let data_dir = nodes[0].persister.get_data_dir();
951945
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
952-
let event_handler = |_: &_| {};
946+
let event_handler = |_: _| {};
953947
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
954948

955949
match bg_processor.stop() {
@@ -967,7 +961,7 @@ mod tests {
967961
let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
968962
let data_dir = nodes[0].persister.get_data_dir();
969963
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
970-
let event_handler = |_: &_| {};
964+
let event_handler = |_: _| {};
971965
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
972966

973967
match bg_processor.stop() {
@@ -988,7 +982,7 @@ mod tests {
988982

989983
// Set up a background event handler for FundingGenerationReady events.
990984
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
991-
let event_handler = move |event: &Event| match event {
985+
let event_handler = move |event: Event| match event {
992986
Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
993987
Event::ChannelReady { .. } => {},
994988
_ => panic!("Unexpected event: {:?}", event),
@@ -1017,7 +1011,7 @@ mod tests {
10171011

10181012
// Set up a background event handler for SpendableOutputs events.
10191013
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1020-
let event_handler = move |event: &Event| match event {
1014+
let event_handler = move |event: Event| match event {
10211015
Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
10221016
Event::ChannelReady { .. } => {},
10231017
Event::ChannelClosed { .. } => {},
@@ -1047,7 +1041,7 @@ mod tests {
10471041
let nodes = create_nodes(2, "test_scorer_persistence".to_string());
10481042
let data_dir = nodes[0].persister.get_data_dir();
10491043
let persister = Arc::new(Persister::new(data_dir));
1050-
let event_handler = |_: &_| {};
1044+
let event_handler = |_: _| {};
10511045
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
10521046

10531047
loop {
@@ -1075,7 +1069,7 @@ mod tests {
10751069
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
10761070
assert_eq!(network_graph.read_only().channels().len(), 1);
10771071

1078-
let event_handler = |_: &_| {};
1072+
let event_handler = |_: _| {};
10791073
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
10801074

10811075
loop {
@@ -1128,7 +1122,7 @@ mod tests {
11281122
let data_dir = nodes[0].persister.get_data_dir();
11291123
let persister = Arc::new(Persister::new(data_dir));
11301124
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1131-
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1125+
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
11321126
let event_handler = Arc::clone(&invoice_payer);
11331127
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
11341128
assert!(bg_processor.stop().is_ok());

0 commit comments

Comments
 (0)