Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Sep 16, 2023
1 parent fddfa75 commit b3691e9
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 97 deletions.
28 changes: 14 additions & 14 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ impl Room {
}
});

rtc_engine.on_restarted({
rtc_engine.on_restarting({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_restarted().await;
inner.handle_restarting().await;
})
}
});
Expand All @@ -327,7 +327,7 @@ impl Room {
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_restarted().await;
inner.handle_restarted().await;
})
}
});
Expand Down Expand Up @@ -404,15 +404,16 @@ impl Room {
let (close_emitter, close_receiver) = oneshot::channel();
let session_task = tokio::spawn(inner.clone().room_task(engine_events, close_receiver));

let session = Self {
inner,
handle: AsyncMutex::new(Some(RoomHandle {
session_task,
close_emitter,
})),
};

Ok((session, events))
Ok((
Self {
inner,
handle: AsyncMutex::new(Some(RoomHandle {
session_task,
close_emitter,
})),
},
events,
))
}

pub async fn close(&self) -> RoomResult<()> {
Expand Down Expand Up @@ -558,10 +559,9 @@ impl RoomSession {
kind,
participant_sid,
} => {
let payload = Arc::new(payload);
if let Some(participant) = self.get_participant(&participant_sid) {
self.dispatcher.dispatch(&RoomEvent::DataReceived {
payload,
payload: Arc::new(payload),
kind,
participant,
});
Expand Down
8 changes: 3 additions & 5 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::room::DisconnectReason;
use crate::rtc_engine::lk_runtime::LkRuntime;
use crate::rtc_engine::rtc_session::{RtcSession, SessionEvent, SessionEvents};
use crate::DataPacketKind;
use arc_swap::access::Access;
use arc_swap::{ArcSwap, ArcSwapOption};
use arc_swap::ArcSwapOption;
use futures_util::future::BoxFuture;
use livekit_api::signal_client::{SignalError, SignalOptions};
use livekit_protocol as proto;
Expand Down Expand Up @@ -440,7 +439,6 @@ impl EngineInner {
.send(EngineEvent::ConnectionQuality { updates })
.await;
}
SessionEvent::Connected => {}
}
Ok(())
}
Expand Down Expand Up @@ -676,8 +674,8 @@ impl EngineInner {
(*signal_resumed.lock().await)().await;
}

// TODO Publisher IceRestart here

// The publisher offer must be sent AFTER the SyncState message
session.restart_publisher().await?;
session.wait_pc_connection().await
}
}
5 changes: 1 addition & 4 deletions livekit/src/rtc_engine/peer_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ impl PeerTransport {
}

pub fn is_connected(&self) -> bool {
matches!(
self.peer_connection.ice_connection_state(),
IceConnectionState::Connected | IceConnectionState::Completed
)
self.peer_connection.connection_state() == PeerConnectionState::Connected
}

pub fn peer_connection(&self) -> PeerConnection {
Expand Down
101 changes: 27 additions & 74 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, watch};
Expand Down Expand Up @@ -78,31 +78,6 @@ pub enum SessionEvent {
full_reconnect: bool,
retry_now: bool,
},
Connected,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
New,
Connected,
Disconnected,
Reconnecting,
Closed,
}

impl TryFrom<u8> for PeerState {
type Error = &'static str;

fn try_from(v: u8) -> Result<Self, Self::Error> {
match v {
0 => Ok(Self::New),
1 => Ok(Self::Connected),
2 => Ok(Self::Disconnected),
3 => Ok(Self::Reconnecting),
4 => Ok(Self::Closed),
_ => Err("invalid PeerState"),
}
}
}

