From 05d0083b2952449bf8554f7daf7b5fb6511f73dc Mon Sep 17 00:00:00 2001 From: magine Date: Thu, 6 Jul 2023 12:47:28 +0800 Subject: [PATCH] feature: get transport connection stats --- core/Cargo.toml | 1 + core/src/transports/default/transport.rs | 14 ++++++++++ core/src/transports/wasm/transport.rs | 28 ++++++++++++++++++- core/src/types/ice_transport/mod.rs | 1 + node/src/tests/wasm/processor.rs | 35 +++++++++++++++++++++--- 5 files changed, 74 insertions(+), 5 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 3e439ed5f..4f00ef20c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -118,6 +118,7 @@ web-sys = { version = "0.3.56", optional = true, features = [ "RtcIceGatheringState", "RtcIceCredentialType", "RtcLifecycleEvent", + "RtcStatsReport", "console", "Blob", ] } diff --git a/core/src/transports/default/transport.rs b/core/src/transports/default/transport.rs index 939e7c888..949bddfea 100644 --- a/core/src/transports/default/transport.rs +++ b/core/src/transports/default/transport.rs @@ -223,6 +223,20 @@ impl IceTransportInterface> for Defaul .map(|pc| pc.ice_connection_state()) } + async fn get_stats(&self) -> Option> { + let pc = self.get_peer_connection().await?; + let reports = pc.get_stats().await.reports; + + Some( + reports + .into_iter() + .map(|x| { + serde_json::to_string(&x).unwrap_or("failed to dump stats entry".to_string()) + }) + .collect(), + ) + } + async fn is_disconnected(&self) -> bool { matches!( self.ice_connection_state().await, diff --git a/core/src/transports/wasm/transport.rs b/core/src/transports/wasm/transport.rs index bab0041b1..3bfe47ae4 100644 --- a/core/src/transports/wasm/transport.rs +++ b/core/src/transports/wasm/transport.rs @@ -24,6 +24,7 @@ use web_sys::RtcPeerConnectionIceEvent; use web_sys::RtcSdpType; use web_sys::RtcSessionDescription; use web_sys::RtcSessionDescriptionInit; +use web_sys::RtcStatsReport; use super::helper::RtcSessionDescriptionWrapper; use crate::channels::Channel as CbChannel; @@ -85,7 +86,7 @@ impl IceTransport for WasmTransport { type DataChannel = RtcDataChannel; async fn get_peer_connection(&self) -> Option> { - self.connection.as_ref().map(Arc::clone) + self.connection.clone() } async fn get_pending_candidates(&self) -> Vec { @@ -232,6 +233,25 @@ impl IceTransportInterface> for WasmTr .map(|pc| pc.ice_connection_state()) } + async fn get_stats(&self) -> Option> { + let pc = self.get_peer_connection().await?; + + let stats: RtcStatsReport = wasm_bindgen_futures::JsFuture::from(pc.get_stats()) + .await + .ok()? + .into(); + + Some( + stats + .entries() + .into_iter() + .map(|x| { + dump_stats_entry(&x.ok()).unwrap_or("failed to dump stats entry".to_string()) + }) + .collect::>(), + ) + } + async fn is_connected(&self) -> bool { self.ice_connection_state() .await @@ -679,3 +699,9 @@ impl WasmTransport { } } } + +fn dump_stats_entry(entry: &Option) -> Option { + js_sys::JSON::stringify(entry.as_ref()?) + .ok() + .and_then(|x| x.as_string()) +} diff --git a/core/src/types/ice_transport/mod.rs b/core/src/types/ice_transport/mod.rs index 0ea2f4788..149936c79 100644 --- a/core/src/types/ice_transport/mod.rs +++ b/core/src/types/ice_transport/mod.rs @@ -63,6 +63,7 @@ pub trait IceTransportInterface> { async fn apply_callback(&self) -> Result<&Self>; async fn close(&self) -> Result<()>; async fn ice_connection_state(&self) -> Option; + async fn get_stats(&self) -> Option>; async fn is_connected(&self) -> bool; async fn is_disconnected(&self) -> bool; async fn send_message(&self, msg: &Bytes) -> Result<()>; diff --git a/node/src/tests/wasm/processor.rs b/node/src/tests/wasm/processor.rs index 720220632..6c391ddcc 100644 --- a/node/src/tests/wasm/processor.rs +++ b/node/src/tests/wasm/processor.rs @@ -94,10 +94,36 @@ async fn create_connection(p1: &Processor, p2: &Processor) { console_log!("answer_offer"); let (transport_2, answer) = p2.swarm.answer_offer(offer).await.unwrap(); + console_log!("accept_answer"); let peer = p1.swarm.accept_answer(answer).await.unwrap(); - utils::js_utils::window_sleep(1000).await.unwrap(); + + loop { + console_log!("waiting for connection"); + utils::js_utils::window_sleep(1000).await.unwrap(); + + console_log!( + "transport_1 state: {:?}", + transport_1.ice_connection_state().await.unwrap() + ); + console_log!( + "transport_2 state: {:?}", + transport_2.ice_connection_state().await.unwrap() + ); + + let s1 = transport_1.get_stats().await.unwrap(); + let s2 = transport_2.get_stats().await.unwrap(); + + console_log!("transport_1 stats: {:?}", s1); + console_log!("transport_2 stats: {:?}", s2); + + if transport_1.is_connected().await && transport_2.is_connected().await { + break; + } + } + assert!(peer.1.id.eq(&transport_1.id), "transport not same"); + futures::try_join!( async { if transport_1.is_connected().await { @@ -214,13 +240,14 @@ async fn test_processor_connect_with_did() { let p3 = new_processor(None).await; console_log!("p3 address: {}", p3.did()); - listen(&p1).await; - listen(&p2).await; - listen(&p3).await; + p1.swarm.clone().listen().await; + p2.swarm.clone().listen().await; + p3.swarm.clone().listen().await; console_log!("processor_connect_p1_and_p2"); create_connection(&p1, &p2).await; console_log!("processor_connect_p1_and_p2, done"); + console_log!("processor_connect_p2_and_p3"); create_connection(&p2, &p3).await; console_log!("processor_connect_p2_and_p3, done");