Skip to content

Commit

Permalink
RSDK-7282 Log ICE event timings/stats
Browse files Browse the repository at this point in the history
  • Loading branch information
benjirewis committed Jun 20, 2024
1 parent 1065c0e commit 3add9a6
Showing 1 changed file with 56 additions and 3 deletions.
59 changes: 56 additions & 3 deletions src/rpc/dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use ::http::{
HeaderValue, Version,
};
use ::viam_mdns::{discover, Response};
use ::webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
use ::webrtc::ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
ice_connection_state::RTCIceConnectionState,
};
use ::webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use anyhow::{Context, Result};
use core::fmt;
Expand All @@ -34,10 +37,10 @@ use std::{
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Arc, Mutex, RwLock,
},
task::{Context as TaskContext, Poll},
time::Duration,
time::{Duration, Instant},
};
use tokio::sync::{mpsc, watch};
use tonic::codegen::BoxFuture;
Expand Down Expand Up @@ -869,6 +872,27 @@ async fn send_done_once(
send_done_or_error_update(update_request, channel).await
}

#[derive(Default)]
struct CallerUpdateStats {
count: u128,
total_duration: Duration,
max_duration: Duration,
}

impl fmt::Display for CallerUpdateStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let average_duration = &self.total_duration.as_millis() / &self.count;
writeln!(
f,
"Caller update statistics: num_updates {}, average_duration: {}ms, max_duration: {}ms",
&self.count,
average_duration,
&self.max_duration.as_millis()
)?;
Ok(())
}
}

async fn maybe_connect_via_webrtc(
uri: Uri,
channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
Expand Down Expand Up @@ -915,6 +939,7 @@ async fn maybe_connect_via_webrtc(
let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
let ice_done = Arc::new(tokio::sync::Notify::new());
let ice_done2 = ice_done.clone();
let caller_update_stats = Arc::new(Mutex::new(CallerUpdateStats::default()));

if !webrtc_options.disable_trickle_ice {
let offer = peer_connection.create_offer(None).await?;
Expand All @@ -925,6 +950,20 @@ async fn maybe_connect_via_webrtc(
let exchange_done = exchange_done.clone();

let on_local_ice_candidate_failure = is_open_s.clone();

let caller_update_stats = caller_update_stats.clone();
let caller_update_stats2 = caller_update_stats.clone();
peer_connection.on_ice_connection_state_change(Box::new(
move |state: RTCIceConnectionState| {
let caller_update_stats = caller_update_stats.clone();
Box::pin(async move {
if state == RTCIceConnectionState::Completed {
let caller_update_stats_inner = caller_update_stats.lock().unwrap();
log::info!("{}", caller_update_stats_inner);
}
})
},
));
peer_connection.on_ice_candidate(Box::new(
move |ice_candidate: Option<RTCIceCandidate>| {
if exchange_done.load(Ordering::Acquire) {
Expand All @@ -936,6 +975,7 @@ async fn maybe_connect_via_webrtc(
let uuid_lock = uuid_lock2.clone();
let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
let mut remote_description_set_r = remote_description_set_r.clone();
let caller_update_stats = caller_update_stats2.clone();
Box::pin(async move {
// If the value in the watch channel has not been set yet, we wait until it does.
// Afterwards Some(()) should be visible to all watcher and any watcher waiting will
Expand Down Expand Up @@ -965,6 +1005,7 @@ async fn maybe_connect_via_webrtc(
let mut signaling_client = SignalingServiceClient::new(channel.clone());
match ice_candidate {
Some(ice_candidate) => {
log::info!("Gathered local candidate of {}", ice_candidate);
if sent_done_or_error.load(Ordering::Acquire) {
return;
}
Expand All @@ -975,6 +1016,7 @@ async fn maybe_connect_via_webrtc(
uuid: uuid.clone(),
update: Some(Update::Candidate(proto_candidate)),
};
let call_update_start = Instant::now();
if let Err(e) = webrtc_action_with_timeout(
signaling_client.call_update(update_request),
)
Expand All @@ -988,6 +1030,16 @@ async fn maybe_connect_via_webrtc(
)),
));
}
let mut caller_update_stats_inner =
caller_update_stats.lock().unwrap();
caller_update_stats_inner.count += 1;
let call_update_duration = call_update_start.elapsed();
if call_update_duration > caller_update_stats_inner.max_duration
{
caller_update_stats_inner.max_duration =
call_update_duration;
}
caller_update_stats_inner.total_duration = call_update_duration;
}
Err(e) => log::error!("Error parsing ice candidate: {e}"),
}
Expand Down Expand Up @@ -1145,6 +1197,7 @@ async fn maybe_connect_via_webrtc(
break;
}
};
log::info!("Received remote ICE candidate of {:#?}", candidate);
if let Err(e) = client_channel
.base_channel
.peer_connection
Expand Down

0 comments on commit 3add9a6

Please sign in to comment.