Skip to content

Commit

Permalink
sending support and recv fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Dec 20, 2024
1 parent 7e01c91 commit 8d1819d
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 26 deletions.
28 changes: 26 additions & 2 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ message RoomEvent {
DataPacketReceived data_packet_received = 27;
TranscriptionReceived transcription_received = 28;
ChatMessageReceived chat_message = 29;
DataStream.Header stream_header = 30;
DataStream.Chunk stream_chunk = 31;
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
}
}

Expand Down Expand Up @@ -580,3 +580,27 @@ message DataStream {
}
}

message DataStreamHeaderReceived {
required string participant_identity = 1;
required DataStream.Header header = 2;
}

message DataStreamChunkReceived {
required string participant_identity = 1;
required DataStream.Chunk chunk = 2;
}

message SendStreamHeaderRequest {
required uint64 local_participant_handle = 1;
required DataStream.Header header = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

message SendStreamChunkRequest {
required uint64 local_participant_handle = 1;
required DataStream.Chunk chunk = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

44 changes: 42 additions & 2 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2702,9 +2702,9 @@ pub mod room_event {
#[prost(message, tag="29")]
ChatMessage(super::ChatMessageReceived),
#[prost(message, tag="30")]
StreamHeader(super::data_stream::Header),
StreamHeaderReceived(super::DataStreamHeaderReceived),
#[prost(message, tag="31")]
StreamChunk(super::data_stream::Chunk),
StreamChunkReceived(super::DataStreamChunkReceived),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -3106,6 +3106,46 @@ pub mod data_stream {
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataStreamHeaderReceived {
#[prost(string, required, tag="1")]
pub participant_identity: ::prost::alloc::string::String,
#[prost(message, required, tag="2")]
pub header: data_stream::Header,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataStreamChunkReceived {
#[prost(string, required, tag="1")]
pub participant_identity: ::prost::alloc::string::String,
#[prost(message, required, tag="2")]
pub chunk: data_stream::Chunk,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SendStreamHeaderRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(message, required, tag="2")]
pub header: data_stream::Header,
#[prost(string, repeated, tag="3")]
pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="4")]
pub sender_identity: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SendStreamChunkRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(message, required, tag="2")]
pub chunk: data_stream::Chunk,
#[prost(string, repeated, tag="3")]
pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="4")]
pub sender_identity: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down
34 changes: 34 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ fn on_publish_data(
ffi_participant.room.publish_data(server, publish)
}

/// Publish data to the room
fn on_publish_data(
server: &'static FfiServer,
publish: proto::PublishDataRequest,
) -> FfiResult<proto::PublishDataResponse> {
// Push the data to an async queue (avoid blocking and keep the order)
let ffi_participant =
server.retrieve_handle::<FfiParticipant>(publish.local_participant_handle)?;

ffi_participant.room.publish_data(server, publish)
}

