diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 877026df..6da6503e 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -562,7 +562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "tonic", "tracing-core", ] @@ -580,7 +580,7 @@ dependencies = [ "hdrhistogram", "humantime", "parking_lot", - "prost-types", + "prost-types 0.11.9", "serde", "serde_json", "thread_local", @@ -1842,8 +1842,8 @@ dependencies = [ "parking_lot", "pbjson", "pbjson-types", - "prost 0.11.9", - "prost-types", + "prost 0.12.1", + "prost-types 0.12.1", "serde", "thiserror", "tokio", @@ -2360,37 +2360,37 @@ dependencies = [ [[package]] name = "pbjson" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048f9ac93c1eab514f9470c4bc8d97ca2a0a236b84f45cc19d69a59fc11467f6" +checksum = "1030c719b0ec2a2d25a5df729d6cff1acf3cc230bf766f4f97833591f7577b90" dependencies = [ - "base64 0.13.1", + "base64 0.21.4", "serde", ] [[package]] name = "pbjson-build" -version = "0.5.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24" +checksum = "2580e33f2292d34be285c5bc3dba5259542b083cfad6037b6d70345f24dcb735" dependencies = [ "heck", - "itertools 0.10.5", - "prost 0.11.9", - "prost-types", + "itertools 0.11.0", + "prost 0.12.1", + "prost-types 0.12.1", ] [[package]] name = "pbjson-types" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a88c8d87f99a4ac14325e7a4c24af190fca261956e3b82dd7ed67e77e6c7043" +checksum = "18f596653ba4ac51bdecbb4ef6773bc7f56042dc13927910de1684ad3d32aa12" dependencies = [ "bytes", "chrono", "pbjson", "pbjson-build", - "prost 0.11.9", + "prost 0.12.1", "prost-build", "serde", ] @@ -2488,12 +2488,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.1.25" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 1.0.109", + "syn 2.0.37", ] [[package]] @@ -2543,22 +2543,22 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck", - "itertools 0.10.5", - "lazy_static", + "itertools 0.11.0", "log", "multimap", + "once_cell", "petgraph", "prettyplease", - "prost 0.11.9", - "prost-types", + "prost 0.12.1", + "prost-types 0.12.1", "regex", - "syn 1.0.109", + "syn 2.0.37", "tempfile", "which", ] @@ -2598,6 +2598,15 @@ dependencies = [ "prost 0.11.9", ] +[[package]] +name = "prost-types" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" +dependencies = [ + "prost 0.12.1", +] + [[package]] name = "qoi" version = "0.4.1" diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index bb1d28e2..1f8bc43f 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -18,7 +18,6 @@ use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; use futures_util::StreamExt; use livekit::webrtc::audio_stream::native::NativeAudioStream; use livekit::webrtc::prelude::*; -use log::warn; use tokio::sync::oneshot; pub struct FfiAudioStream { @@ -122,7 +121,7 @@ impl FfiAudioStream { )), }, )).await { - warn!("failed to send audio frame: {}", err); + log::warn!("failed to send audio frame: {}", err); } } } @@ -139,7 +138,7 @@ impl FfiAudioStream { )) .await { - warn!("failed to send EOS: {}", err); + log::warn!("failed to send audio EOS: {}", err); } } } diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 9cd15f87..b2abc4b9 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -311,7 +311,7 @@ impl RoomInner { if inner.pending_unpublished_tracks.lock().remove(&sid) { break; // Event was sent } - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } } @@ -425,7 +425,7 @@ async fn forward_event(server: &'static FfiServer, inner: &Arc, event if inner.pending_published_tracks.lock().remove(&sid) { break; } - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } let ffi_publication = FfiPublication { diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 6451b313..2bde3aeb 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -18,7 +18,6 @@ use crate::{proto, server, FfiError, FfiHandleId, FfiResult}; use futures_util::StreamExt; use livekit::webrtc::prelude::*; use livekit::webrtc::video_stream::native::NativeVideoStream; -use log::warn; use tokio::sync::oneshot; pub struct FfiVideoStream { @@ -124,7 +123,7 @@ impl FfiVideoStream { )), } )).await{ - warn!("failed to send video frame: {}", err); + log::warn!("failed to send video frame: {}", err); } } } @@ -141,7 +140,7 @@ impl FfiVideoStream { )) .await { - warn!("failed to send video stream ended event: {}", err); + log::warn!("failed to send video EOS: {}", err); } } } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 0c51175e..0e237418 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -26,6 +26,7 @@ use parking_lot::RwLock; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; use tokio::sync::Mutex as AsyncMutex; use tokio::sync::{mpsc, oneshot}; @@ -221,6 +222,7 @@ impl Room { let dispatcher = dispatcher.clone(); let e2ee_manager = e2ee_manager.clone(); move |participant, publication| { + log::info!("local track published: {}", publication.sid()); let track = publication.track().unwrap(); let event = RoomEvent::LocalTrackPublished { participant: participant.clone(), @@ -236,6 +238,7 @@ impl Room { let dispatcher = dispatcher.clone(); let e2ee_manager = e2ee_manager.clone(); move |participant, publication| { + log::info!("local track unpublished: {}", publication.sid()); let event = RoomEvent::LocalTrackUnpublished { participant: participant.clone(), publication: publication.clone(), @@ -442,19 +445,34 @@ impl RoomSession { ) { loop { tokio::select! { - res = engine_events.recv() => { - if let Some(event) = res { - if let Err(err) = self.on_engine_event(event).await { + Some(event) = engine_events.recv() => { + let debug = format!("{:?}", event); + let inner = self.clone(); + let (tx, rx) = oneshot::channel(); + let task = tokio::spawn(async move { + if let Err(err) = inner.on_engine_event(event).await { log::error!("failed to handle engine event: {:?}", err); } + let _ = tx.send(()); + }); + + // Monitor sync/async blockings + tokio::select! { + _ = rx => {}, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + log::error!("engine_event is taking too much time: {}", debug); + } } + + task.await.unwrap(); }, _ = &mut close_receiver => { - log::trace!("closing room_task"); break; } } } + + log::debug!("room_task closed"); } async fn on_engine_event(self: &Arc, event: EngineEvent) -> RoomResult<()> { @@ -564,7 +582,6 @@ impl RoomSession { if let Some(remote_participant) = remote_participant { if pi.state == proto::participant_info::State::Disconnected as i32 { // Participant disconnected - log::info!("Participant disconnected: {:?}", participant_sid); self.clone() .handle_participant_disconnect(remote_participant) } else { @@ -573,7 +590,7 @@ impl RoomSession { } } else { // Create a new participant - log::info!("Participant connected: {:?}", participant_sid); + log::info!("new participant: {}", pi.sid); let remote_participant = { let pi = pi.clone(); self.create_participant( @@ -781,7 +798,7 @@ impl RoomSession { } fn handle_disconnected(&self, reason: DisconnectReason) { - log::info!("disconnected from room,: {:?}", reason); + log::info!("disconnected from room: {:?}", reason); if self.update_connection_state(ConnectionState::Disconnected) { self.dispatcher .dispatch(&RoomEvent::Disconnected { reason }); @@ -830,6 +847,7 @@ impl RoomSession { let dispatcher = self.dispatcher.clone(); let e2ee_manager = self.e2ee_manager.clone(); move |participant, publication, track| { + log::info!("track subscribed: {}", track.sid()); let event = RoomEvent::TrackSubscribed { participant: participant.clone(), track: track.clone(), @@ -844,6 +862,7 @@ impl RoomSession { let dispatcher = self.dispatcher.clone(); let e2ee_manager = self.e2ee_manager.clone(); move |participant, publication, track| { + log::info!("track unsubscribed: {}", track.sid()); let event = RoomEvent::TrackUnsubscribed { participant: participant.clone(), track: track.clone(), @@ -888,13 +907,16 @@ impl RoomSession { }); self.participants.write().insert(sid, participant.clone()); - participant } /// A participant has disconnected /// Cleanup the participant and emit an event fn handle_participant_disconnect(self: Arc, remote_participant: RemoteParticipant) { + log::info!( + "handle_participant_disconnect: {}", + remote_participant.sid() + ); for (sid, _) in remote_participant.tracks() { remote_participant.unpublish_track(&sid); } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 2a8b3902..c1a127ae 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -195,7 +195,6 @@ impl LocalParticipant { let publication = LocalTrackPublication::new(track_info.clone(), track.clone()); track.update_info(track_info); // Update sid + source - log::debug!("publishing track with cid {:?}", track.rtc_track().id()); let transceiver = self .inner .rtc_engine diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index a8f53662..83ee0366 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -139,8 +139,6 @@ impl RemoteParticipant { track.set_transceiver(Some(transceiver)); - log::debug!("starting track: {:?}", sid); - //track.set_muted(remote_publication.is_muted()); track.update_info(proto::TrackInfo { sid: remote_publication.sid().to_string(), diff --git a/livekit/src/room/track/mod.rs b/livekit/src/room/track/mod.rs index 4c1b5b50..d341ff1a 100644 --- a/livekit/src/room/track/mod.rs +++ b/livekit/src/room/track/mod.rs @@ -166,7 +166,6 @@ pub(super) fn new_inner( /// This is only called for local tracks pub(super) fn set_muted(inner: &Arc, track: &Track, muted: bool) { let info = inner.info.read(); - log::debug!("set_muted: {:?} {:?}", info.sid, muted); if info.muted == muted { return; } diff --git a/livekit/src/rtc_engine/lk_runtime.rs b/livekit/src/rtc_engine/lk_runtime.rs index 4f7cbbb3..4fea0217 100644 --- a/livekit/src/rtc_engine/lk_runtime.rs +++ b/livekit/src/rtc_engine/lk_runtime.rs @@ -38,7 +38,7 @@ impl LkRuntime { if let Some(lk_runtime) = lk_runtime_ref.upgrade() { lk_runtime } else { - log::trace!("LkRuntime::new()"); + log::debug!("LkRuntime::new()"); let new_runtime = Arc::new(Self { pc_factory: PeerConnectionFactory::default(), }); @@ -54,6 +54,6 @@ impl LkRuntime { impl Drop for LkRuntime { fn drop(&mut self) { - log::trace!("LkRuntime::drop()"); + log::debug!("LkRuntime::drop()"); } } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index ea7f3934..470b1dd5 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -283,19 +283,34 @@ impl EngineInner { ) { loop { tokio::select! { - res = session_events.recv() => { - if let Some(event) = res { - if let Err(err) = self.on_session_event(event).await { + Some(event) = session_events.recv() => { + let debug = format!("{:?}", event); + let inner = self.clone(); + let (tx, rx) = oneshot::channel(); + let task = tokio::spawn(async move { + if let Err(err) = inner.on_session_event(event).await { log::error!("failed to handle session event: {:?}", err); } + let _ = tx.send(()); + }); + + // Monitor sync/async blockings + tokio::select! { + _ = rx => {}, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + log::error!("session_event is taking too much time: {}", debug); + } } + + task.await.unwrap(); }, _ = &mut close_receiver => { - log::trace!("closing engine task"); break; } } } + + log::debug!("engine task closed"); } async fn on_session_event(self: &Arc, event: SessionEvent) -> EngineResult<()> { @@ -492,7 +507,7 @@ impl EngineInner { inner.reconnecting.store(false, Ordering::Release); if res.is_ok() { - log::info!("RtcEngine successfully reconnected") + log::info!("RtcEngine successfully recovered") } else { log::error!("failed to reconnect after {} attempts", RECONNECT_ATTEMPTS); inner.close(DisconnectReason::UnknownReason).await; diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index 2ba64e77..15b02934 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -136,7 +136,6 @@ impl PeerTransport { let mut inner = self.inner.lock().await; if options.ice_restart { - log::debug!("restarting ICE"); inner.restarting_ice = true; } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 02ef5631..37ec2e02 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -145,7 +145,7 @@ impl RtcSession { let (signal_client, join_response, signal_events) = SignalClient::connect(url, token, options).await?; let signal_client = Arc::new(signal_client); - log::debug!("received JoinResponse: {:?}", join_response); + log::info!("received JoinResponse: {:?}", join_response); let (rtc_emitter, rtc_events) = mpsc::unbounded_channel(); let rtc_config = make_rtc_config_join(join_response); @@ -303,18 +303,34 @@ impl SessionInner { ) { loop { tokio::select! { - res = rtc_events.recv() => { - if let Some(event) = res { - if let Err(err) = self.on_rtc_event(event).await { + Some(event) = rtc_events.recv() => { + let debug = format!("{:?}", event); + let inner = self.clone(); + let (tx, rx) = oneshot::channel(); + let task = tokio::spawn(async move { + if let Err(err) = inner.on_rtc_event(event).await { log::error!("failed to handle rtc event: {:?}", err); } - } }, - _ = close_rx.changed() => { - log::trace!("closing rtc_session_task"); + let _ = tx.send(()); + }); + + // Monitor sync/async blockings + tokio::select! { + _ = rx => {}, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + log::error!("rtc_event is taking too much time: {}", debug); + } + } + + task.await.unwrap(); + }, + _ = close_rx.changed() => { break; } } } + + log::debug!("rtc_session_task closed"); } async fn signal_task( @@ -324,34 +340,48 @@ impl SessionInner { ) { loop { tokio::select! { - res = signal_events.recv() => { - if let Some(signal) = res { - match signal { - SignalEvent::Message(signal) => { - // Received a signal - if let Err(err) = self.on_signal_event(*signal).await { + Some(signal) = signal_events.recv() => { + match signal { + SignalEvent::Message(signal) => { + let debug = format!("{:?}", signal); + let inner = self.clone(); + let (tx, rx) = oneshot::channel(); + let task = tokio::spawn(async move { + if let Err(err) = inner.on_signal_event(*signal).await { log::error!("failed to handle signal: {:?}", err); } + let _ = tx.send(()); + }); + + // Monitor sync/async blockings + tokio::select! { + _ = rx => {}, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + log::error!("signal_event taking too much time: {}", debug); + } } - SignalEvent::Close => { - // SignalClient has been closed - self.on_session_disconnected( - "SignalClient closed", - DisconnectReason::UnknownReason, - true, - false, - false - ); - } + + task.await.unwrap(); + } + SignalEvent::Close => { + // SignalClient has been closed + self.on_session_disconnected( + "SignalClient closed", + DisconnectReason::UnknownReason, + true, + false, + false + ); } } }, _ = close_rx.changed() => { - log::trace!("closing signal_task"); break; } } } + + log::debug!("closing signal_task"); } async fn on_signal_event(&self, event: proto::signal_response::Message) -> EngineResult<()> {