Skip to content

Commit

Permalink
use channel instead of callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Sep 18, 2023
1 parent b3691e9 commit 35885d3
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 195 deletions.
14 changes: 4 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,3 @@ members = [
"webrtc-sys",
"webrtc-sys/build",
]

[patch.crates-io]
ring = { git = "https://github.com/1Password/ring", branch = "1p/release-0.16" }
7 changes: 7 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,3 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"]
thiserror = "1.0"
lazy_static = "1.4"
log = "0.4"
arc-swap = "1.6.0"
124 changes: 42 additions & 82 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ impl Room {
token: &str,
options: RoomOptions,
) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
let e2ee_options = options.e2ee.clone();

let e2ee_manager = E2eeManager::new(e2ee_options);
let e2ee_manager = E2eeManager::new(options.e2ee.clone());
let (rtc_engine, engine_events) = RtcEngine::connect(
url,
token,
Expand Down Expand Up @@ -282,66 +280,6 @@ impl Room {
e2ee_manager: e2ee_manager.clone(),
});

rtc_engine.on_resuming({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_resuming().await;
})
}
});

rtc_engine.on_resumed({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_resumed().await;
})
}
});

rtc_engine.on_signal_resumed({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_resumed().await;
})
}
});

rtc_engine.on_restarting({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_restarting().await;
})
}
});

rtc_engine.on_restarted({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_restarted().await;
})
}
});

rtc_engine.on_signal_restarted({
let inner = inner.clone();
move || {
let inner = inner.clone();
Box::pin(async move {
inner.handle_signal_restarted().await;
})
}
});

e2ee_manager.on_state_changed({
let dispatcher = dispatcher.clone();
let inner = inner.clone();
Expand Down Expand Up @@ -553,6 +491,12 @@ impl RoomSession {
)))?;
}
}
EngineEvent::Resuming(tx) => self.handle_resuming(tx).await,
EngineEvent::Resumed(tx) => self.handle_resumed(tx).await,
EngineEvent::SignalResumed(tx) => self.handle_signal_resumed(tx).await,
EngineEvent::Restarting(tx) => self.handle_restarting(tx).await,
EngineEvent::Restarted(tx) => self.handle_restarted(tx).await,
EngineEvent::SignalRestarted(tx) => self.handle_signal_restarted(tx).await,
EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
EngineEvent::Data {
payload,
Expand Down Expand Up @@ -750,22 +694,29 @@ impl RoomSession {
}
}

async fn handle_resuming(self: &Arc<Self>) {
async fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
if self.update_connection_state(ConnectionState::Reconnecting) {
self.dispatcher.dispatch(&RoomEvent::Reconnecting);
}

let _ = tx.send(());
}

async fn handle_resumed(self: &Arc<Self>) {
async fn handle_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
self.update_connection_state(ConnectionState::Connected);
self.dispatcher.dispatch(&RoomEvent::Reconnected);

let _ = tx.send(());
}

async fn handle_signal_resumed(self: Arc<Self>) {
async fn handle_signal_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
self.send_sync_state().await;

// Always send the sync state before continuing the reconnection (e.g: publisher offer)
let _ = tx.send(());
}

async fn handle_restarting(self: &Arc<Self>) {
async fn handle_restarting(self: &Arc<Self>, tx: oneshot::Sender<()>) {
// Remove existing participants/subscriptions on full reconnect
let participants = self.participants.read().clone();
for (_, participant) in participants.iter() {
Expand All @@ -776,14 +727,18 @@ impl RoomSession {
if self.update_connection_state(ConnectionState::Reconnecting) {
self.dispatcher.dispatch(&RoomEvent::Reconnecting);
}

let _ = tx.send(());
}

async fn handle_restarted(self: &Arc<Self>) {
async fn handle_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
self.update_connection_state(ConnectionState::Connected);
self.dispatcher.dispatch(&RoomEvent::Reconnected);

let _ = tx.send(());
}

async fn handle_signal_restarted(self: Arc<Self>) {
async fn handle_signal_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
let join_response = self.rtc_engine.last_info().join_response;
self.local_participant
.update_info(join_response.participant.unwrap()); // The sid may have changed
Expand All @@ -794,21 +749,26 @@ impl RoomSession {
let published_tracks = self.local_participant.tracks();

// Should I create a new task?
tokio::spawn(async move {
for (_, publication) in published_tracks {
let track = publication.track();

let _ = self
.local_participant
.unpublish_track(&publication.sid())
.await;

let _ = self
.local_participant
.publish_track(track.unwrap(), publication.publish_options())
.await;
tokio::spawn({
let session = self.clone();
async move {
for (_, publication) in published_tracks {
let track = publication.track();

let _ = session
.local_participant
.unpublish_track(&publication.sid())
.await;

let _ = session
.local_participant
.publish_track(track.unwrap(), publication.publish_options())
.await;
}
}
});

let _ = tx.send(());
}

fn handle_disconnected(&self, reason: DisconnectReason) {
Expand Down
Loading

0 comments on commit 35885d3

Please sign in to comment.