28
28
//! Run: `RUST_LOG=info cargo run --example custom_executor`
29
29
30
30
use litep2p:: {
31
- config:: ConfigBuilder ,
32
- executor:: Executor ,
33
- protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34
- transport:: tcp:: config:: Config as TcpConfig ,
35
- Litep2p ,
31
+ config:: ConfigBuilder ,
32
+ executor:: Executor ,
33
+ protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34
+ transport:: tcp:: config:: Config as TcpConfig ,
35
+ Litep2p ,
36
36
} ;
37
37
38
38
use futures:: { future:: BoxFuture , stream:: FuturesUnordered , Stream , StreamExt } ;
@@ -44,112 +44,102 @@ use std::{future::Future, pin::Pin, sync::Arc};
44
44
///
45
45
/// Just a wrapper around `FuturesUnordered` which receives the futures over `mpsc::Receiver`.
46
46
struct TaskExecutor {
47
- rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48
- futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
47
+ rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48
+ futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
49
49
}
50
50
51
51
impl TaskExecutor {
52
- /// Create new [`TaskExecutor`].
53
- fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54
- let ( tx, rx) = channel ( 64 ) ;
55
-
56
- (
57
- Self {
58
- rx,
59
- futures : FuturesUnordered :: new ( ) ,
60
- } ,
61
- tx,
62
- )
63
- }
64
-
65
- /// Drive the futures forward and poll the receiver for any new futures.
66
- async fn next ( & mut self ) {
67
- loop {
68
- tokio:: select! {
69
- future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
70
- _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
71
- }
72
- }
73
- }
52
+ /// Create new [`TaskExecutor`].
53
+ fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54
+ let ( tx, rx) = channel ( 64 ) ;
55
+
56
+ ( Self { rx, futures : FuturesUnordered :: new ( ) } , tx)
57
+ }
58
+
59
+ /// Drive the futures forward and poll the receiver for any new futures.
60
+ async fn next ( & mut self ) {
61
+ loop {
62
+ tokio:: select! {
63
+ future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
64
+ _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
65
+ }
66
+ }
67
+ }
74
68
}
75
69
76
70
struct TaskExecutorHandle {
77
- tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
71
+ tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
78
72
}
79
73
80
74
impl Executor for TaskExecutorHandle {
81
- fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
82
- let _ = self . tx . try_send ( future) ;
83
- }
75
+ fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
76
+ let _ = self . tx . try_send ( future) ;
77
+ }
84
78
85
- fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
86
- let _ = self . tx . try_send ( future) ;
87
- }
79
+ fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
80
+ let _ = self . tx . try_send ( future) ;
81
+ }
88
82
}
89
83
90
- fn make_litep2p ( ) -> (
91
- Litep2p ,
92
- TaskExecutor ,
93
- Box < dyn Stream < Item = PingEvent > + Send + Unpin > ,
94
- ) {
95
- let ( executor, sender) = TaskExecutor :: new ( ) ;
96
- let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
97
-
98
- let litep2p = Litep2p :: new (
99
- ConfigBuilder :: new ( )
100
- . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
101
- . with_tcp ( TcpConfig {
102
- listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
103
- ..Default :: default ( )
104
- } )
105
- . with_libp2p_ping ( ping_config)
106
- . build ( ) ,
107
- )
108
- . unwrap ( ) ;
109
-
110
- ( litep2p, executor, ping_event_stream)
84
+ fn make_litep2p ( ) -> ( Litep2p , TaskExecutor , Box < dyn Stream < Item = PingEvent > + Send + Unpin > ) {
85
+ let ( executor, sender) = TaskExecutor :: new ( ) ;
86
+ let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
87
+
88
+ let litep2p = Litep2p :: new (
89
+ ConfigBuilder :: new ( )
90
+ . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
91
+ . with_tcp ( TcpConfig {
92
+ listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
93
+ ..Default :: default ( )
94
+ } )
95
+ . with_libp2p_ping ( ping_config)
96
+ . build ( ) ,
97
+ )
98
+ . unwrap ( ) ;
99
+
100
+ ( litep2p, executor, ping_event_stream)
111
101
}
112
102
113
103
#[ tokio:: main]
114
104
async fn main ( ) {
115
- let _ = tracing_subscriber:: fmt ( )
116
- . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
117
- . try_init ( ) ;
118
-
119
- // create two identical litep2ps
120
- let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
121
- let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
122
-
123
- // dial `litep2p1`
124
- litep2p2
125
- . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
126
- . await
127
- . unwrap ( ) ;
128
-
129
- tokio:: spawn ( async move {
130
- loop {
131
- tokio:: select! {
132
- _ = executor1. next( ) => { }
133
- _ = litep2p1. next_event( ) => { } ,
134
- _ = ping_event_stream1. next( ) => { } ,
135
- }
136
- }
137
- } ) ;
138
-
139
- // poll litep2p, task executor and ping event stream all together
140
- //
141
- // since a custom task executor was provided, it's now the user's responsibility
142
- // to actually make sure to poll those futures so that litep2p can make progress
143
- loop {
144
- tokio:: select! {
145
- _ = executor2. next( ) => { }
146
- _ = litep2p2. next_event( ) => { } ,
147
- event = ping_event_stream2. next( ) => match event {
148
- Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
149
- "ping time with {peer:?}: {ping:?}"
150
- ) ,
151
- _ => { }
152
- }
153
- }
154
- }
105
+ let _ = tracing_subscriber:: fmt ( )
106
+ . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
107
+ . try_init ( ) ;
108
+
109
+ // create two identical litep2ps
110
+ let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
111
+ let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
112
+
113
+ // dial `litep2p1`
114
+ litep2p2
115
+ . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
116
+ . await
117
+ . unwrap ( ) ;
118
+
119
+ tokio:: spawn ( async move {
120
+ loop {
121
+ tokio:: select! {
122
+ _ = executor1. next( ) => { }
123
+ _ = litep2p1. next_event( ) => { } ,
124
+ _ = ping_event_stream1. next( ) => { } ,
125
+ }
126
+ }
127
+ } ) ;
128
+
129
+ // poll litep2p, task executor and ping event stream all together
130
+ //
131
+ // since a custom task executor was provided, it's now the user's responsibility
132
+ // to actually make sure to poll those futures so that litep2p can make progress
133
+ loop {
134
+ tokio:: select! {
135
+ _ = executor2. next( ) => { }
136
+ _ = litep2p2. next_event( ) => { } ,
137
+ event = ping_event_stream2. next( ) => match event {
138
+ Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
139
+ "ping time with {peer:?}: {ping:?}"
140
+ ) ,
141
+ _ => { }
142
+ }
143
+ }
144
+ }
155
145
}
0 commit comments