Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data stream support #361

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/data-streams/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env
# 2. Update the enviroment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
39 changes: 39 additions & 0 deletions examples/data-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# RPC Example
lukasIO marked this conversation as resolved.
Show resolved Hide resolved

This example demonstrates how to use RPC between two participants with LiveKit.

The example includes two scenarios:
1. A simple greeting exchange.
2. A contrived function-calling service with JSON data payloads and multiple method types.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:
```
pnpm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:
```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

To run the example, use the following command:

```
pnpm run start
```

The example will log to your terminal.
65 changes: 65 additions & 0 deletions examples/data-streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { RemoteParticipant, Room, RoomEvent, type TextStreamReader } from '@livekit/rtc-node';
import { config } from 'dotenv';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

const greetParticipant = async (room: Room, recipient: RemoteParticipant) => {
const greeting = 'Hi this is just a text sample';
const streamWriter = await room.localParticipant?.streamText({
destinationIdentities: [recipient.identity],
});

for (const c of greeting) {
await streamWriter?.write([c]);
}

await streamWriter?.close();
};

const main = async () => {
const roomName = `dev`;
const identity = 'tester';
const token = await createToken(identity, roomName);

const room = new Room();

const finishedPromise = new Promise((resolve) => {
room.on(RoomEvent.ParticipantDisconnected, resolve);
});

room.on(RoomEvent.TextStreamReceived, async (reader: TextStreamReader) => {
for await (const { collected } of reader) {
console.log(collected);
}
});

room.on(RoomEvent.ParticipantConnected, async (participant) => {
await greetParticipant(room, participant);
});

await room.connect(LIVEKIT_URL, token);

await finishedPromise;
};

const createToken = async (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
canPublish: true,
canSubscribe: true,
});
return await token.toJwt();
};

main();
23 changes: 23 additions & 0 deletions examples/data-streams/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-data-streams",
"author": "LiveKit",
"private": "true",
"description": "Example of using data streams in LiveKit",
"type": "module",
"main": "index.ts",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"start": "tsx index.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
3 changes: 2 additions & 1 deletion packages/livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ PATH=$PATH:$(pwd)/node_modules/.bin \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/rpc.proto
$FFI_PROTOCOL/rpc.proto \
$FFI_PROTOCOL/track_publication.proto
5 changes: 4 additions & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
"dependencies": {
"@bufbuild/protobuf": "^2.2.0",
"@livekit/mutex": "^1.0.0",
"@livekit/typed-emitter": "^3.0.0"
"@livekit/protocol": "^1.29.4",
"@livekit/typed-emitter": "^3.0.0",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0"
},
"devDependencies": {
"@bufbuild/protoc-gen-es": "^2.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/rust-sdks
Submodule rust-sdks updated 106 files
3 changes: 3 additions & 0 deletions packages/livekit-rtc/src/data_streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './stream_reader.js';
export * from './stream_writer.js';
export type * from './types.js';
170 changes: 170 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import type { DataStream_Chunk } from '@livekit/protocol';
import type { ReadableStream } from 'node:stream/web';
import { log } from '../log.js';
import { bigIntToNumber } from '../utils.js';
import type { BaseStreamInfo, FileStreamInfo, TextStreamChunk, TextStreamInfo } from './types.js';

abstract class BaseStreamReader<T extends BaseStreamInfo> {
protected reader: ReadableStream<DataStream_Chunk>;

protected totalChunkCount?: number;

protected _info: T;

get info() {
return this._info;
}

constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalChunkCount?: number) {
this.reader = stream;
this.totalChunkCount = totalChunkCount;
this._info = info;
}

protected abstract handleChunkReceived(chunk: DataStream_Chunk): void;

onProgress?: (progress: number | undefined) => void;

abstract readAll(): Promise<string | Array<Uint8Array>>;
}

export class BinaryStreamReader extends BaseStreamReader<FileStreamInfo> {
private chunksReceived: Set<number>;

constructor(
info: FileStreamInfo,
stream: ReadableStream<DataStream_Chunk>,
totalChunkCount?: number,
) {
super(info, stream, totalChunkCount);
this.chunksReceived = new Set();
}

protected handleChunkReceived(chunk: DataStream_Chunk) {
this.chunksReceived.add(bigIntToNumber(chunk.chunkIndex));
const currentProgress = this.totalChunkCount
? this.chunksReceived.size / this.totalChunkCount
: undefined;
this.onProgress?.(currentProgress);
}

[Symbol.asyncIterator]() {
const reader = this.reader.getReader();

return {
next: async (): Promise<IteratorResult<Uint8Array>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined as any };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content };
}
} catch (error) {
// TODO handle errors
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<Uint8Array> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<Array<Uint8Array>> {
const chunks: Set<Uint8Array> = new Set();
for await (const chunk of this) {
chunks.add(chunk);
}
return Array.from(chunks);
}
}

/**
* A class to read chunks from a ReadableStream and provide them in a structured format.
*/
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
private receivedChunks: Map<number, DataStream_Chunk>;

/**
* A TextStreamReader instance can be used as an AsyncIterator that returns the entire string
* that has been received up to the current point in time.
*/
constructor(
info: TextStreamInfo,
stream: ReadableStream<DataStream_Chunk>,
totalChunkCount?: number,
) {
super(info, stream, totalChunkCount);
this.receivedChunks = new Map();
}

protected handleChunkReceived(chunk: DataStream_Chunk) {
const index = bigIntToNumber(chunk.chunkIndex);
const previousChunkAtIndex = this.receivedChunks.get(index);
if (previousChunkAtIndex && previousChunkAtIndex.version > chunk.version) {
// we have a newer version already, dropping the old one
return;
}
this.receivedChunks.set(index, chunk);
const currentProgress = this.totalChunkCount
? this.receivedChunks.size / this.totalChunkCount
: undefined;
this.onProgress?.(currentProgress);
}

/**
* Async iterator implementation to allow usage of `for await...of` syntax.
* Yields structured chunks from the stream.
*
*/
[Symbol.asyncIterator]() {
const reader = this.reader.getReader();
const decoder = new TextDecoder();

return {
next: async (): Promise<IteratorResult<TextStreamChunk>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined };
} else {
this.handleChunkReceived(value);
return {
done: false,
value: {
index: bigIntToNumber(value.chunkIndex),
current: decoder.decode(value.content),
collected: Array.from(this.receivedChunks.values())
.sort((a, b) => bigIntToNumber(a.chunkIndex) - bigIntToNumber(b.chunkIndex))
.map((chunk) => decoder.decode(chunk.content))
.join(''),
},
};
}
} catch (error) {
// TODO handle errors
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<TextStreamChunk> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<string> {
let latestString: string = '';
for await (const { collected } of this) {
latestString = collected;
}
return latestString;
}
}
29 changes: 29 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { WritableStream } from 'node:stream/web';

class BaseStreamWriter<T> {
protected writableStream: WritableStream<T>;

protected defaultWriter: WritableStreamDefaultWriter<T>;

protected onClose?: () => void;

constructor(writableStream: WritableStream<T>, onClose?: () => void) {
this.writableStream = writableStream;
this.defaultWriter = writableStream.getWriter();
this.onClose = onClose;
}

write(chunk: T): Promise<void> {
return this.defaultWriter.write(chunk);
}

async close() {
await this.defaultWriter.close();
this.defaultWriter.releaseLock();
this.onClose?.();
}
}

export class TextStreamWriter extends BaseStreamWriter<[string, number?]> {}

export class BinaryStreamWriter extends BaseStreamWriter<Uint8Array> {}
Loading
Loading