fix: publisher migration failures (#187)
theomonnom authored Sep 18, 2023
1 parent 023e4f2 commit 7a55204
Showing 9 changed files with 177 additions and 148 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -10,4 +10,4 @@ members = [
 ]
 
 [patch.crates-io]
-ring = { git = "https://github.com/1Password/ring", branch = "1p/release-0.16" }
+ring = { git = "https://github.com/1Password/ring", branch = "1p/release-0.16" }
7 changes: 7 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default.

19 changes: 19 additions & 0 deletions livekit-ffi/protocol/room.proto
@@ -130,11 +130,30 @@ message TrackPublishOptions {
   TrackSource source = 7;
 }
 
+enum IceTransportPolicy {
+  TRANSPORT_NONE = 0;
+  TRANSPORT_RELAY = 1;
+  TRANSPORT_NOHOST = 2;
+  TRANSPORT_ALL = 3;
+}
+
+message IceServer {
+  repeated string urls = 1;
+  string username = 2;
+  string credential = 3;
+}
+
+message RtcConfig {
+  IceTransportPolicy ice_transport_policy = 1;
+  repeated IceServer ice_servers = 2;
+}
+
 message RoomOptions {
   bool auto_subscribe = 1;
   bool adaptive_stream = 2;
   bool dynacast = 3;
   optional E2eeOptions e2ee = 4;
+  optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration
 }
 
 //
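These additions expose WebRTC's ICE configuration through the FFI protocol, so a client can supply its own TURN servers or force relayed traffic. The sketch below mirrors the new messages in plain Rust to show the intended shape; the real SDK generates its types from room.proto, and the Rust definitions here are assumptions for illustration only:

// Minimal Rust mirror of the new proto messages, for illustration.
// The SDK's actual types are generated from room.proto (e.g. via prost).
#[derive(Debug, Clone, Copy)]
enum IceTransportPolicy {
    None,
    Relay,
    NoHost,
    All,
}

#[derive(Debug, Clone)]
struct IceServer {
    urls: Vec<String>,
    username: String,
    credential: String,
}

#[derive(Debug, Clone)]
struct RtcConfig {
    ice_transport_policy: IceTransportPolicy,
    ice_servers: Vec<IceServer>,
}

fn main() {
    // Force all traffic through a TURN relay, e.g. on restrictive networks —
    // the typical reason to ship a custom RTC configuration.
    let rtc_config = RtcConfig {
        ice_transport_policy: IceTransportPolicy::Relay,
        ice_servers: vec![IceServer {
            urls: vec!["turn:turn.example.com:443?transport=tcp".into()],
            username: "user".into(),
            credential: "secret".into(),
        }],
    };
    println!("{rtc_config:?}");
}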
10 changes: 5 additions & 5 deletions livekit-ffi/src/server/requests.rs
@@ -510,7 +510,7 @@ fn on_e2ee_request(
             )
         }
         proto::e2ee_request::Message::CryptorSetEnabled(request) => {
-            let identity = request.participant_identity.try_into().unwrap();
+            let identity = request.participant_identity.into();
             let track_sid = request.track_sid.try_into().unwrap();
 
             if let Some(frame_cryptor) = e2ee_manager.frame_cryptors().get(&(identity, track_sid)) {
@@ -522,7 +522,7 @@ fn on_e2ee_request(
             )
         }
         proto::e2ee_request::Message::CryptorSetKeyIndex(request) => {
-            let identity = request.participant_identity.try_into().unwrap();
+            let identity = request.participant_identity.into();
             let track_sid = request.track_sid.try_into().unwrap();
 
             if let Some(frame_cryptor) = e2ee_manager.frame_cryptors().get(&(identity, track_sid)) {
@@ -559,22 +559,22 @@ fn on_e2ee_request(
             proto::e2ee_response::Message::GetSharedKey(proto::GetSharedKeyResponse { key })
         }
         proto::e2ee_request::Message::SetKey(request) => {
-            let identity = request.participant_identity.try_into().unwrap();
+            let identity = request.participant_identity.into();
             if let Some(key_provider) = e2ee_manager.key_provider() {
                 key_provider.set_key(&identity, request.key_index, request.key);
             }
             proto::e2ee_response::Message::SetKey(proto::SetKeyResponse {})
         }
         proto::e2ee_request::Message::RatchetKey(request) => {
-            let identity = request.participant_identity.try_into().unwrap();
+            let identity = request.participant_identity.into();
             let new_key = e2ee_manager
                 .key_provider()
                 .and_then(|key_provider| key_provider.ratchet_key(&identity, request.key_index));
 
             proto::e2ee_response::Message::RatchetKey(proto::RatchetKeyResponse { new_key })
         }
         proto::e2ee_request::Message::GetKey(request) => {
-            let identity = request.participant_identity.try_into().unwrap();
+            let identity = request.participant_identity.into();
             let key = e2ee_manager
                 .key_provider()
                 .and_then(|key_provider| key_provider.get_key(&identity, request.key_index));
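Every change in this file replaces a fallible try_into().unwrap() with an infallible into() when converting a participant identity, removing an unwrap that could panic on an unvalidated identity string. The distinction is between a newtype that accepts any string (From) and one that validates a server-issued format (TryFrom). A minimal sketch under that assumption — the internals below are illustrative, not the SDK's actual definitions:

// Sketch: why `.into()` cannot fail while `.try_into()` can.
// Assumes ParticipantIdentity wraps any string infallibly, while TrackSid
// validates its input — the names mirror the diff, the bodies do not.
struct ParticipantIdentity(String);

impl From<String> for ParticipantIdentity {
    fn from(s: String) -> Self {
        Self(s) // any string is a valid identity, so no error case exists
    }
}

struct TrackSid(String);

impl TryFrom<String> for TrackSid {
    type Error = String;
    fn try_from(s: String) -> Result<Self, Self::Error> {
        // SIDs follow a server-issued format, so validation can fail
        if s.starts_with("TR_") {
            Ok(Self(s))
        } else {
            Err(format!("invalid track sid: {s}"))
        }
    }
}

fn main() {
    let identity: ParticipantIdentity = String::from("alice").into(); // never panics
    let sid: TrackSid = String::from("TR_abc123").try_into().unwrap(); // panics on bad input
    let _ = (identity, sid);
}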
2 changes: 1 addition & 1 deletion livekit/Cargo.toml
@@ -30,4 +30,4 @@ parking_lot = { version = "0.12" }
 futures-util = { version = "0.3", default-features = false, features = ["sink"] }
 thiserror = "1.0"
 lazy_static = "1.4"
-log = "0.4"
+log = "0.4"
129 changes: 74 additions & 55 deletions livekit/src/room/mod.rs
@@ -189,9 +189,7 @@ impl Room {
         token: &str,
         options: RoomOptions,
     ) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
-        let e2ee_options = options.e2ee.clone();
-
-        let e2ee_manager = E2eeManager::new(e2ee_options);
+        let e2ee_manager = E2eeManager::new(options.e2ee.clone());
         let (rtc_engine, engine_events) = RtcEngine::connect(
             url,
             token,
@@ -276,25 +274,12 @@ impl Room {
             participants: Default::default(),
             active_speakers: Default::default(),
             options,
-            rtc_engine,
+            rtc_engine: rtc_engine.clone(),
             local_participant,
             dispatcher: dispatcher.clone(),
             e2ee_manager: e2ee_manager.clone(),
         });
 
-        for pi in join_response.other_participants {
-            let participant = {
-                let pi = pi.clone();
-                inner.create_participant(
-                    pi.sid.try_into().unwrap(),
-                    pi.identity.into(),
-                    pi.name,
-                    pi.metadata,
-                )
-            };
-            participant.update_info(pi.clone());
-        }
-
         e2ee_manager.on_state_changed({
             let dispatcher = dispatcher.clone();
             let inner = inner.clone();
@@ -324,6 +309,19 @@
             }
         });
 
+        for pi in join_response.other_participants {
+            let participant = {
+                let pi = pi.clone();
+                inner.create_participant(
+                    pi.sid.try_into().unwrap(),
+                    pi.identity.into(),
+                    pi.name,
+                    pi.metadata,
+                )
+            };
+            participant.update_info(pi.clone());
+        }
+
         // Get the initial states (Can be useful on some usecases, like the FfiServer)
         // Getting them here ensure nothing happening before (Like a new participant joining) because the room task
         // is not started yet
@@ -344,15 +342,16 @@
         let (close_emitter, close_receiver) = oneshot::channel();
         let session_task = tokio::spawn(inner.clone().room_task(engine_events, close_receiver));
 
-        let session = Self {
-            inner,
-            handle: AsyncMutex::new(Some(RoomHandle {
-                session_task,
-                close_emitter,
-            })),
-        };
-
-        Ok((session, events))
+        Ok((
+            Self {
+                inner,
+                handle: AsyncMutex::new(Some(RoomHandle {
+                    session_task,
+                    close_emitter,
+                })),
+            },
+            events,
+        ))
     }
 
     pub async fn close(&self) -> RoomResult<()> {
@@ -492,22 +491,21 @@ impl RoomSession {
                     )))?;
                 }
             }
-            EngineEvent::Resuming => self.handle_resuming(),
-            EngineEvent::Resumed => self.handle_resumed(),
-            EngineEvent::SignalResumed => self.clone().handle_signal_resumed(),
-            EngineEvent::Restarting => self.handle_restarting(),
-            EngineEvent::Restarted => self.handle_restarted(),
-            EngineEvent::SignalRestarted => self.clone().handle_signal_restarted(),
+            EngineEvent::Resuming(tx) => self.handle_resuming(tx),
+            EngineEvent::Resumed(tx) => self.handle_resumed(tx),
+            EngineEvent::SignalResumed(tx) => self.handle_signal_resumed(tx),
+            EngineEvent::Restarting(tx) => self.handle_restarting(tx),
+            EngineEvent::Restarted(tx) => self.handle_restarted(tx),
+            EngineEvent::SignalRestarted(tx) => self.handle_signal_restarted(tx),
             EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
             EngineEvent::Data {
                 payload,
                 kind,
                 participant_sid,
            } => {
-                let payload = Arc::new(payload);
                 if let Some(participant) = self.get_participant(&participant_sid) {
                     self.dispatcher.dispatch(&RoomEvent::DataReceived {
-                        payload,
+                        payload: Arc::new(payload),
                         kind,
                         participant,
                     });
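This hunk is the heart of the fix: the reconnection-related EngineEvents now carry a oneshot::Sender<()>, so the engine blocks after emitting each step until the room has finished handling it — for example, the sync state must be sent before the publisher offer proceeds, which is what previously caused migration failures. A minimal sketch of this acknowledgment pattern, using stand-in types rather than the SDK's real EngineEvent:

use tokio::sync::{mpsc, oneshot};

// Illustrative stand-in for the SDK's EngineEvent: reconnection steps carry
// a oneshot sender that the room uses to acknowledge completion.
enum EngineEvent {
    SignalResumed(oneshot::Sender<()>),
}

async fn engine_side(events: mpsc::UnboundedSender<EngineEvent>) {
    let (tx, rx) = oneshot::channel();
    let _ = events.send(EngineEvent::SignalResumed(tx));
    // Block the resume sequence until the room has sent its sync state,
    // so the publisher offer cannot race ahead of it.
    let _ = rx.await;
    println!("engine: ack received, continuing reconnection");
}

async fn room_side(mut events: mpsc::UnboundedReceiver<EngineEvent>) {
    while let Some(event) = events.recv().await {
        match event {
            EngineEvent::SignalResumed(tx) => {
                // In the real handler, send_sync_state().await runs first.
                let _ = tx.send(()); // acknowledge once handling is done
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::join!(engine_side(tx), room_side(rx));
}

A nice property of tokio's oneshot here: if a handler drops the sender without calling send, the engine's rx.await resolves with an error rather than hanging, so a failed step cannot deadlock the reconnection.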
@@ -696,24 +694,34 @@
         }
     }
 
-    fn handle_resuming(self: &Arc<Self>) {
+    fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
         if self.update_connection_state(ConnectionState::Reconnecting) {
             self.dispatcher.dispatch(&RoomEvent::Reconnecting);
         }
+
+        let _ = tx.send(());
     }
 
-    fn handle_resumed(self: &Arc<Self>) {
+    fn handle_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
         self.update_connection_state(ConnectionState::Connected);
         self.dispatcher.dispatch(&RoomEvent::Reconnected);
+
+        let _ = tx.send(());
     }
 
-    fn handle_signal_resumed(self: Arc<Self>) {
-        tokio::spawn(async move {
-            self.send_sync_state().await;
+    fn handle_signal_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
+        tokio::spawn({
+            let session = self.clone();
+            async move {
+                session.send_sync_state().await;
+
+                // Always send the sync state before continuing the reconnection (e.g: publisher offer)
+                let _ = tx.send(());
+            }
         });
     }
 
-    fn handle_restarting(self: &Arc<Self>) {
+    fn handle_restarting(self: &Arc<Self>, tx: oneshot::Sender<()>) {
         // Remove existing participants/subscriptions on full reconnect
         let participants = self.participants.read().clone();
         for (_, participant) in participants.iter() {
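Note the receiver change from self: Arc<Self> to self: &Arc<Self>: each handler now clones the Arc into the spawned task and keeps self usable afterwards, which is what allows sending the acknowledgment outside the task. A small self-contained sketch of the pattern — the Session type is illustrative, not the SDK's RoomSession:

use std::sync::Arc;

struct Session {
    name: String,
}

impl Session {
    // Taking `self: &Arc<Self>` instead of consuming `self: Arc<Self>`
    // lets the handler move a clone into the task and still use `self`.
    fn handle(self: &Arc<Self>) {
        tokio::spawn({
            let session = self.clone(); // bump the refcount, move the clone in
            async move {
                println!("task running for {}", session.name);
            }
        });
        // `self` remains usable here, e.g. to send an acknowledgment.
    }
}

#[tokio::main]
async fn main() {
    let session = Arc::new(Session { name: "room".into() });
    session.handle();
    tokio::task::yield_now().await; // give the spawned task a chance to run
}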
@@ -724,14 +732,18 @@
         if self.update_connection_state(ConnectionState::Reconnecting) {
             self.dispatcher.dispatch(&RoomEvent::Reconnecting);
         }
+
+        let _ = tx.send(());
     }
 
-    fn handle_restarted(self: &Arc<Self>) {
+    fn handle_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
         self.update_connection_state(ConnectionState::Connected);
         self.dispatcher.dispatch(&RoomEvent::Reconnected);
+
+        let _ = tx.send(());
     }
 
-    fn handle_signal_restarted(self: Arc<Self>) {
+    fn handle_signal_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
         let join_response = self.rtc_engine.last_info().join_response;
         self.local_participant
             .update_info(join_response.participant.unwrap()); // The sid may have changed
@@ -740,21 +752,28 @@
 
         // unpublish & republish tracks
         let published_tracks = self.local_participant.tracks();
-        tokio::spawn(async move {
-            for (_, publication) in published_tracks {
-                let track = publication.track();
-
-                let _ = self
-                    .local_participant
-                    .unpublish_track(&publication.sid())
-                    .await;
-
-                let _ = self
-                    .local_participant
-                    .publish_track(track.unwrap(), publication.publish_options())
-                    .await;
-            }
-        });
+        // Should I create a new task?
+        tokio::spawn({
+            let session = self.clone();
+            async move {
+                for (_, publication) in published_tracks {
+                    let track = publication.track();
+
+                    let _ = session
+                        .local_participant
+                        .unpublish_track(&publication.sid())
+                        .await;
+
+                    let _ = session
+                        .local_participant
+                        .publish_track(track.unwrap(), publication.publish_options())
+                        .await;
+                }
+            }
+        });
+
+        let _ = tx.send(());
     }
 
     fn handle_disconnected(&self, reason: DisconnectReason) {
(Diffs for the remaining three changed files are not shown.)