From 5f1a1040fdbaf39d90cf8b6e3d7efbb62d44ebeb Mon Sep 17 00:00:00 2001 From: magine Date: Tue, 16 Apr 2024 07:55:27 +0800 Subject: [PATCH] ci: fix dummy transport and add dummy test back (#573) --- .github/workflows/qaci.yml | 4 +- .../core/src/message/handlers/connection.rs | 10 +- crates/core/src/swarm/transport.rs | 1 - crates/core/src/tests/default/mod.rs | 12 +- crates/transport/src/connections/dummy/mod.rs | 123 +++++++++++------- 5 files changed, 87 insertions(+), 63 deletions(-) diff --git a/.github/workflows/qaci.yml b/.github/workflows/qaci.yml index 24e01da8d..e486ad84e 100644 --- a/.github/workflows/qaci.yml +++ b/.github/workflows/qaci.yml @@ -91,8 +91,8 @@ jobs: - name: Run doc tests run: cargo test --doc - # - name: Run dummy tests - # run: cargo test -p rings-core --features dummy --verbose + - name: Run dummy tests + run: cargo test -p rings-core --features dummy --verbose - name: Run tests run: cargo test --release --all --verbose diff --git a/crates/core/src/message/handlers/connection.rs b/crates/core/src/message/handlers/connection.rs index 1ec1014ac..fa772b13b 100644 --- a/crates/core/src/message/handlers/connection.rs +++ b/crates/core/src/message/handlers/connection.rs @@ -522,22 +522,18 @@ pub mod tests { async fn test_finger_when_disconnect() -> Result<()> { let key1 = SecretKey::random(); let key2 = SecretKey::random(); - let key3 = SecretKey::random(); let node1 = prepare_node(key1).await; let node2 = prepare_node(key2).await; - // This is only a dummy node for using assert_no_more_msg function - let node3 = prepare_node(key3).await; - { assert!(node1.dht().lock_finger()?.is_empty()); assert!(node1.dht().lock_finger()?.is_empty()); } manually_establish_connection(&node1.swarm, &node2.swarm).await; - wait_for_msgs([&node1, &node2, &node3]).await; - assert_no_more_msg([&node1, &node2, &node3]).await; + wait_for_msgs([&node1, &node2]).await; + assert_no_more_msg([&node1, &node2]).await; node1.assert_transports(vec![node2.did()]); node2.assert_transports(vec![node1.did()]); @@ -571,7 +567,7 @@ pub mod tests { } } - assert_no_more_msg([&node1, &node2, &node3]).await; + assert_no_more_msg([&node1, &node2]).await; node1.assert_transports(vec![]); node2.assert_transports(vec![]); diff --git a/crates/core/src/swarm/transport.rs b/crates/core/src/swarm/transport.rs index cd86bf314..c148c1283 100644 --- a/crates/core/src/swarm/transport.rs +++ b/crates/core/src/swarm/transport.rs @@ -152,7 +152,6 @@ impl SwarmTransport { }; if let Err(e) = conn.connection.webrtc_wait_for_data_channel_open().await { - dbg!(&e); tracing::warn!( "[get_and_check_connection] connection {peer} data channel not open, will be dropped, reason: {e:?}" ); diff --git a/crates/core/src/tests/default/mod.rs b/crates/core/src/tests/default/mod.rs index 2a6edc22f..a7769dc69 100644 --- a/crates/core/src/tests/default/mod.rs +++ b/crates/core/src/tests/default/mod.rs @@ -170,12 +170,12 @@ pub async fn wait_for_msgs(nodes: impl IntoIterator) { tokio::select! { Some(payload) = node.listen_once() => { println!( - "Msg {} -> {} [{} => {}] : {:?}", - *did_names.get(&payload.signer()).unwrap(), - *did_names.get(&node.did()).unwrap(), - *did_names.get(&payload.transaction.signer()).unwrap(), - *did_names.get(&payload.transaction.destination).unwrap(), - payload.transaction.data::().unwrap() + "Msg {} -> {} [{} => {}] : {:?}", + *did_names.get(&payload.signer()).unwrap(), + *did_names.get(&node.did()).unwrap(), + *did_names.get(&payload.transaction.signer()).unwrap(), + *did_names.get(&payload.transaction.destination).unwrap(), + payload.transaction.data::().unwrap() ) } _ = sleep(Duration::from_secs(3)) => break diff --git a/crates/transport/src/connections/dummy/mod.rs b/crates/transport/src/connections/dummy/mod.rs index a6d7f5d6f..d03df3790 100644 --- a/crates/transport/src/connections/dummy/mod.rs +++ b/crates/transport/src/connections/dummy/mod.rs @@ -7,6 +7,8 @@ use bytes::Bytes; use dashmap::DashMap; use lazy_static::lazy_static; use rand::distributions::Distribution; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::callback::InnerTransportCallback; use crate::connection_ref::ConnectionRef; @@ -22,24 +24,31 @@ use crate::notifier::Notifier; use crate::pool::Pool; /// Max delay in ms on sending message -const DUMMY_DELAY_MAX: u64 = 1000; +const DUMMY_DELAY_MAX: u64 = 100; /// Min delay in ms on sending message -const DUMMY_DELAY_MIN: u64 = 100; +const DUMMY_DELAY_MIN: u64 = 10; /// Config random delay when send message const SEND_MESSAGE_DELAY: bool = true; -/// Config random delay when channel opening -const CHANNEL_OPEN_DELAY: bool = false; lazy_static! { - static ref CBS: DashMap> = DashMap::new(); static ref CONNS: DashMap> = DashMap::new(); } +enum Event { + PeerConnectionStateChange(WebrtcConnectionState), + DataChannelOpen, + DataChannelClose, + Message(Bytes), +} + /// A dummy connection for local testing. /// Implements the [ConnectionInterface] trait with no real network. pub struct DummyConnection { - pub(crate) rand_id: String, + rand_id: String, + callback: InnerTransportCallback, + event_sender: mpsc::UnboundedSender, remote_rand_id: Arc>>, + event_listener: JoinHandle<()>, webrtc_connection_state: Arc>, } @@ -50,23 +59,45 @@ pub struct DummyTransport { } impl DummyConnection { - fn new() -> Self { + fn new(callback: InnerTransportCallback) -> Self { + let rand_id = random(0, 10000000000).to_string(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let event_listener = { + let rand_id = rand_id.clone(); + tokio::spawn(async move { + while let Some(ev) = rx.recv().await { + let conn = { CONNS.get(&rand_id).unwrap().clone() }; + conn.handle_event(ev).await; + } + }) + }; + Self { - rand_id: random(0, 10000000000).to_string(), - remote_rand_id: Arc::new(Mutex::new(None)), + rand_id, + callback, + event_sender: tx, + remote_rand_id: Default::default(), + event_listener, webrtc_connection_state: Arc::new(Mutex::new(WebrtcConnectionState::New)), } } - fn callback(&self) -> Arc { - CBS.get(&self.rand_id).unwrap().clone() - } - - fn remote_callback(&self) -> Arc { - let cid: String = { self.remote_rand_id.lock().unwrap() }.clone().unwrap(); - CBS.get(&cid) - .expect(&format!("Failed to get cid {:?}", &cid)) - .clone() + async fn handle_event(&self, event: Event) { + match event { + Event::PeerConnectionStateChange(state) => { + self.callback.on_peer_connection_state_change(state).await + } + Event::DataChannelOpen => self.callback.on_data_channel_open().await, + Event::DataChannelClose => self.callback.on_data_channel_close(), + Event::Message(data) => { + if SEND_MESSAGE_DELAY { + random_delay().await; + } + self.callback.on_message(&data).await + } + } } fn remote_conn(&self) -> Option> { @@ -92,18 +123,19 @@ impl DummyConnection { *webrtc_connection_state = state; } - self.callback().on_peer_connection_state_change(state).await; + self.event_sender + .send(Event::PeerConnectionStateChange(state)) + .unwrap(); - // Simulate the behavior where the data channel is not opened immediately upon connection, - // but rather after a certain number of milliseconds. if state == WebrtcConnectionState::Connected { - let cb = self.callback(); - tokio::spawn(async move { - if CHANNEL_OPEN_DELAY { - random_delay().await; - } - cb.on_data_channel_open().await; - }); + self.event_sender.send(Event::DataChannelOpen).unwrap(); + } + + if matches!( + state, + WebrtcConnectionState::Closed | WebrtcConnectionState::Disconnected + ) { + self.event_sender.send(Event::DataChannelClose).unwrap(); } } } @@ -123,12 +155,15 @@ impl ConnectionInterface for DummyConnection { type Error = Error; async fn send_message(&self, msg: TransportMessage) -> Result<()> { - if SEND_MESSAGE_DELAY { - random_delay().await; - } self.webrtc_wait_for_data_channel_open().await?; + let data = bincode::serialize(&msg).map(Bytes::from)?; - self.remote_callback().on_message(&data).await; + self.remote_conn() + .unwrap() + .event_sender + .send(Event::Message(data)) + .unwrap(); + Ok(()) } @@ -170,10 +205,6 @@ impl ConnectionInterface for DummyConnection { } async fn webrtc_wait_for_data_channel_open(&self) -> Result<()> { - if CHANNEL_OPEN_DELAY { - random_delay().await; - } - // Will pass if the state is connecting to prevent release connection in the `test_handshake_on_both_sides` test. // The connecting state means an offer is answered but not accepted by the other side. if matches!( @@ -189,6 +220,9 @@ impl ConnectionInterface for DummyConnection { } async fn close(&self) -> Result<()> { + CONNS.remove(&self.rand_id); + self.event_listener.abort(); + self.set_webrtc_connection_state(WebrtcConnectionState::Closed) .await; @@ -225,19 +259,14 @@ impl TransportInterface for DummyTransport { } } - let conn = DummyConnection::new(); - let conn_rand_id = conn.rand_id.clone(); + let inner_callback = InnerTransportCallback::new(cid, callback, Notifier::default()); + let conn = DummyConnection::new(inner_callback); self.pool.safely_insert(cid, conn)?; - CONNS.insert(conn_rand_id.clone(), self.connection(cid)?.upgrade()?); - CBS.insert( - conn_rand_id.clone(), - Arc::new(InnerTransportCallback::new( - cid, - callback, - Notifier::default(), - )), - ); + + let conn = self.connection(cid)?.upgrade()?; + CONNS.insert(conn.rand_id.clone(), conn); + Ok(()) }