Skip to content

Commit

Permalink
feature: get transport connection stats (#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 authored Jul 6, 2023
1 parent 9c40586 commit ee5ab25
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ web-sys = { version = "0.3.56", optional = true, features = [
"RtcIceGatheringState",
"RtcIceCredentialType",
"RtcLifecycleEvent",
"RtcStatsReport",
"console",
"Blob",
] }
Expand Down
14 changes: 14 additions & 0 deletions core/src/transports/default/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ impl IceTransportInterface<TransportEvent, AcChannel<TransportEvent>> for Defaul
.map(|pc| pc.ice_connection_state())
}

async fn get_stats(&self) -> Option<Vec<String>> {
let pc = self.get_peer_connection().await?;
let reports = pc.get_stats().await.reports;

Some(
reports
.into_iter()
.map(|x| {
serde_json::to_string(&x).unwrap_or("failed to dump stats entry".to_string())
})
.collect(),
)
}

async fn is_disconnected(&self) -> bool {
matches!(
self.ice_connection_state().await,
Expand Down
4 changes: 4 additions & 0 deletions core/src/transports/dummy/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl IceTransportInterface<TransportEvent, AcChannel<TransportEvent>> for DummyT
*self.ice_connection_state.lock().unwrap()
}

async fn get_stats(&self) -> Option<Vec<String>> {
None
}

async fn is_disconnected(&self) -> bool {
matches!(
self.ice_connection_state().await,
Expand Down
28 changes: 27 additions & 1 deletion core/src/transports/wasm/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use web_sys::RtcPeerConnectionIceEvent;
use web_sys::RtcSdpType;
use web_sys::RtcSessionDescription;
use web_sys::RtcSessionDescriptionInit;
use web_sys::RtcStatsReport;

use super::helper::RtcSessionDescriptionWrapper;
use crate::channels::Channel as CbChannel;
Expand Down Expand Up @@ -85,7 +86,7 @@ impl IceTransport for WasmTransport {
type DataChannel = RtcDataChannel;

async fn get_peer_connection(&self) -> Option<Arc<RtcPeerConnection>> {
self.connection.as_ref().map(Arc::clone)
self.connection.clone()
}

async fn get_pending_candidates(&self) -> Vec<RtcIceCandidate> {
Expand Down Expand Up @@ -232,6 +233,25 @@ impl IceTransportInterface<TransportEvent, CbChannel<TransportEvent>> for WasmTr
.map(|pc| pc.ice_connection_state())
}

async fn get_stats(&self) -> Option<Vec<String>> {
let pc = self.get_peer_connection().await?;

let stats: RtcStatsReport = wasm_bindgen_futures::JsFuture::from(pc.get_stats())
.await
.ok()?
.into();

Some(
stats
.entries()
.into_iter()
.map(|x| {
dump_stats_entry(&x.ok()).unwrap_or("failed to dump stats entry".to_string())
})
.collect::<Vec<_>>(),
)
}

async fn is_connected(&self) -> bool {
self.ice_connection_state()
.await
Expand Down Expand Up @@ -679,3 +699,9 @@ impl WasmTransport {
}
}
}

fn dump_stats_entry(entry: &Option<JsValue>) -> Option<String> {
js_sys::JSON::stringify(entry.as_ref()?)
.ok()
.and_then(|x| x.as_string())
}
1 change: 1 addition & 0 deletions core/src/types/ice_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub trait IceTransportInterface<E: Send, Ch: Channel<E>> {
async fn apply_callback(&self) -> Result<&Self>;
async fn close(&self) -> Result<()>;
async fn ice_connection_state(&self) -> Option<Self::IceConnectionState>;
async fn get_stats(&self) -> Option<Vec<String>>;
async fn is_connected(&self) -> bool;
async fn is_disconnected(&self) -> bool;
async fn send_message(&self, msg: &Bytes) -> Result<()>;
Expand Down
35 changes: 31 additions & 4 deletions node/src/tests/wasm/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,36 @@ async fn create_connection(p1: &Processor, p2: &Processor) {

console_log!("answer_offer");
let (transport_2, answer) = p2.swarm.answer_offer(offer).await.unwrap();

console_log!("accept_answer");
let peer = p1.swarm.accept_answer(answer).await.unwrap();
utils::js_utils::window_sleep(1000).await.unwrap();

loop {
console_log!("waiting for connection");
utils::js_utils::window_sleep(1000).await.unwrap();

console_log!(
"transport_1 state: {:?}",
transport_1.ice_connection_state().await.unwrap()
);
console_log!(
"transport_2 state: {:?}",
transport_2.ice_connection_state().await.unwrap()
);

let s1 = transport_1.get_stats().await.unwrap();
let s2 = transport_2.get_stats().await.unwrap();

console_log!("transport_1 stats: {:?}", s1);
console_log!("transport_2 stats: {:?}", s2);

if transport_1.is_connected().await && transport_2.is_connected().await {
break;
}
}

assert!(peer.1.id.eq(&transport_1.id), "transport not same");

futures::try_join!(
async {
if transport_1.is_connected().await {
Expand Down Expand Up @@ -214,13 +240,14 @@ async fn test_processor_connect_with_did() {
let p3 = new_processor(None).await;
console_log!("p3 address: {}", p3.did());

listen(&p1).await;
listen(&p2).await;
listen(&p3).await;
p1.swarm.clone().listen().await;
p2.swarm.clone().listen().await;
p3.swarm.clone().listen().await;

console_log!("processor_connect_p1_and_p2");
create_connection(&p1, &p2).await;
console_log!("processor_connect_p1_and_p2, done");

console_log!("processor_connect_p2_and_p3");
create_connection(&p2, &p3).await;
console_log!("processor_connect_p2_and_p3, done");
Expand Down

0 comments on commit ee5ab25

Please sign in to comment.