Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Sep 15, 2023
1 parent 023e4f2 commit fddfa75
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 52 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,30 @@ message TrackPublishOptions {
TrackSource source = 7;
}

enum IceTransportPolicy {
TRANSPORT_NONE = 0;
TRANSPORT_RELAY = 1;
TRANSPORT_NOHOST = 2;
TRANSPORT_ALL = 3;
}

message IceServer {
repeated string urls = 1;
string username = 2;
string credential = 3;
}

message RtcConfig {
IceTransportPolicy ice_transport_policy = 1;
repeated IceServer ice_servers = 2;
}

message RoomOptions {
bool auto_subscribe = 1;
bool adaptive_stream = 2;
bool dynacast = 3;
optional E2eeOptions e2ee = 4;
optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration
}

//
Expand Down
10 changes: 5 additions & 5 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ fn on_e2ee_request(
)
}
proto::e2ee_request::Message::CryptorSetEnabled(request) => {
let identity = request.participant_identity.try_into().unwrap();
let identity = request.participant_identity.into();
let track_sid = request.track_sid.try_into().unwrap();

if let Some(frame_cryptor) = e2ee_manager.frame_cryptors().get(&(identity, track_sid)) {
Expand All @@ -522,7 +522,7 @@ fn on_e2ee_request(
)
}
proto::e2ee_request::Message::CryptorSetKeyIndex(request) => {
let identity = request.participant_identity.try_into().unwrap();
let identity = request.participant_identity.into();
let track_sid = request.track_sid.try_into().unwrap();

if let Some(frame_cryptor) = e2ee_manager.frame_cryptors().get(&(identity, track_sid)) {
Expand Down Expand Up @@ -559,22 +559,22 @@ fn on_e2ee_request(
proto::e2ee_response::Message::GetSharedKey(proto::GetSharedKeyResponse { key })
}
proto::e2ee_request::Message::SetKey(request) => {
let identity = request.participant_identity.try_into().unwrap();
let identity = request.participant_identity.into();
if let Some(key_provider) = e2ee_manager.key_provider() {
key_provider.set_key(&identity, request.key_index, request.key);
}
proto::e2ee_response::Message::SetKey(proto::SetKeyResponse {})
}
proto::e2ee_request::Message::RatchetKey(request) => {
let identity = request.participant_identity.try_into().unwrap();
let identity = request.participant_identity.into();
let new_key = e2ee_manager
.key_provider()
.and_then(|key_provider| key_provider.ratchet_key(&identity, request.key_index));

proto::e2ee_response::Message::RatchetKey(proto::RatchetKeyResponse { new_key })
}
proto::e2ee_request::Message::GetKey(request) => {
let identity = request.participant_identity.try_into().unwrap();
let identity = request.participant_identity.into();
let key = e2ee_manager
.key_provider()
.and_then(|key_provider| key_provider.get_key(&identity, request.key_index));
Expand Down
3 changes: 2 additions & 1 deletion livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ parking_lot = { version = "0.12" }
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
thiserror = "1.0"
lazy_static = "1.4"
log = "0.4"
log = "0.4"
arc-swap = "1.6.0"
110 changes: 82 additions & 28 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,24 +276,71 @@ impl Room {
participants: Default::default(),
active_speakers: Default::default(),
options,
rtc_engine,
rtc_engine: rtc_engine.clone(),
local_participant,
dispatcher: dispatcher.clone(),
e2ee_manager: e2ee_manager.clone(),
});

for pi in join_response.other_participants {
let participant = {
let pi = pi.clone();
inner.create_participant(
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
pi.metadata,
)
};
participant.update_info(pi.clone());
}
rtc_engine.on_resuming({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_resuming().await;
})
}
});

rtc_engine.on_resumed({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_resumed().await;
})
}
});

rtc_engine.on_signal_resumed({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_resumed().await;
})
}
});

rtc_engine.on_restarted({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_restarted().await;
})
}
});

rtc_engine.on_restarted({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_restarted().await;
})
}
});

rtc_engine.on_signal_restarted({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_restarted().await;
})
}
});

e2ee_manager.on_state_changed({
let dispatcher = dispatcher.clone();
Expand Down Expand Up @@ -324,6 +371,19 @@ impl Room {
}
});

for pi in join_response.other_participants {
let participant = {
let pi = pi.clone();
inner.create_participant(
pi.sid.try_into().unwrap(),
pi.identity.into(),
pi.name,
pi.metadata,
)
};
participant.update_info(pi.clone());
}

// Get the initial states (Can be useful on some usecases, like the FfiServer)
// Getting them here ensure nothing happening before (Like a new participant joining) because the room task
// is not started yet
Expand Down Expand Up @@ -492,12 +552,6 @@ impl RoomSession {
)))?;
}
}
EngineEvent::Resuming => self.handle_resuming(),
EngineEvent::Resumed => self.handle_resumed(),
EngineEvent::SignalResumed => self.clone().handle_signal_resumed(),
EngineEvent::Restarting => self.handle_restarting(),
EngineEvent::Restarted => self.handle_restarted(),
EngineEvent::SignalRestarted => self.clone().handle_signal_restarted(),
EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
EngineEvent::Data {
payload,
Expand Down Expand Up @@ -696,24 +750,22 @@ impl RoomSession {
}
}

fn handle_resuming(self: &Arc<Self>) {
async fn handle_resuming(self: &Arc<Self>) {
if self.update_connection_state(ConnectionState::Reconnecting) {
self.dispatcher.dispatch(&RoomEvent::Reconnecting);
}
}

fn handle_resumed(self: &Arc<Self>) {
async fn handle_resumed(self: &Arc<Self>) {
self.update_connection_state(ConnectionState::Connected);
self.dispatcher.dispatch(&RoomEvent::Reconnected);
}

fn handle_signal_resumed(self: Arc<Self>) {
tokio::spawn(async move {
self.send_sync_state().await;
});
async fn handle_signal_resumed(self: Arc<Self>) {
self.send_sync_state().await;
}

fn handle_restarting(self: &Arc<Self>) {
async fn handle_restarting(self: &Arc<Self>) {
// Remove existing participants/subscriptions on full reconnect
let participants = self.participants.read().clone();
for (_, participant) in participants.iter() {
Expand All @@ -726,12 +778,12 @@ impl RoomSession {
}
}

fn handle_restarted(self: &Arc<Self>) {
async fn handle_restarted(self: &Arc<Self>) {
self.update_connection_state(ConnectionState::Connected);
self.dispatcher.dispatch(&RoomEvent::Reconnected);
}

fn handle_signal_restarted(self: Arc<Self>) {
async fn handle_signal_restarted(self: Arc<Self>) {
let join_response = self.rtc_engine.last_info().join_response;
self.local_participant
.update_info(join_response.participant.unwrap()); // The sid may have changed
Expand All @@ -740,6 +792,8 @@ impl RoomSession {

// unpublish & republish tracks
let published_tracks = self.local_participant.tracks();

// Should I create a new task?
tokio::spawn(async move {
for (_, publication) in published_tracks {
let track = publication.track();
Expand Down
Loading

0 comments on commit fddfa75

Please sign in to comment.