Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 3b962df

Browse files
committed
Allow data flow between QUIC agent and video agent.
1 parent 862fb24 commit 3b962df

14 files changed

+189
-51
lines changed

doc/Client-Portal Protocol.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,14 +362,14 @@ This a format for client reconnects.
362362
```
363363
object(PublicationRequest)::
364364
{
365-
media: object(WebRTCMediaOptions) | null,
365+
media: object(WebRTCMediaOptions) | object(WebCodecsMediaOptions) | null,
366366
data: true | false,
367367
transport: object(TransportOptions),
368368
attributes: object(ClientDefinedAttributes) | null
369369
}
370370
```
371371

372-
A publication can send either media or data, but a QUIC *transport* channel can support multiple stream for both media and data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbit to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by QUIC transport channels.
372+
A publication can send either media or data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbit to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by WebTransport channels. When transport's type is "webrtc", `media` should be an object of `WebRTCMediaOptions`. When transport's type is "quic", `media` should be an object of `WebCodecsMediaOptions` or `null`.
373373

374374
```
375375
object(WebRTCMediaOptions)::
@@ -385,6 +385,20 @@ A publication can send either media or data, but a QUIC *transport* channel can
385385
}
386386
```
387387

388+
```
389+
object(WebCodecsMediaOptions)::
390+
{
391+
tracks: [
392+
{
393+
type: "audio" | "video",
394+
source: "mic" | "screen-cast" | ... | "encoded-file",
395+
format: object(AudioFormat) | object(VideoFormat)
396+
}
397+
]
398+
}
399+
}
400+
```
401+
388402
**ResponseData**: The PublicationResult object with following definition if **ResponseStatus** is “ok”:
389403

390404
object(PublicationResult)::

