Skip to content

Commit

Permalink
ci: fix dummy transport and add dummy test back (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 authored Apr 15, 2024
1 parent 6f16630 commit 5f1a104
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/qaci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ jobs:
- name: Run doc tests
run: cargo test --doc

# - name: Run dummy tests
# run: cargo test -p rings-core --features dummy --verbose
- name: Run dummy tests
run: cargo test -p rings-core --features dummy --verbose

- name: Run tests
run: cargo test --release --all --verbose
Expand Down
10 changes: 3 additions & 7 deletions crates/core/src/message/handlers/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,22 +522,18 @@ pub mod tests {
async fn test_finger_when_disconnect() -> Result<()> {
let key1 = SecretKey::random();
let key2 = SecretKey::random();
let key3 = SecretKey::random();

let node1 = prepare_node(key1).await;
let node2 = prepare_node(key2).await;

// This is only a dummy node for using assert_no_more_msg function
let node3 = prepare_node(key3).await;

{
assert!(node1.dht().lock_finger()?.is_empty());
assert!(node1.dht().lock_finger()?.is_empty());
}

manually_establish_connection(&node1.swarm, &node2.swarm).await;
wait_for_msgs([&node1, &node2, &node3]).await;
assert_no_more_msg([&node1, &node2, &node3]).await;
wait_for_msgs([&node1, &node2]).await;
assert_no_more_msg([&node1, &node2]).await;

node1.assert_transports(vec![node2.did()]);
node2.assert_transports(vec![node1.did()]);
Expand Down Expand Up @@ -571,7 +567,7 @@ pub mod tests {
}
}

assert_no_more_msg([&node1, &node2, &node3]).await;
assert_no_more_msg([&node1, &node2]).await;

node1.assert_transports(vec![]);
node2.assert_transports(vec![]);
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/swarm/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ impl SwarmTransport {
};

if let Err(e) = conn.connection.webrtc_wait_for_data_channel_open().await {
dbg!(&e);
tracing::warn!(
"[get_and_check_connection] connection {peer} data channel not open, will be dropped, reason: {e:?}"
);
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/tests/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ pub async fn wait_for_msgs(nodes: impl IntoIterator<Item = &Node>) {
tokio::select! {
Some(payload) = node.listen_once() => {
println!(
"Msg {} -> {} [{} => {}] : {:?}",
*did_names.get(&payload.signer()).unwrap(),
*did_names.get(&node.did()).unwrap(),
*did_names.get(&payload.transaction.signer()).unwrap(),
*did_names.get(&payload.transaction.destination).unwrap(),
payload.transaction.data::<Message>().unwrap()
"Msg {} -> {} [{} => {}] : {:?}",
*did_names.get(&payload.signer()).unwrap(),
*did_names.get(&node.did()).unwrap(),
*did_names.get(&payload.transaction.signer()).unwrap(),
*did_names.get(&payload.transaction.destination).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
_ = sleep(Duration::from_secs(3)) => break
Expand Down
123 changes: 76 additions & 47 deletions crates/transport/src/connections/dummy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use bytes::Bytes;
use dashmap::DashMap;
use lazy_static::lazy_static;
use rand::distributions::Distribution;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::callback::InnerTransportCallback;
use crate::connection_ref::ConnectionRef;
Expand All @@ -22,24 +24,31 @@ use crate::notifier::Notifier;
use crate::pool::Pool;

/// Max delay in ms on sending message
const DUMMY_DELAY_MAX: u64 = 1000;
const DUMMY_DELAY_MAX: u64 = 100;
/// Min delay in ms on sending message
const DUMMY_DELAY_MIN: u64 = 100;
const DUMMY_DELAY_MIN: u64 = 10;
/// Config random delay when send message
const SEND_MESSAGE_DELAY: bool = true;
/// Config random delay when channel opening
const CHANNEL_OPEN_DELAY: bool = false;

lazy_static! {
static ref CBS: DashMap<String, Arc<InnerTransportCallback>> = DashMap::new();
static ref CONNS: DashMap<String, Arc<DummyConnection>> = DashMap::new();
}

enum Event {
PeerConnectionStateChange(WebrtcConnectionState),
DataChannelOpen,
DataChannelClose,
Message(Bytes),
}

/// A dummy connection for local testing.
/// Implements the [ConnectionInterface] trait with no real network.
pub struct DummyConnection {
pub(crate) rand_id: String,
rand_id: String,
callback: InnerTransportCallback,
event_sender: mpsc::UnboundedSender<Event>,
remote_rand_id: Arc<Mutex<Option<String>>>,
event_listener: JoinHandle<()>,
webrtc_connection_state: Arc<Mutex<WebrtcConnectionState>>,
}

Expand All @@ -50,23 +59,45 @@ pub struct DummyTransport {
}

impl DummyConnection {
fn new() -> Self {
fn new(callback: InnerTransportCallback) -> Self {
let rand_id = random(0, 10000000000).to_string();

let (tx, mut rx) = mpsc::unbounded_channel();

let event_listener = {
let rand_id = rand_id.clone();
tokio::spawn(async move {
while let Some(ev) = rx.recv().await {
let conn = { CONNS.get(&rand_id).unwrap().clone() };
conn.handle_event(ev).await;
}
})
};

Self {
rand_id: random(0, 10000000000).to_string(),
remote_rand_id: Arc::new(Mutex::new(None)),
rand_id,
callback,
event_sender: tx,
remote_rand_id: Default::default(),
event_listener,
webrtc_connection_state: Arc::new(Mutex::new(WebrtcConnectionState::New)),
}
}

fn callback(&self) -> Arc<InnerTransportCallback> {
CBS.get(&self.rand_id).unwrap().clone()
}

fn remote_callback(&self) -> Arc<InnerTransportCallback> {
let cid: String = { self.remote_rand_id.lock().unwrap() }.clone().unwrap();
CBS.get(&cid)
.expect(&format!("Failed to get cid {:?}", &cid))
.clone()
async fn handle_event(&self, event: Event) {
match event {
Event::PeerConnectionStateChange(state) => {
self.callback.on_peer_connection_state_change(state).await
}
Event::DataChannelOpen => self.callback.on_data_channel_open().await,
Event::DataChannelClose => self.callback.on_data_channel_close(),
Event::Message(data) => {
if SEND_MESSAGE_DELAY {
random_delay().await;
}
self.callback.on_message(&data).await
}
}
}

fn remote_conn(&self) -> Option<Arc<DummyConnection>> {
Expand All @@ -92,18 +123,19 @@ impl DummyConnection {
*webrtc_connection_state = state;
}

self.callback().on_peer_connection_state_change(state).await;
self.event_sender
.send(Event::PeerConnectionStateChange(state))
.unwrap();

// Simulate the behavior where the data channel is not opened immediately upon connection,
// but rather after a certain number of milliseconds.
if state == WebrtcConnectionState::Connected {
let cb = self.callback();
tokio::spawn(async move {
if CHANNEL_OPEN_DELAY {
random_delay().await;
}
cb.on_data_channel_open().await;
});
self.event_sender.send(Event::DataChannelOpen).unwrap();
}

if matches!(
state,
WebrtcConnectionState::Closed | WebrtcConnectionState::Disconnected
) {
self.event_sender.send(Event::DataChannelClose).unwrap();
}
}
}
Expand All @@ -123,12 +155,15 @@ impl ConnectionInterface for DummyConnection {
type Error = Error;

async fn send_message(&self, msg: TransportMessage) -> Result<()> {
if SEND_MESSAGE_DELAY {
random_delay().await;
}
self.webrtc_wait_for_data_channel_open().await?;

let data = bincode::serialize(&msg).map(Bytes::from)?;
self.remote_callback().on_message(&data).await;
self.remote_conn()
.unwrap()
.event_sender
.send(Event::Message(data))
.unwrap();

Ok(())
}

Expand Down Expand Up @@ -170,10 +205,6 @@ impl ConnectionInterface for DummyConnection {
}

async fn webrtc_wait_for_data_channel_open(&self) -> Result<()> {
if CHANNEL_OPEN_DELAY {
random_delay().await;
}

// Will pass if the state is connecting to prevent release connection in the `test_handshake_on_both_sides` test.
// The connecting state means an offer is answered but not accepted by the other side.
if matches!(
Expand All @@ -189,6 +220,9 @@ impl ConnectionInterface for DummyConnection {
}

async fn close(&self) -> Result<()> {
CONNS.remove(&self.rand_id);
self.event_listener.abort();

self.set_webrtc_connection_state(WebrtcConnectionState::Closed)
.await;

Expand Down Expand Up @@ -225,19 +259,14 @@ impl TransportInterface for DummyTransport {
}
}

let conn = DummyConnection::new();
let conn_rand_id = conn.rand_id.clone();
let inner_callback = InnerTransportCallback::new(cid, callback, Notifier::default());
let conn = DummyConnection::new(inner_callback);

self.pool.safely_insert(cid, conn)?;
CONNS.insert(conn_rand_id.clone(), self.connection(cid)?.upgrade()?);
CBS.insert(
conn_rand_id.clone(),
Arc::new(InnerTransportCallback::new(
cid,
callback,
Notifier::default(),
)),
);

let conn = self.connection(cid)?.upgrade()?;
CONNS.insert(conn.rand_id.clone(), conn);

Ok(())
}

Expand Down

0 comments on commit 5f1a104

Please sign in to comment.