/// Publish transcription to the room
fn on_publish_transcription(
server: &'static FfiServer,
Expand Down Expand Up @@ -236,6 +248,28 @@ fn on_edit_chat_message(
Ok(ffi_participant.room.edit_chat_message(server, edit_chat_message))
}

fn on_send_stream_header(
server: &'static FfiServer,
stream_header_message: proto::SendStreamHeaderRequest,
) {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(stream_header_message.local_participant_handle)?
.clone();

// Ok(ffi_participant.room.publish_raw_data(server, edit_chat_message))
}

fn on_send_stream_chunk(
server: &'static FfiServer,
stream_chunk_message: proto::SendStreamChunkRequest,
) {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(edit_chat_message.local_participant_handle)?
.clone();

Ok(ffi_participant.room.edit_chat_message(server, edit_chat_message))
}

/// Create a new video track from a source
fn on_create_video_track(
server: &'static FfiServer,
Expand Down
59 changes: 55 additions & 4 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;
use std::{collections::HashSet, slice, sync::Arc};

use livekit::prelude::*;
use livekit::proto as lk_proto;
use livekit::ChatMessage;
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex};
Expand Down Expand Up @@ -100,6 +101,16 @@ struct FfiTranscriptionSegment {
language: String,
}

struct FfiStreamHeader {
header: proto::data_stream::Header,
async_id: u64,
}

struct FfiStreamChunk {
chunk: proto::data_stream::Chunk,
async_id: u64,
}

struct FfiSipDtmfPacket {
payload: SipDTMF,
async_id: u64,
Expand Down Expand Up @@ -666,6 +677,42 @@ impl RoomInner {
proto::SendChatMessageResponse { async_id }
}

pub fn send_stream_header(
&self,
server: &'static FfiServer,
send_stream_header: proto::SendStreamHeaderRequest,
) {
let packet = lk_proto::DataPacket {
kind: lk_proto::DataPacketKind::Reliable,
participant_identity: send_stream_header.sender_identity,
destination_identities: send_stream_header.destination_identities,
value: send_stream_header.header,
};
let async_id = server.next_id();
let inner = self.clone();
let handle = server.async_runtime.spawn(async move {
let res = inner.room.local_participant().publish_raw_data(packet, true).await;
});
}

pub fn send_stream_chunk(
&self,
server: &'static FfiServer,
send_stream_chunk: proto::SendStreamChunkRequest,
) {
let packet = lk_proto::DataPacket {
kind: lk_proto::DataPacketKind::Reliable,
participant_identity: send_stream_header.sender_identity,
destination_identities: send_stream_header.destination_identities,
value: send_stream_header.chunk,
};
let async_id = server.next_id();
let inner = self.clone();
let handle = server.async_runtime.spawn(async move {
let res = inner.room.local_participant().publish_raw_data(packet, true).await;
});
}

pub fn store_rpc_method_invocation_waiter(
&self,
invocation_id: u64,
Expand Down Expand Up @@ -1137,11 +1184,15 @@ async fn forward_event(
state: proto::EncryptionState::from(state).into(),
}));
}
RoomEvent::StreamHeaderReceived { header } => {
let _ = send_event(proto::room_event::Message::StreamHeader(header.into()));
RoomEvent::StreamHeaderReceived { header, participant_identity } => {
let _ = send_event(proto::room_event::Message::StreamHeaderReceived(
proto::DataStreamHeaderReceived { header: header.into(), participant_identity },
));
}
RoomEvent::StreamChunkReceived { chunk } => {
let _ = send_event(proto::room_event::Message::StreamChunk(chunk.into()));
RoomEvent::StreamChunkReceived { chunk, participant_identity } => {
let _ = send_event(proto::room_event::Message::StreamChunkReceived(
proto::DataStreamChunkReceived { chunk: chunk.into(), participant_identity },
));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
Expand Down
26 changes: 18 additions & 8 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ pub enum RoomEvent {
},
StreamHeaderReceived {
header: proto::data_stream::Header,
participant_identity: String,
},
StreamChunkReceived {
chunk: proto::data_stream::Chunk,
participant_identity: String,
},
E2eeStateChanged {
participant: Participant,
Expand Down Expand Up @@ -723,11 +725,11 @@ impl RoomSession {
EngineEvent::LocalTrackSubscribed { track_sid } => {
self.handle_track_subscribed(track_sid)
}
EngineEvent::DataStreamHeader { header } => {
self.handle_data_stream_header(header);
EngineEvent::DataStreamHeader { header, participant_identity } => {
self.handle_data_stream_header(header, participant_identity);
}
EngineEvent::DataStreamChunk { chunk } => {
self.handle_data_stream_chunk(chunk);
EngineEvent::DataStreamChunk { chunk, participant_identity } => {
self.handle_data_stream_chunk(chunk, participant_identity);
}
_ => {}
}
Expand Down Expand Up @@ -1239,13 +1241,21 @@ impl RoomSession {
});
}

fn handle_data_stream_header(&self, header: proto::data_stream::Header) {
let event = RoomEvent::StreamHeaderReceived { header };
fn handle_data_stream_header(
&self,
header: proto::data_stream::Header,
participant_identity: String,
) {
let event = RoomEvent::StreamHeaderReceived { header, participant_identity };
self.dispatcher.dispatch(&event);
}

fn handle_data_stream_chunk(&self, chunk: proto::data_stream::Chunk) {
let event = RoomEvent::StreamChunkReceived { chunk };
fn handle_data_stream_chunk(
&self,
chunk: proto::data_stream::Chunk,
participant_identity: String,
) {
let event = RoomEvent::StreamChunkReceived { chunk, participant_identity };
self.dispatcher.dispatch(&event);
}

Expand Down
13 changes: 13 additions & 0 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,19 @@ impl LocalParticipant {
}
}

/** internal */
pub async fn publish_raw_data(
self,
packet: proto::DataPacket,
reliable: bool,
) -> RoomResult<()> {
let kind = match reliable {
true => DataPacketKind::Reliable,
false => DataPacketKind::Lossy,
};
self.inner.rtc_engine.publish_data(&packet, kind).await.map_err(Into::into)
}

pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> {
let kind = match packet.reliable {
true => DataPacketKind::Reliable,
Expand Down
14 changes: 10 additions & 4 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ pub enum EngineEvent {
},
DataStreamHeader {
header: proto::data_stream::Header,
participant_identity: String,
},
DataStreamChunk {
chunk: proto::data_stream::Chunk,
participant_identity: String,
},
}

Expand Down Expand Up @@ -530,11 +532,15 @@ impl EngineInner {
SessionEvent::LocalTrackSubscribed { track_sid } => {
let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
}
SessionEvent::DataStreamHeader { header } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamHeader { header });
SessionEvent::DataStreamHeader { header, participant_identity } => {
let _ = self
.engine_tx
.send(EngineEvent::DataStreamHeader { header, participant_identity });
}
SessionEvent::DataStreamChunk { chunk } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamChunk { chunk });
SessionEvent::DataStreamChunk { chunk, participant_identity } => {
let _ = self
.engine_tx
.send(EngineEvent::DataStreamChunk { chunk, participant_identity });
}
}
Ok(())
Expand Down
16 changes: 10 additions & 6 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ pub enum SessionEvent {
},
DataStreamHeader {
header: proto::data_stream::Header,
participant_identity: String,
},
DataStreamChunk {
chunk: proto::data_stream::Chunk,
participant_identity: String,
},
}

Expand Down Expand Up @@ -730,14 +732,16 @@ impl SessionInner {
});
}
proto::data_packet::Value::StreamHeader(message) => {
let _ = self
.emitter
.send(SessionEvent::DataStreamHeader { header: message.clone() });
let _ = self.emitter.send(SessionEvent::DataStreamHeader {
header: message.clone(),
participant_identity: data.participant_identity.clone(),
});
}
proto::data_packet::Value::StreamChunk(message) => {
let _ = self
.emitter
.send(SessionEvent::DataStreamChunk { chunk: message.clone() });
let _ = self.emitter.send(SessionEvent::DataStreamChunk {
chunk: message.clone(),
participant_identity: data.participant_identity.clone(),
});
}
_ => {}
}
Expand Down

0 comments on commit 8d1819d

Please sign in to comment.