#[derive(Serialize, Deserialize)]
Expand All @@ -116,7 +91,6 @@ struct IceCandidateJson {
/// Fields shared with rtc_task and signal_task
struct SessionInner {
signal_client: Arc<SignalClient>,
pc_state: AtomicU8, // PcState
has_published: AtomicBool,

publisher_pc: PeerTransport,
Expand All @@ -142,7 +116,6 @@ struct SessionInner {
impl Debug for SessionInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("pc_state", &self.pc_state)
.field("has_published", &self.has_published)
.field("closed", &self.closed)
.finish()
Expand Down Expand Up @@ -215,7 +188,6 @@ impl RtcSession {

let (close_tx, close_rx) = watch::channel(false);
let inner = Arc::new(SessionInner {
pc_state: AtomicU8::new(PeerState::New as u8),
has_published: Default::default(),
signal_client,
publisher_pc,
Expand All @@ -233,14 +205,17 @@ impl RtcSession {
let signal_task = tokio::spawn(inner.clone().signal_task(signal_events, close_rx.clone()));
let rtc_task = tokio::spawn(inner.clone().rtc_session_task(rtc_events, close_rx));

let session = Self {
inner,
close_tx,
signal_task,
rtc_task,
};
inner.wait_pc_connection().await?;

Ok((session, session_events))
Ok((
Self {
inner,
close_tx,
signal_task,
rtc_task,
},
session_events,
))
}

pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
Expand Down Expand Up @@ -282,7 +257,11 @@ impl RtcSession {
}

pub async fn restart(&self) -> EngineResult<()> {
self.inner.restart_session().await
self.inner.restart().await
}

pub async fn restart_publisher(&self) -> EngineResult<()> {
self.inner.restart_publisher().await
}

pub async fn wait_pc_connection(&self) -> EngineResult<()> {
Expand All @@ -293,15 +272,6 @@ impl RtcSession {
self.inner.simulate_scenario(scenario).await
}

#[allow(dead_code)]
pub fn state(&self) -> PeerState {
self.inner
.pc_state
.load(Ordering::SeqCst)
.try_into()
.unwrap()
}

#[allow(dead_code)]
pub fn publisher(&self) -> &PeerTransport {
&self.inner.publisher_pc
Expand Down Expand Up @@ -484,21 +454,8 @@ impl SessionInner {
RtcEvent::ConnectionChange { state, target } => {
log::debug!("connection change, {:?} {:?}", state, target);

// The subscriber is always the primary peer connection
if target == proto::SignalTarget::Subscriber
&& state == PeerConnectionState::Connected
{
let old_state = self
.pc_state
.swap(PeerState::Connected as u8, Ordering::SeqCst);
if old_state == PeerState::New as u8 {
let _ = self.emitter.send(SessionEvent::Connected);
}
} else if state == PeerConnectionState::Failed {
if state == PeerConnectionState::Failed {
log::error!("{:?} pc state failed", target);
self.pc_state
.store(PeerState::Disconnected as u8, Ordering::SeqCst);

self.on_session_disconnected(
"pc_state failed",
DisconnectReason::UnknownReason,
Expand Down Expand Up @@ -794,12 +751,8 @@ impl SessionInner {
.map_err(Into::into)
}

/// Try to restart the session by doing an ICE Restart (The SignalClient is also restarted)
/// This reconnection if more seemless compared to the full reconnection implemented in ['RTCEngine']
async fn restart_session(&self) -> EngineResult<()> {
self.pc_state
.store(PeerState::Reconnecting as u8, Ordering::Release);

async fn restart(&self) -> EngineResult<()> {
let reconnect_response = self.signal_client.restart().await?;
log::info!("received reconnect response: {:?}", reconnect_response);

Expand All @@ -811,23 +764,27 @@ impl SessionInner {
.peer_connection()
.set_configuration(rtc_config)?;

/*if self.has_published.load(Ordering::Acquire) {
Ok(())
}

async fn restart_publisher(&self) -> EngineResult<()> {
if self.has_published.load(Ordering::Acquire) {
self.publisher_pc
.create_and_send_offer(OfferOptions {
ice_restart: true,
..Default::default()
})
.await?;
}*/

}
Ok(())
}

/// Wait for PeerState to become PeerState::Connected
/// Timeout after ['MAX_ICE_CONNECT_TIMEOUT']
async fn wait_pc_connection(&self) -> EngineResult<()> {
let wait_connected = async move {
while self.pc_state.load(Ordering::Acquire) != PeerState::Connected as u8 {
while !self.subscriber_pc.is_connected()
|| (self.has_published.load(Ordering::Acquire) && !self.publisher_pc.is_connected())
{
if self.closed.load(Ordering::Acquire) {
return Err(EngineError::Connection("closed".to_string()));
}
Expand All @@ -847,10 +804,6 @@ impl SessionInner {
}
}

fn requires_renegotiation(&self) {
// Putting the publusher renegotation in another function because we don't want to send an offer before the sync state
}

/// Start publisher negotiation
fn publisher_negotiation_needed(self: &Arc<Self>) {
self.has_published.store(true, Ordering::Release);
Expand Down

0 comments on commit b3691e9

Please sign in to comment.