doc/design/quic-transport-payload-format.md renamed to doc/design/web-transport-payload-format.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ After creating a WebTransport, a stream with session 0 should be created for aut
3939
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
4040
```
4141

42+
## Media Stream
43+
44+
When a WebTransport stream is used for transmitting data of a media stream track (e.g.: H.264 bitstream), a 32 (8+24) bit length header is added to indicates frame size.
45+
46+
```
47+
0 1 2 3
48+
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
49+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
50+
| Reserved | Message length |
51+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
52+
| Message ...
53+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
54+
```
55+
4256
## Authentication
4357

4458
If signaling messages are transmitted over WebTransport, authentication follows the regular process defined by [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). Otherwise, client sends a token for WebTransport as a signaling message. WebTransport token is issued during joining a conference. If the token is valid, server sends a 128 bit length zeros to client.

source/agent/addons/internalIO/InternalClientWrapper.cc

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,23 +88,36 @@ NAN_METHOD(InternalClient::close) {
8888
obj->me = nullptr;
8989
}
9090

91-
NAN_METHOD(InternalClient::addDestination) {
92-
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
93-
owt_base::InternalClient* me = obj->me;
91+
NAN_METHOD(InternalClient::addDestination)
92+
{
93+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
94+
owt_base::InternalClient* me = obj->me;
9495

95-
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
96-
std::string track = std::string(*param0);
96+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
97+
std::string track = std::string(*param0);
9798

98-
FrameDestination* param =
99-
ObjectWrap::Unwrap<FrameDestination>(
100-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
101-
owt_base::FrameDestination* dest = param->dest;
99+
bool isNanDestination(false);
100+
if (info.Length() >= 3) {
101+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
102+
}
102103

103-
if (track == "audio") {
104-
me->addAudioDestination(dest);
105-
} else if (track == "video") {
106-
me->addVideoDestination(dest);
107-
}
104+
owt_base::FrameDestination* dest(nullptr);
105+
if (isNanDestination) {
106+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
107+
dest = param->FrameDestination();
108+
} else {
109+
FrameDestination* param = ObjectWrap::Unwrap<FrameDestination>(
110+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
111+
dest = param->dest;
112+
}
113+
114+
if (track == "audio") {
115+
me->addAudioDestination(dest);
116+
} else if (track == "video") {
117+
me->addVideoDestination(dest);
118+
} else if (track == "data") {
119+
me->addDataDestination(dest);
120+
}
108121
}
109122

110123
NAN_METHOD(InternalClient::removeDestination) {

source/agent/addons/internalIO/InternalServerWrapper.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,20 @@ NAN_METHOD(InternalServer::addSource) {
9999
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
100100
std::string streamId = std::string(*param0);
101101

102-
FrameSource* param =
103-
ObjectWrap::Unwrap<FrameSource>(
104-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
105-
owt_base::FrameSource* src = param->src;
102+
bool isNanSource(false);
103+
if (info.Length() >= 3) {
104+
isNanSource = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
105+
}
106+
107+
owt_base::FrameSource* src(nullptr);
108+
if (isNanSource) {
109+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
110+
src = param->FrameSource();
111+
} else {
112+
FrameSource* param = ObjectWrap::Unwrap<FrameSource>(
113+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
114+
src = param->src;
115+
}
106116

107117
me->addSource(streamId, src);
108118
}

source/agent/addons/internalIO/InternalServerWrapper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ class InternalServer : public node::ObjectWrap,
4444
static NAN_METHOD(close);
4545

4646
static NAN_METHOD(getListeningPort);
47-
47+
// Arguments:
48+
// type: string, type of the source, "audio", "video" or "data".
49+
// source: A node addon object or NAN object.
50+
// isNanObject: indicates whether `source` is a NAN object.
4851
static NAN_METHOD(addSource);
4952

5053
static NAN_METHOD(removeSource);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#ifndef QUIC_ADDON_MEDIA_FRAME_PACKETIZER_H_
2+
#define QUIC_ADDON_MEDIA_FRAME_PACKETIZER_H_
3+
4+
#endif

source/agent/addons/quic/QuicTransportStream.cc

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
#include "QuicTransportStream.h"
8+
#include "../common/MediaFramePipelineWrapper.h"
89

910
using v8::Function;
1011
using v8::FunctionTemplate;
@@ -30,6 +31,9 @@ QuicTransportStream::QuicTransportStream(owt::quic::WebTransportStreamInterface*
3031
, m_isPiped(false)
3132
, m_buffer(nullptr)
3233
, m_bufferSize(0)
34+
, m_isMedia(false)
35+
, m_currentFrameSize(0)
36+
, m_readFrameSize(0)
3337
{
3438
}
3539

@@ -113,15 +117,32 @@ NAN_METHOD(QuicTransportStream::close)
113117
NAN_METHOD(QuicTransportStream::addDestination)
114118
{
115119
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
116-
if (info.Length() != 2) {
120+
if (info.Length() > 3) {
117121
Nan::ThrowTypeError("Invalid argument length for addDestination.");
118122
return;
119123
}
120-
// TODO: Check if info[0] is an Nan wrapped object.
121-
auto framePtr = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info[1]->ToObject());
122-
// void* ptr = info[0]->ToObject()->GetAlignedPointerFromInternalField(0);
123-
// auto framePtr=static_cast<owt_base::FrameDestination*>(ptr);
124-
obj->addDataDestination(framePtr);
124+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
125+
std::string track = std::string(*param0);
126+
bool isNanDestination(false);
127+
if (info.Length() == 3) {
128+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
129+
}
130+
owt_base::FrameDestination* dest(nullptr);
131+
if (isNanDestination) {
132+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
133+
dest = param->FrameDestination();
134+
} else {
135+
::FrameDestination* param = node::ObjectWrap::Unwrap<::FrameDestination>(
136+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
137+
dest = param->dest;
138+
}
139+
if (track == "audio") {
140+
obj->addAudioDestination(dest);
141+
} else if (track == "video") {
142+
obj->addVideoDestination(dest);
143+
} else if (track == "data") {
144+
obj->addDataDestination(dest);
145+
}
125146
obj->m_isPiped = true;
126147
}
127148

@@ -153,6 +174,13 @@ void QuicTransportStream::MaybeReadContentSessionId()
153174
m_receivedContentSessionId = true;
154175
m_asyncOnContentSessionId.data = this;
155176
uv_async_send(&m_asyncOnContentSessionId);
177+
for (uint8_t d : m_contentSessionId) {
178+
if (d != 0) {
179+
m_isPiped = true;
180+
m_isMedia = true;
181+
break;
182+
}
183+
}
156184
if (m_stream->ReadableBytes() > 0) {
157185
SignalOnData();
158186
}
@@ -211,15 +239,55 @@ void QuicTransportStream::SignalOnData()
211239

212240
while (m_stream->ReadableBytes() > 0) {
213241
auto readableBytes = m_stream->ReadableBytes();
214-
if (readableBytes > m_bufferSize) {
215-
ReallocateBuffer(readableBytes);
242+
ELOG_DEBUG("Readable bytes: %d", readableBytes);
243+
if (m_isMedia) {
244+
// A new frame.
245+
if (m_currentFrameSize == 0 && m_readFrameSize == 0) {
246+
if (readableBytes >= 2) {
247+
const int headerSize = 4; // In bytes.
248+
uint8_t* frameSizeArray = new uint8_t[headerSize];
249+
memset(frameSizeArray, 0, headerSize * sizeof(uint8_t));
250+
m_stream->Read((uint8_t*)frameSizeArray, headerSize);
251+
// Usually only the last 2 bytes are used. The first two bits could be used for indicating frame size.
252+
for (int i = 0; i < headerSize; i++) {
253+
m_currentFrameSize <<= 8;
254+
m_currentFrameSize += frameSizeArray[i];
255+
}
256+
if (m_currentFrameSize > m_bufferSize) {
257+
ReallocateBuffer(m_currentFrameSize);
258+
}
259+
}
260+
continue;
261+
}
262+
if (m_readFrameSize < m_currentFrameSize) {
263+
// Append data to current frame.
264+
size_t readBytes = std::min(readableBytes, m_currentFrameSize - m_readFrameSize);
265+
m_stream->Read(m_buffer + m_readFrameSize, readBytes);
266+
m_readFrameSize += readBytes;
267+
}
268+
// Complete frame.
269+
if (m_readFrameSize == m_currentFrameSize) {
270+
owt_base::Frame frame;
271+
frame.format = owt_base::FRAME_FORMAT_I420;
272+
frame.length = m_currentFrameSize;
273+
frame.payload = m_buffer;
274+
// Transport layer doesn't know a frame's type. Video agent is able to parse the type of a frame from bistream. However, video agent doesn't feed the frame to decoder when a key frame is requested.
275+
frame.additionalInfo.video.isKeyFrame = "key";
276+
deliverFrame(frame);
277+
m_currentFrameSize = 0;
278+
m_readFrameSize = 0;
279+
}
280+
} else {
281+
if (readableBytes > m_bufferSize) {
282+
ReallocateBuffer(readableBytes);
283+
}
284+
owt_base::Frame frame;
285+
frame.format = owt_base::FRAME_FORMAT_DATA;
286+
frame.length = readableBytes;
287+
frame.payload = m_buffer;
288+
m_stream->Read(frame.payload, readableBytes);
289+
deliverFrame(frame);
216290
}
217-
owt_base::Frame frame;
218-
frame.format = owt_base::FRAME_FORMAT_DATA;
219-
frame.length = readableBytes;
220-
frame.payload = m_buffer;
221-
m_stream->Read(frame.payload, readableBytes);
222-
deliverFrame(frame);
223291
}
224292
}
225293

source/agent/addons/quic/QuicTransportStream.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
#ifndef QUIC_QUICTRANSPORTSTREAM_H_
88
#define QUIC_QUICTRANSPORTSTREAM_H_
99

10-
#include "owt/quic/web_transport_stream_interface.h"
1110
#include "../../core/owt_base/MediaFramePipeline.h"
1211
#include "../common/MediaFramePipelineWrapper.h"
12+
#include "owt/quic/web_transport_stream_interface.h"
1313
#include <logger.h>
14-
#include <nan.h>
1514
#include <mutex>
15+
#include <nan.h>
1616
#include <string>
1717

1818
class QuicTransportStream : public owt_base::FrameSource, public owt_base::FrameDestination, public NanFrameNode, public owt::quic::WebTransportStreamInterface::Visitor {
@@ -37,7 +37,7 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
3737
static NAN_METHOD(addDestination);
3838
static NAN_METHOD(removeDestination);
3939
static NAUV_WORK_CB(onContentSessionId);
40-
static NAUV_WORK_CB(onData); // TODO: Move to pipe.
40+
static NAUV_WORK_CB(onData); // TODO: Move to pipe.
4141

4242
// Overrides owt_base::FrameSource.
4343
void onFeedback(const owt_base::FeedbackMsg&) override;
@@ -72,6 +72,11 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
7272
uint8_t* m_buffer;
7373
size_t m_bufferSize;
7474

75+
// Indicates whether this is a media stream. If this is not a media stream, it can only be piped to another QUIC agent.
76+
bool m_isMedia;
77+
size_t m_currentFrameSize;
78+
size_t m_readFrameSize;
79+
7580
uv_async_t m_asyncOnContentSessionId;
7681
uv_async_t m_asyncOnData;
7782
};

source/agent/conference/conference.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1418,7 +1418,7 @@ var Conference = function (rpcClient, selfRpcId) {
14181418
}
14191419

14201420
if (subDesc.media.video && !subDesc.media.video.format) {
1421-
subDesc.media.video.format = {codec: 'h264'};
1421+
subDesc.media.video.format = {codec: 'h264_B'};
14221422
}
14231423

14241424
//FIXME: To support codecs other than those in the following list.

source/agent/conference/quicController.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,20 @@ class Transport {
3434

3535
class Operation {
3636
constructor(id, transport, direction, tracks, data) {
37-
if(tracks){
38-
throw new Error('QUIC agent does not support media stream tracks so far.');
39-
}
4037
this.id = id;
4138
this.transport = transport;
4239
this.transportId = transport.id;
4340
this.direction = direction;
41+
this.tracks = tracks;
4442
this.data = data;
4543
this.promise = Promise.resolve();
44+
this.tracks = this.tracks ? this.tracks.map(t => {
45+
if (t.type === 'video') {
46+
t.format = { codec : 'h264', profile : 'B' };
47+
}
48+
return t;
49+
})
50+
: undefined;
4651
}
4752
}
4853

@@ -168,6 +173,7 @@ class QuicController extends EventEmitter {
168173

169174
// Return Promise
170175
terminate(sessionId, direction, reason) {
176+
console.trace();
171177
log.debug(`terminate, sessionId: ${sessionId} direction: ${direction}, ${reason}`);
172178

173179
if (!this.operations.has(sessionId)) {

source/agent/connections.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ module.exports = function Connections () {
124124
if (!dest) {
125125
return Promise.reject({ type : 'failed', reason : 'Destination connection(' + name + ') is not ready' });
126126
}
127-
connections[from].connection.addDestination(name, dest);
127+
let isNanObj=false;
128+
if (dest.constructor.name === 'QuicTransportStream'){
129+
isNanObj=true;
130+
}
131+
connections[from].connection.addDestination(name, dest, isNanObj);
128132
connections[connectionId][name + 'From'] = from;
129133
}
130134
}

source/agent/internalConnectionRouter.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ class StreamSource {
107107
addDestination(track, sink) {
108108
if (!this.dests[track].has(sink.id)) {
109109
this.dests[track].set(sink.id, sink);
110-
this.conn.addDestination(track, sink.conn);
110+
let isNanObject = false;
111+
if (sink instanceof QuicTransportStreamPipeline) {
112+
isNanObject = true;
113+
}
114+
this.conn.addDestination(track, sink.conn, isNanObject);
111115
sink._addSource(track, this);
112116
}
113117
}

0 commit comments

Comments
 (0)