diff --git a/Cargo.lock b/Cargo.lock index 00e3c50a..b043f99c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,12 +49,6 @@ version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" -[[package]] -name = "arc-swap" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" - [[package]] name = "async-trait" version = "0.1.72" @@ -995,7 +989,6 @@ checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" name = "livekit" version = "0.2.0" dependencies = [ - "arc-swap", "futures-util", "lazy_static", "livekit-api", @@ -1679,7 +1672,8 @@ dependencies = [ [[package]] name = "ring" version = "0.16.20" -source = "git+https://github.com/1Password/ring?branch=1p/release-0.16#d2cc20fd523abe5aef18bb67e3df49654fa9376b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" dependencies = [ "cc", "libc", @@ -1953,9 +1947,9 @@ dependencies = [ [[package]] name = "spin" -version = "0.9.8" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "subtle" diff --git a/Cargo.toml b/Cargo.toml index 606b8eda..8ca61b59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,3 @@ members = [ "webrtc-sys", "webrtc-sys/build", ] - -[patch.crates-io] -ring = { git = "https://github.com/1Password/ring", branch = "1p/release-0.16" } diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 71248b37..e319fe4c 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -167,6 +167,12 @@ dependencies = [ "x11rb", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "arrayref" version = "0.3.7" @@ -1780,6 +1786,7 @@ checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" name = "livekit" version = "0.2.0" dependencies = [ + "arc-swap", "futures-util", "lazy_static", "livekit-api", diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index c6aff1ab..f3f582b2 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -31,4 +31,3 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"] thiserror = "1.0" lazy_static = "1.4" log = "0.4" -arc-swap = "1.6.0" diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index cc36b45d..6fa4a3d9 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -189,9 +189,7 @@ impl Room { token: &str, options: RoomOptions, ) -> RoomResult<(Self, mpsc::UnboundedReceiver)> { - let e2ee_options = options.e2ee.clone(); - - let e2ee_manager = E2eeManager::new(e2ee_options); + let e2ee_manager = E2eeManager::new(options.e2ee.clone()); let (rtc_engine, engine_events) = RtcEngine::connect( url, token, @@ -282,66 +280,6 @@ impl Room { e2ee_manager: e2ee_manager.clone(), }); - rtc_engine.on_resuming({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_resuming().await; - }) - } - }); - - rtc_engine.on_resumed({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_resumed().await; - }) - } - }); - - rtc_engine.on_signal_resumed({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_signal_resumed().await; - }) - } - }); - - rtc_engine.on_restarting({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_restarting().await; - }) - } - }); - - rtc_engine.on_restarted({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_restarted().await; - }) - } - }); - - rtc_engine.on_signal_restarted({ - let inner = inner.clone(); - move || { - let inner = inner.clone(); - Box::pin(async move { - inner.handle_signal_restarted().await; - }) - } - }); - e2ee_manager.on_state_changed({ let dispatcher = dispatcher.clone(); let inner = inner.clone(); @@ -553,6 +491,12 @@ impl RoomSession { )))?; } } + EngineEvent::Resuming(tx) => self.handle_resuming(tx).await, + EngineEvent::Resumed(tx) => self.handle_resumed(tx).await, + EngineEvent::SignalResumed(tx) => self.handle_signal_resumed(tx).await, + EngineEvent::Restarting(tx) => self.handle_restarting(tx).await, + EngineEvent::Restarted(tx) => self.handle_restarted(tx).await, + EngineEvent::SignalRestarted(tx) => self.handle_signal_restarted(tx).await, EngineEvent::Disconnected { reason } => self.handle_disconnected(reason), EngineEvent::Data { payload, @@ -750,22 +694,29 @@ impl RoomSession { } } - async fn handle_resuming(self: &Arc) { + async fn handle_resuming(self: &Arc, tx: oneshot::Sender<()>) { if self.update_connection_state(ConnectionState::Reconnecting) { self.dispatcher.dispatch(&RoomEvent::Reconnecting); } + + let _ = tx.send(()); } - async fn handle_resumed(self: &Arc) { + async fn handle_resumed(self: &Arc, tx: oneshot::Sender<()>) { self.update_connection_state(ConnectionState::Connected); self.dispatcher.dispatch(&RoomEvent::Reconnected); + + let _ = tx.send(()); } - async fn handle_signal_resumed(self: Arc) { + async fn handle_signal_resumed(self: &Arc, tx: oneshot::Sender<()>) { self.send_sync_state().await; + + // Always send the sync state before continuing the reconnection (e.g: publisher offer) + let _ = tx.send(()); } - async fn handle_restarting(self: &Arc) { + async fn handle_restarting(self: &Arc, tx: oneshot::Sender<()>) { // Remove existing participants/subscriptions on full reconnect let participants = self.participants.read().clone(); for (_, participant) in participants.iter() { @@ -776,14 +727,18 @@ impl RoomSession { if self.update_connection_state(ConnectionState::Reconnecting) { self.dispatcher.dispatch(&RoomEvent::Reconnecting); } + + let _ = tx.send(()); } - async fn handle_restarted(self: &Arc) { + async fn handle_restarted(self: &Arc, tx: oneshot::Sender<()>) { self.update_connection_state(ConnectionState::Connected); self.dispatcher.dispatch(&RoomEvent::Reconnected); + + let _ = tx.send(()); } - async fn handle_signal_restarted(self: Arc) { + async fn handle_signal_restarted(self: &Arc, tx: oneshot::Sender<()>) { let join_response = self.rtc_engine.last_info().join_response; self.local_participant .update_info(join_response.participant.unwrap()); // The sid may have changed @@ -794,21 +749,26 @@ impl RoomSession { let published_tracks = self.local_participant.tracks(); // Should I create a new task? - tokio::spawn(async move { - for (_, publication) in published_tracks { - let track = publication.track(); - - let _ = self - .local_participant - .unpublish_track(&publication.sid()) - .await; - - let _ = self - .local_participant - .publish_track(track.unwrap(), publication.publish_options()) - .await; + tokio::spawn({ + let session = self.clone(); + async move { + for (_, publication) in published_tracks { + let track = publication.track(); + + let _ = session + .local_participant + .unpublish_track(&publication.sid()) + .await; + + let _ = session + .local_participant + .publish_track(track.unwrap(), publication.publish_options()) + .await; + } } }); + + let _ = tx.send(()); } fn handle_disconnected(&self, reason: DisconnectReason) { diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index b89c88e7..963bc704 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -19,8 +19,6 @@ use crate::room::DisconnectReason; use crate::rtc_engine::lk_runtime::LkRuntime; use crate::rtc_engine::rtc_session::{RtcSession, SessionEvent, SessionEvents}; use crate::DataPacketKind; -use arc_swap::ArcSwapOption; -use futures_util::future::BoxFuture; use livekit_api::signal_client::{SignalError, SignalOptions}; use livekit_protocol as proto; use livekit_webrtc::prelude::*; @@ -99,6 +97,17 @@ pub enum EngineEvent { ConnectionQuality { updates: Vec, }, + + /// The following events are used to notify the room about the reconnection state + /// Since the room needs to also sync state in a good timing with the server. + /// We synchronize the state with a one-shot channel. + Resuming(oneshot::Sender<()>), + Resumed(oneshot::Sender<()>), + SignalResumed(oneshot::Sender<()>), + Restarting(oneshot::Sender<()>), + Restarted(oneshot::Sender<()>), + SignalRestarted(oneshot::Sender<()>), + Disconnected { reason: DisconnectReason, }, @@ -128,30 +137,12 @@ pub struct LastInfo { pub data_channels_info: Vec, } -// ArcSwap limitation requires to keep the callbacks in Box (requires Sized & avoid fat pointer). -// Using ArcSwap because it is convenient in async code -pub type OnResuming = Box BoxFuture<'static, ()> + Send>; -pub type OnResumed = Box BoxFuture<'static, ()> + Send>; -pub type OnSignalResumed = Box BoxFuture<'static, ()> + Send>; -pub type OnRestarting = Box BoxFuture<'static, ()> + Send>; -pub type OnRestarted = Box BoxFuture<'static, ()> + Send>; -pub type OnSignalRestarted = Box BoxFuture<'static, ()> + Send>; - struct EngineInner { // Keep a strong reference to LkRuntime to avoid creating a new RtcRuntime or PeerConnection factory accross multiple Rtc sessions #[allow(dead_code)] lk_runtime: Arc, engine_emitter: EngineEmitter, - // Async events (Used instead of passing message using the channel because we need async synchronization) - // We must not block in these events otherwise this will break the reconnection logic - resuming: ArcSwapOption>, - resumed: ArcSwapOption>, - signal_resumed: ArcSwapOption>, - restarting: ArcSwapOption>, - restarted: ArcSwapOption>, - signal_restarted: ArcSwapOption>, - // Last/current session states (needed by the room) last_info: Mutex, running_handle: AsyncRwLock>, @@ -195,13 +186,6 @@ impl RtcEngine { running_handle: Default::default(), engine_emitter, - resuming: Default::default(), - resumed: Default::default(), - signal_resumed: Default::default(), - restarting: Default::default(), - restarted: Default::default(), - signal_restarted: Default::default(), - last_info: Default::default(), closed: Default::default(), reconnecting: Default::default(), @@ -289,60 +273,6 @@ impl RtcEngine { pub fn last_info(&self) -> LastInfo { self.inner.last_info.lock().clone() } - - pub(crate) fn on_resuming(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .resuming - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } - - pub(crate) fn on_resumed(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .resumed - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } - - pub(crate) fn on_signal_resumed(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .signal_resumed - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } - - pub(crate) fn on_restarting(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .restarting - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } - - pub(crate) fn on_restarted(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .restarted - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } - - pub(crate) fn on_signal_restarted(&self, f: F) - where - F: FnMut() -> BoxFuture<'static, ()> + Send + 'static, - { - self.inner - .signal_restarted - .store(Some(Arc::new(AsyncMutex::new(Box::new(f))))); - } } impl EngineInner { @@ -598,9 +528,9 @@ impl EngineInner { if self.full_reconnect.load(Ordering::SeqCst) { if i == 0 { - if let Some(restarting) = self.restarting.load().as_ref() { - (*restarting.lock().await)().await; - } + let (tx, rx) = oneshot::channel(); + self.engine_emitter.send(EngineEvent::Restarting(tx)).await; + rx.await; } log::error!("restarting connection... attempt: {}", i); @@ -610,16 +540,16 @@ impl EngineInner { { log::error!("restarting connection failed: {}", err); } else { - if let Some(restarted) = self.restarted.load().as_ref() { - (*restarted.lock().await)().await; - } + let (tx, rx) = oneshot::channel(); + self.engine_emitter.send(EngineEvent::Restarted(tx)).await; + rx.await; return Ok(()); } } else { if i == 0 { - if let Some(resuming) = self.resuming.load().as_ref() { - (*resuming.lock().await)().await; - } + let (tx, rx) = oneshot::channel(); + self.engine_emitter.send(EngineEvent::Resuming(tx)).await; + rx.await; } log::error!("resuming connection... attempt: {}", i); @@ -629,9 +559,9 @@ impl EngineInner { self.full_reconnect.store(true, Ordering::SeqCst); } } else { - if let Some(resumed) = self.resumed.load().as_ref() { - (*resumed.lock().await)().await; - } + let (tx, rx) = oneshot::channel(); + self.engine_emitter.send(EngineEvent::Resumed(tx)).await; + rx.await; return Ok(()); } } @@ -653,9 +583,11 @@ impl EngineInner { self.terminate_session().await; self.connect(url, token, options).await?; - if let Some(signal_restarted) = self.signal_restarted.load().as_ref() { - (*signal_restarted.lock().await)().await; - } + let (tx, rx) = oneshot::channel(); + self.engine_emitter + .send(EngineEvent::SignalRestarted(tx)) + .await; + rx.await; let handle = self.running_handle.read().await; let session = &handle.as_ref().unwrap().session; @@ -669,10 +601,13 @@ impl EngineInner { session.restart().await?; + let (tx, rx) = oneshot::channel(); + self.engine_emitter + .send(EngineEvent::SignalResumed(tx)) + .await; + // With SignalResumed, the room will send a SyncState message to the server - if let Some(signal_resumed) = self.signal_resumed.load().as_ref() { - (*signal_resumed.lock().await)().await; - } + rx.await; // The publisher offer must be sent AFTER the SyncState message session.restart_publisher().await?;