Skip to content

Commit

Permalink
feat: add more logs & monitor stuck tasks (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Sep 23, 2023
1 parent a7740ab commit a16d240
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 77 deletions.
59 changes: 34 additions & 25 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{proto, server, FfiError, FfiHandleId, FfiResult};
use futures_util::StreamExt;
use livekit::webrtc::audio_stream::native::NativeAudioStream;
use livekit::webrtc::prelude::*;
use log::warn;
use tokio::sync::oneshot;

pub struct FfiAudioStream {
Expand Down Expand Up @@ -122,7 +121,7 @@ impl FfiAudioStream {
)),
},
)).await {
warn!("failed to send audio frame: {}", err);
log::warn!("failed to send audio frame: {}", err);
}
}
}
Expand All @@ -139,7 +138,7 @@ impl FfiAudioStream {
))
.await
{
warn!("failed to send EOS: {}", err);
log::warn!("failed to send audio EOS: {}", err);
}
}
}
4 changes: 2 additions & 2 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl RoomInner {
if inner.pending_unpublished_tracks.lock().remove(&sid) {
break; // Event was sent
}
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
}

Expand Down Expand Up @@ -425,7 +425,7 @@ async fn forward_event(server: &'static FfiServer, inner: &Arc<RoomInner>, event
if inner.pending_published_tracks.lock().remove(&sid) {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}

let ffi_publication = FfiPublication {
Expand Down
5 changes: 2 additions & 3 deletions livekit-ffi/src/server/video_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{proto, server, FfiError, FfiHandleId, FfiResult};
use futures_util::StreamExt;
use livekit::webrtc::prelude::*;
use livekit::webrtc::video_stream::native::NativeVideoStream;
use log::warn;
use tokio::sync::oneshot;

pub struct FfiVideoStream {
Expand Down Expand Up @@ -124,7 +123,7 @@ impl FfiVideoStream {
)),
}
)).await{
warn!("failed to send video frame: {}", err);
log::warn!("failed to send video frame: {}", err);
}
}
}
Expand All @@ -141,7 +140,7 @@ impl FfiVideoStream {
))
.await
{
warn!("failed to send video stream ended event: {}", err);
log::warn!("failed to send video EOS: {}", err);
}
}
}
38 changes: 30 additions & 8 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use parking_lot::RwLock;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -221,6 +222,7 @@ impl Room {
let dispatcher = dispatcher.clone();
let e2ee_manager = e2ee_manager.clone();
move |participant, publication| {
log::info!("local track published: {}", publication.sid());
let track = publication.track().unwrap();
let event = RoomEvent::LocalTrackPublished {
participant: participant.clone(),
Expand All @@ -236,6 +238,7 @@ impl Room {
let dispatcher = dispatcher.clone();
let e2ee_manager = e2ee_manager.clone();
move |participant, publication| {
log::info!("local track unpublished: {}", publication.sid());
let event = RoomEvent::LocalTrackUnpublished {
participant: participant.clone(),
publication: publication.clone(),
Expand Down Expand Up @@ -442,19 +445,34 @@ impl RoomSession {
) {
loop {
tokio::select! {
res = engine_events.recv() => {
if let Some(event) = res {
if let Err(err) = self.on_engine_event(event).await {
Some(event) = engine_events.recv() => {
let debug = format!("{:?}", event);
let inner = self.clone();
let (tx, rx) = oneshot::channel();
let task = tokio::spawn(async move {
if let Err(err) = inner.on_engine_event(event).await {
log::error!("failed to handle engine event: {:?}", err);
}
let _ = tx.send(());
});

// Monitor sync/async blockings
tokio::select! {
_ = rx => {},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
log::error!("engine_event is taking too much time: {}", debug);
}
}

task.await.unwrap();
},
_ = &mut close_receiver => {
log::trace!("closing room_task");
break;
}
}
}

log::debug!("room_task closed");
}

async fn on_engine_event(self: &Arc<Self>, event: EngineEvent) -> RoomResult<()> {
Expand Down Expand Up @@ -564,7 +582,6 @@ impl RoomSession {
if let Some(remote_participant) = remote_participant {
if pi.state == proto::participant_info::State::Disconnected as i32 {
// Participant disconnected
log::info!("Participant disconnected: {:?}", participant_sid);
self.clone()
.handle_participant_disconnect(remote_participant)
} else {
Expand All @@ -573,7 +590,7 @@ impl RoomSession {
}
} else {
// Create a new participant
log::info!("Participant connected: {:?}", participant_sid);
log::info!("new participant: {}", pi.sid);
let remote_participant = {
let pi = pi.clone();
self.create_participant(
Expand Down Expand Up @@ -781,7 +798,7 @@ impl RoomSession {
}

fn handle_disconnected(&self, reason: DisconnectReason) {
log::info!("disconnected from room,: {:?}", reason);
log::info!("disconnected from room: {:?}", reason);
if self.update_connection_state(ConnectionState::Disconnected) {
self.dispatcher
.dispatch(&RoomEvent::Disconnected { reason });
Expand Down Expand Up @@ -830,6 +847,7 @@ impl RoomSession {
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
log::info!("track subscribed: {}", track.sid());
let event = RoomEvent::TrackSubscribed {
participant: participant.clone(),
track: track.clone(),
Expand All @@ -844,6 +862,7 @@ impl RoomSession {
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
log::info!("track unsubscribed: {}", track.sid());
let event = RoomEvent::TrackUnsubscribed {
participant: participant.clone(),
track: track.clone(),
Expand Down Expand Up @@ -888,13 +907,16 @@ impl RoomSession {
});

self.participants.write().insert(sid, participant.clone());

participant
}

/// A participant has disconnected
/// Cleanup the participant and emit an event
fn handle_participant_disconnect(self: Arc<Self>, remote_participant: RemoteParticipant) {
log::info!(
"handle_participant_disconnect: {}",
remote_participant.sid()
);
for (sid, _) in remote_participant.tracks() {
remote_participant.unpublish_track(&sid);
}
Expand Down
1 change: 0 additions & 1 deletion livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ impl LocalParticipant {
let publication = LocalTrackPublication::new(track_info.clone(), track.clone());
track.update_info(track_info); // Update sid + source

log::debug!("publishing track with cid {:?}", track.rtc_track().id());
let transceiver = self
.inner
.rtc_engine
Expand Down
2 changes: 0 additions & 2 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ impl RemoteParticipant {

track.set_transceiver(Some(transceiver));

log::debug!("starting track: {:?}", sid);

//track.set_muted(remote_publication.is_muted());
track.update_info(proto::TrackInfo {
sid: remote_publication.sid().to_string(),
Expand Down
1 change: 0 additions & 1 deletion livekit/src/room/track/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ pub(super) fn new_inner(
/// This is only called for local tracks
pub(super) fn set_muted(inner: &Arc<TrackInner>, track: &Track, muted: bool) {
let info = inner.info.read();
log::debug!("set_muted: {:?} {:?}", info.sid, muted);
if info.muted == muted {
return;
}
Expand Down
4 changes: 2 additions & 2 deletions livekit/src/rtc_engine/lk_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl LkRuntime {
if let Some(lk_runtime) = lk_runtime_ref.upgrade() {
lk_runtime
} else {
log::trace!("LkRuntime::new()");
log::debug!("LkRuntime::new()");
let new_runtime = Arc::new(Self {
pc_factory: PeerConnectionFactory::default(),
});
Expand All @@ -54,6 +54,6 @@ impl LkRuntime {

impl Drop for LkRuntime {
fn drop(&mut self) {
log::trace!("LkRuntime::drop()");
log::debug!("LkRuntime::drop()");
}
}
Loading

0 comments on commit a16d240

Please sign in to comment.