Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get transport connection stats #452

Merged
merged 1 commit into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ web-sys = { version = "0.3.56", optional = true, features = [
"RtcIceGatheringState",
"RtcIceCredentialType",
"RtcLifecycleEvent",
"RtcStatsReport",
"console",
"Blob",
] }
Expand Down
14 changes: 14 additions & 0 deletions core/src/transports/default/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ impl IceTransportInterface<TransportEvent, AcChannel<TransportEvent>> for Defaul
.map(|pc| pc.ice_connection_state())
}

async fn get_stats(&self) -> Option<Vec<String>> {
let pc = self.get_peer_connection().await?;
let reports = pc.get_stats().await.reports;

Some(
reports
.into_iter()
.map(|x| {
serde_json::to_string(&x).unwrap_or("failed to dump stats entry".to_string())
})
.collect(),
)
}

async fn is_disconnected(&self) -> bool {
matches!(
self.ice_connection_state().await,
Expand Down
4 changes: 4 additions & 0 deletions core/src/transports/dummy/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl IceTransportInterface<TransportEvent, AcChannel<TransportEvent>> for DummyT
*self.ice_connection_state.lock().unwrap()
}

async fn get_stats(&self) -> Option<Vec<String>> {
None
}

async fn is_disconnected(&self) -> bool {
matches!(
self.ice_connection_state().await,
Expand Down
28 changes: 27 additions & 1 deletion core/src/transports/wasm/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use web_sys::RtcPeerConnectionIceEvent;
use web_sys::RtcSdpType;
use web_sys::RtcSessionDescription;
use web_sys::RtcSessionDescriptionInit;
use web_sys::RtcStatsReport;

use super::helper::RtcSessionDescriptionWrapper;
use crate::channels::Channel as CbChannel;
Expand Down Expand Up @@ -85,7 +86,7 @@ impl IceTransport for WasmTransport {
type DataChannel = RtcDataChannel;

async fn get_peer_connection(&self) -> Option<Arc<RtcPeerConnection>> {
self.connection.as_ref().map(Arc::clone)
self.connection.clone()
}

async fn get_pending_candidates(&self) -> Vec<RtcIceCandidate> {
Expand Down Expand Up @@ -232,6 +233,25 @@ impl IceTransportInterface<TransportEvent, CbChannel<TransportEvent>> for WasmTr
.map(|pc| pc.ice_connection_state())
}

async fn get_stats(&self) -> Option<Vec<String>> {
let pc = self.get_peer_connection().await?;

let stats: RtcStatsReport = wasm_bindgen_futures::JsFuture::from(pc.get_stats())
.await
.ok()?
.into();

Some(
stats
.entries()
.into_iter()
.map(|x| {
dump_stats_entry(&x.ok()).unwrap_or("failed to dump stats entry".to_string())
})
.collect::<Vec<_>>(),
)
}

async fn is_connected(&self) -> bool {
self.ice_connection_state()
.await
Expand Down Expand Up @@ -679,3 +699,9 @@ impl WasmTransport {
}
}
}

fn dump_stats_entry(entry: &Option<JsValue>) -> Option<String> {
js_sys::JSON::stringify(entry.as_ref()?)
.ok()
.and_then(|x| x.as_string())
}
1 change: 1 addition & 0 deletions core/src/types/ice_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub trait IceTransportInterface<E: Send, Ch: Channel<E>> {
async fn apply_callback(&self) -> Result<&Self>;
async fn close(&self) -> Result<()>;
async fn ice_connection_state(&self) -> Option<Self::IceConnectionState>;
async fn get_stats(&self) -> Option<Vec<String>>;
async fn is_connected(&self) -> bool;
async fn is_disconnected(&self) -> bool;
async fn send_message(&self, msg: &Bytes) -> Result<()>;
Expand Down
35 changes: 31 additions & 4 deletions node/src/tests/wasm/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,36 @@ async fn create_connection(p1: &Processor, p2: &Processor) {

console_log!("answer_offer");
let (transport_2, answer) = p2.swarm.answer_offer(offer).await.unwrap();

console_log!("accept_answer");
let peer = p1.swarm.accept_answer(answer).await.unwrap();
utils::js_utils::window_sleep(1000).await.unwrap();

loop {
console_log!("waiting for connection");
utils::js_utils::window_sleep(1000).await.unwrap();

console_log!(
"transport_1 state: {:?}",
transport_1.ice_connection_state().await.unwrap()
);
console_log!(
"transport_2 state: {:?}",
transport_2.ice_connection_state().await.unwrap()
);

let s1 = transport_1.get_stats().await.unwrap();
let s2 = transport_2.get_stats().await.unwrap();

console_log!("transport_1 stats: {:?}", s1);
console_log!("transport_2 stats: {:?}", s2);

if transport_1.is_connected().await && transport_2.is_connected().await {
break;
}
}

assert!(peer.1.id.eq(&transport_1.id), "transport not same");

futures::try_join!(
async {
if transport_1.is_connected().await {
Expand Down Expand Up @@ -214,13 +240,14 @@ async fn test_processor_connect_with_did() {
let p3 = new_processor(None).await;
console_log!("p3 address: {}", p3.did());

listen(&p1).await;
listen(&p2).await;
listen(&p3).await;
p1.swarm.clone().listen().await;
p2.swarm.clone().listen().await;
p3.swarm.clone().listen().await;

console_log!("processor_connect_p1_and_p2");
create_connection(&p1, &p2).await;
console_log!("processor_connect_p1_and_p2, done");

console_log!("processor_connect_p2_and_p3");
create_connection(&p2, &p3).await;
console_log!("processor_connect_p2_and_p3, done");
Expand Down