@@ -236,15 +236,11 @@ where A::Target: chain::Access, L::Target: Logger {
236
236
}
237
237
238
238
macro_rules! define_run_body {
239
- ( $persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
239
+ ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
240
+ $channel_manager: ident, $process_channel_manager_events: expr,
240
241
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
241
242
$loop_exit_check: expr, $await: expr)
242
243
=> { {
243
- let event_handler = DecoratingEventHandler {
244
- event_handler: $event_handler,
245
- gossip_sync: & $gossip_sync,
246
- } ;
247
-
248
244
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
249
245
$channel_manager. timer_tick_occurred( ) ;
250
246
@@ -255,8 +251,8 @@ macro_rules! define_run_body {
255
251
let mut have_pruned = false ;
256
252
257
253
loop {
258
- $channel_manager . process_pending_events ( & event_handler ) ;
259
- $chain_monitor . process_pending_events ( & event_handler ) ;
254
+ $process_channel_manager_events ;
255
+ $process_chain_monitor_events ;
260
256
261
257
// Note that the PeerManager::process_events may block on ChannelManager's locks,
262
258
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -389,7 +385,8 @@ pub async fn process_events_async<
389
385
CMH : ' static + Deref + Send + Sync ,
390
386
RMH : ' static + Deref + Send + Sync ,
391
387
OMH : ' static + Deref + Send + Sync ,
392
- EH : ' static + EventHandler + Send ,
388
+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
389
+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
393
390
PS : ' static + Deref + Send ,
394
391
M : ' static + Deref < Target = ChainMonitor < Signer , CF , T , F , L , P > > + Send + Sync ,
395
392
CM : ' static + Deref < Target = ChannelManager < CW , T , K , F , L > > + Send + Sync ,
@@ -402,7 +399,7 @@ pub async fn process_events_async<
402
399
SleepFuture : core:: future:: Future < Output = bool > ,
403
400
Sleeper : Fn ( Duration ) -> SleepFuture
404
401
> (
405
- persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
402
+ persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
406
403
gossip_sync : GossipSync < PGS , RGS , G , CA , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
407
404
sleeper : Sleeper ,
408
405
) -> Result < ( ) , std:: io:: Error >
@@ -422,7 +419,19 @@ where
422
419
PS :: Target : ' static + Persister < ' a , Signer , CW , T , K , F , L , SC > ,
423
420
{
424
421
let mut should_continue = true ;
425
- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
422
+ let async_event_handler = |event| -> core:: pin:: Pin < Box < dyn core:: future:: Future < Output = ( ) > > > {
423
+ let network_graph = gossip_sync. network_graph ( ) ;
424
+ let event_handler = & event_handler;
425
+ Box :: pin ( async move {
426
+ if let Some ( network_graph) = network_graph {
427
+ handle_network_graph_update ( network_graph, & event)
428
+ }
429
+ event_handler ( event) . await ;
430
+ } )
431
+ } ;
432
+ define_run_body ! ( persister,
433
+ chain_monitor, chain_monitor. process_pending_events_async( async_event_handler) . await ,
434
+ channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
426
435
gossip_sync, peer_manager, logger, scorer, should_continue, {
427
436
select_biased! {
428
437
_ = channel_manager. get_persistable_update_future( ) . fuse( ) => true ,
@@ -527,7 +536,12 @@ impl BackgroundProcessor {
527
536
let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
528
537
let stop_thread_clone = stop_thread. clone ( ) ;
529
538
let handle = thread:: spawn ( move || -> Result < ( ) , std:: io:: Error > {
530
- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
539
+ let event_handler = DecoratingEventHandler {
540
+ event_handler,
541
+ gossip_sync : & gossip_sync,
542
+ } ;
543
+ define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
544
+ channel_manager, channel_manager. process_pending_events( & event_handler) ,
531
545
gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
532
546
channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) )
533
547
} ) ;
0 commit comments