diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index f169a210..cc36b45d 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -312,12 +312,12 @@ impl Room { } }); - rtc_engine.on_restarted({ + rtc_engine.on_restarting({ let inner = inner.clone(); move || { let inner = inner.clone(); Box::pin(async move { - inner.handle_restarted().await; + inner.handle_restarting().await; }) } }); @@ -327,7 +327,7 @@ impl Room { move || { let inner = inner.clone(); Box::pin(async move { - inner.handle_signal_restarted().await; + inner.handle_restarted().await; }) } }); @@ -404,15 +404,16 @@ impl Room { let (close_emitter, close_receiver) = oneshot::channel(); let session_task = tokio::spawn(inner.clone().room_task(engine_events, close_receiver)); - let session = Self { - inner, - handle: AsyncMutex::new(Some(RoomHandle { - session_task, - close_emitter, - })), - }; - - Ok((session, events)) + Ok(( + Self { + inner, + handle: AsyncMutex::new(Some(RoomHandle { + session_task, + close_emitter, + })), + }, + events, + )) } pub async fn close(&self) -> RoomResult<()> { @@ -558,10 +559,9 @@ impl RoomSession { kind, participant_sid, } => { - let payload = Arc::new(payload); if let Some(participant) = self.get_participant(&participant_sid) { self.dispatcher.dispatch(&RoomEvent::DataReceived { - payload, + payload: Arc::new(payload), kind, participant, }); diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 1d0547f5..b89c88e7 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -19,8 +19,7 @@ use crate::room::DisconnectReason; use crate::rtc_engine::lk_runtime::LkRuntime; use crate::rtc_engine::rtc_session::{RtcSession, SessionEvent, SessionEvents}; use crate::DataPacketKind; -use arc_swap::access::Access; -use arc_swap::{ArcSwap, ArcSwapOption}; +use arc_swap::ArcSwapOption; use futures_util::future::BoxFuture; use livekit_api::signal_client::{SignalError, SignalOptions}; use livekit_protocol as proto; @@ -440,7 +439,6 @@ impl EngineInner { .send(EngineEvent::ConnectionQuality { updates }) .await; } - SessionEvent::Connected => {} } Ok(()) } @@ -676,8 +674,8 @@ impl EngineInner { (*signal_resumed.lock().await)().await; } - // TODO Publisher IceRestart here - + // The publisher offer must be sent AFTER the SyncState message + session.restart_publisher().await?; session.wait_pc_connection().await } } diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index 3dd65f7c..32dbb3a0 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -59,10 +59,7 @@ impl PeerTransport { } pub fn is_connected(&self) -> bool { - matches!( - self.peer_connection.ice_connection_state(), - IceConnectionState::Connected | IceConnectionState::Completed - ) + self.peer_connection.connection_state() == PeerConnectionState::Connected } pub fn peer_connection(&self) -> PeerConnection { diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 58d03adc..41df8d8a 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::convert::TryInto; use std::fmt::Debug; -use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, oneshot, watch}; @@ -78,31 +78,6 @@ pub enum SessionEvent { full_reconnect: bool, retry_now: bool, }, - Connected, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PeerState { - New, - Connected, - Disconnected, - Reconnecting, - Closed, -} - -impl TryFrom for PeerState { - type Error = &'static str; - - fn try_from(v: u8) -> Result { - match v { - 0 => Ok(Self::New), - 1 => Ok(Self::Connected), - 2 => Ok(Self::Disconnected), - 3 => Ok(Self::Reconnecting), - 4 => Ok(Self::Closed), - _ => Err("invalid PeerState"), - } - } } #[derive(Serialize, Deserialize)] @@ -116,7 +91,6 @@ struct IceCandidateJson { /// Fields shared with rtc_task and signal_task struct SessionInner { signal_client: Arc, - pc_state: AtomicU8, // PcState has_published: AtomicBool, publisher_pc: PeerTransport, @@ -142,7 +116,6 @@ struct SessionInner { impl Debug for SessionInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SessionInner") - .field("pc_state", &self.pc_state) .field("has_published", &self.has_published) .field("closed", &self.closed) .finish() @@ -215,7 +188,6 @@ impl RtcSession { let (close_tx, close_rx) = watch::channel(false); let inner = Arc::new(SessionInner { - pc_state: AtomicU8::new(PeerState::New as u8), has_published: Default::default(), signal_client, publisher_pc, @@ -233,14 +205,17 @@ impl RtcSession { let signal_task = tokio::spawn(inner.clone().signal_task(signal_events, close_rx.clone())); let rtc_task = tokio::spawn(inner.clone().rtc_session_task(rtc_events, close_rx)); - let session = Self { - inner, - close_tx, - signal_task, - rtc_task, - }; + inner.wait_pc_connection().await?; - Ok((session, session_events)) + Ok(( + Self { + inner, + close_tx, + signal_task, + rtc_task, + }, + session_events, + )) } pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult { @@ -282,7 +257,11 @@ impl RtcSession { } pub async fn restart(&self) -> EngineResult<()> { - self.inner.restart_session().await + self.inner.restart().await + } + + pub async fn restart_publisher(&self) -> EngineResult<()> { + self.inner.restart_publisher().await } pub async fn wait_pc_connection(&self) -> EngineResult<()> { @@ -293,15 +272,6 @@ impl RtcSession { self.inner.simulate_scenario(scenario).await } - #[allow(dead_code)] - pub fn state(&self) -> PeerState { - self.inner - .pc_state - .load(Ordering::SeqCst) - .try_into() - .unwrap() - } - #[allow(dead_code)] pub fn publisher(&self) -> &PeerTransport { &self.inner.publisher_pc @@ -484,21 +454,8 @@ impl SessionInner { RtcEvent::ConnectionChange { state, target } => { log::debug!("connection change, {:?} {:?}", state, target); - // The subscriber is always the primary peer connection - if target == proto::SignalTarget::Subscriber - && state == PeerConnectionState::Connected - { - let old_state = self - .pc_state - .swap(PeerState::Connected as u8, Ordering::SeqCst); - if old_state == PeerState::New as u8 { - let _ = self.emitter.send(SessionEvent::Connected); - } - } else if state == PeerConnectionState::Failed { + if state == PeerConnectionState::Failed { log::error!("{:?} pc state failed", target); - self.pc_state - .store(PeerState::Disconnected as u8, Ordering::SeqCst); - self.on_session_disconnected( "pc_state failed", DisconnectReason::UnknownReason, @@ -794,12 +751,8 @@ impl SessionInner { .map_err(Into::into) } - /// Try to restart the session by doing an ICE Restart (The SignalClient is also restarted) /// This reconnection if more seemless compared to the full reconnection implemented in ['RTCEngine'] - async fn restart_session(&self) -> EngineResult<()> { - self.pc_state - .store(PeerState::Reconnecting as u8, Ordering::Release); - + async fn restart(&self) -> EngineResult<()> { let reconnect_response = self.signal_client.restart().await?; log::info!("received reconnect response: {:?}", reconnect_response); @@ -811,23 +764,27 @@ impl SessionInner { .peer_connection() .set_configuration(rtc_config)?; - /*if self.has_published.load(Ordering::Acquire) { + Ok(()) + } + + async fn restart_publisher(&self) -> EngineResult<()> { + if self.has_published.load(Ordering::Acquire) { self.publisher_pc .create_and_send_offer(OfferOptions { ice_restart: true, ..Default::default() }) .await?; - }*/ - + } Ok(()) } - /// Wait for PeerState to become PeerState::Connected /// Timeout after ['MAX_ICE_CONNECT_TIMEOUT'] async fn wait_pc_connection(&self) -> EngineResult<()> { let wait_connected = async move { - while self.pc_state.load(Ordering::Acquire) != PeerState::Connected as u8 { + while !self.subscriber_pc.is_connected() + || (self.has_published.load(Ordering::Acquire) && !self.publisher_pc.is_connected()) + { if self.closed.load(Ordering::Acquire) { return Err(EngineError::Connection("closed".to_string())); } @@ -847,10 +804,6 @@ impl SessionInner { } } - fn requires_renegotiation(&self) { - // Putting the publusher renegotation in another function because we don't want to send an offer before the sync state - } - /// Start publisher negotiation fn publisher_negotiation_needed(self: &Arc) { self.has_published.store(true, Ordering::Release);