28
28
//! Run: `RUST_LOG=info cargo run --example custom_executor`
29
29
30
30
use litep2p:: {
31
- config:: ConfigBuilder ,
32
- executor:: Executor ,
33
- protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34
- transport:: tcp:: config:: Config as TcpConfig ,
35
- Litep2p ,
31
+ config:: ConfigBuilder ,
32
+ executor:: Executor ,
33
+ protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34
+ transport:: tcp:: config:: Config as TcpConfig ,
35
+ Litep2p ,
36
36
} ;
37
37
38
38
use futures:: { future:: BoxFuture , stream:: FuturesUnordered , Stream , StreamExt } ;
@@ -44,102 +44,112 @@ use std::{future::Future, pin::Pin, sync::Arc};
44
44
///
45
45
/// Just a wrapper around `FuturesUnordered` which receives the futures over `mpsc::Receiver`.
46
46
struct TaskExecutor {
47
- rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48
- futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
47
+ rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48
+ futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
49
49
}
50
50
51
51
impl TaskExecutor {
52
- /// Create new [`TaskExecutor`].
53
- fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54
- let ( tx, rx) = channel ( 64 ) ;
55
-
56
- ( Self { rx, futures : FuturesUnordered :: new ( ) } , tx)
57
- }
58
-
59
- /// Drive the futures forward and poll the receiver for any new futures.
60
- async fn next ( & mut self ) {
61
- loop {
62
- tokio:: select! {
63
- future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
64
- _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
65
- }
66
- }
67
- }
52
+ /// Create new [`TaskExecutor`].
53
+ fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54
+ let ( tx, rx) = channel ( 64 ) ;
55
+
56
+ (
57
+ Self {
58
+ rx,
59
+ futures : FuturesUnordered :: new ( ) ,
60
+ } ,
61
+ tx,
62
+ )
63
+ }
64
+
65
+ /// Drive the futures forward and poll the receiver for any new futures.
66
+ async fn next ( & mut self ) {
67
+ loop {
68
+ tokio:: select! {
69
+ future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
70
+ _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
71
+ }
72
+ }
73
+ }
68
74
}
69
75
70
76
struct TaskExecutorHandle {
71
- tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
77
+ tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
72
78
}
73
79
74
80
impl Executor for TaskExecutorHandle {
75
- fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
76
- let _ = self . tx . try_send ( future) ;
77
- }
81
+ fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
82
+ let _ = self . tx . try_send ( future) ;
83
+ }
78
84
79
- fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
80
- let _ = self . tx . try_send ( future) ;
81
- }
85
+ fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
86
+ let _ = self . tx . try_send ( future) ;
87
+ }
82
88
}
83
89
84
- fn make_litep2p ( ) -> ( Litep2p , TaskExecutor , Box < dyn Stream < Item = PingEvent > + Send + Unpin > ) {
85
- let ( executor, sender) = TaskExecutor :: new ( ) ;
86
- let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
87
-
88
- let litep2p = Litep2p :: new (
89
- ConfigBuilder :: new ( )
90
- . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
91
- . with_tcp ( TcpConfig {
92
- listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
93
- ..Default :: default ( )
94
- } )
95
- . with_libp2p_ping ( ping_config)
96
- . build ( ) ,
97
- )
98
- . unwrap ( ) ;
99
-
100
- ( litep2p, executor, ping_event_stream)
90
+ fn make_litep2p ( ) -> (
91
+ Litep2p ,
92
+ TaskExecutor ,
93
+ Box < dyn Stream < Item = PingEvent > + Send + Unpin > ,
94
+ ) {
95
+ let ( executor, sender) = TaskExecutor :: new ( ) ;
96
+ let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
97
+
98
+ let litep2p = Litep2p :: new (
99
+ ConfigBuilder :: new ( )
100
+ . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
101
+ . with_tcp ( TcpConfig {
102
+ listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
103
+ ..Default :: default ( )
104
+ } )
105
+ . with_libp2p_ping ( ping_config)
106
+ . build ( ) ,
107
+ )
108
+ . unwrap ( ) ;
109
+
110
+ ( litep2p, executor, ping_event_stream)
101
111
}
102
112
103
113
#[ tokio:: main]
104
114
async fn main ( ) {
105
- let _ = tracing_subscriber:: fmt ( )
106
- . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
107
- . try_init ( ) ;
108
-
109
- // create two identical litep2ps
110
- let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
111
- let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
112
-
113
- // dial `litep2p1`
114
- litep2p2
115
- . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
116
- . await
117
- . unwrap ( ) ;
118
-
119
- tokio:: spawn ( async move {
120
- loop {
121
- tokio:: select! {
122
- _ = executor1. next( ) => { }
123
- _ = litep2p1. next_event( ) => { } ,
124
- _ = ping_event_stream1. next( ) => { } ,
125
- }
126
- }
127
- } ) ;
128
-
129
- // poll litep2p, task executor and ping event stream all together
130
- //
131
- // since a custom task executor was provided, it's now the user's responsibility
132
- // to actually make sure to poll those futures so that litep2p can make progress
133
- loop {
134
- tokio:: select! {
135
- _ = executor2. next( ) => { }
136
- _ = litep2p2. next_event( ) => { } ,
137
- event = ping_event_stream2. next( ) => match event {
138
- Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
139
- "ping time with {peer:?}: {ping:?}"
140
- ) ,
141
- _ => { }
142
- }
143
- }
144
- }
115
+ let _ = tracing_subscriber:: fmt ( )
116
+ . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
117
+ . try_init ( ) ;
118
+
119
+ // create two identical litep2ps
120
+ let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
121
+ let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
122
+
123
+ // dial `litep2p1`
124
+ litep2p2
125
+ . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
126
+ . await
127
+ . unwrap ( ) ;
128
+
129
+ tokio:: spawn ( async move {
130
+ loop {
131
+ tokio:: select! {
132
+ _ = executor1. next( ) => { }
133
+ _ = litep2p1. next_event( ) => { } ,
134
+ _ = ping_event_stream1. next( ) => { } ,
135
+ }
136
+ }
137
+ } ) ;
138
+
139
+ // poll litep2p, task executor and ping event stream all together
140
+ //
141
+ // since a custom task executor was provided, it's now the user's responsibility
142
+ // to actually make sure to poll those futures so that litep2p can make progress
143
+ loop {
144
+ tokio:: select! {
145
+ _ = executor2. next( ) => { }
146
+ _ = litep2p2. next_event( ) => { } ,
147
+ event = ping_event_stream2. next( ) => match event {
148
+ Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
149
+ "ping time with {peer:?}: {ping:?}"
150
+ ) ,
151
+ _ => { }
152
+ }
153
+ }
154
+ }
145
155
}
0 commit comments