Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit c71d8d4

Browse files
committed
Allow data flow between QUIC agent and video agent.
1 parent 862fb24 commit c71d8d4

10 files changed

+97
-39
lines changed

source/agent/addons/internalIO/InternalClientWrapper.cc

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,23 +88,36 @@ NAN_METHOD(InternalClient::close) {
8888
obj->me = nullptr;
8989
}
9090

91-
NAN_METHOD(InternalClient::addDestination) {
92-
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
93-
owt_base::InternalClient* me = obj->me;
91+
NAN_METHOD(InternalClient::addDestination)
92+
{
93+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
94+
owt_base::InternalClient* me = obj->me;
9495

95-
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
96-
std::string track = std::string(*param0);
96+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
97+
std::string track = std::string(*param0);
9798

98-
FrameDestination* param =
99-
ObjectWrap::Unwrap<FrameDestination>(
100-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
101-
owt_base::FrameDestination* dest = param->dest;
99+
bool isNanDestination(false);
100+
if (info.Length() >= 3) {
101+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
102+
}
102103

103-
if (track == "audio") {
104-
me->addAudioDestination(dest);
105-
} else if (track == "video") {
106-
me->addVideoDestination(dest);
107-
}
104+
owt_base::FrameDestination* dest(nullptr);
105+
if (isNanDestination) {
106+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
107+
dest = param->FrameDestination();
108+
} else {
109+
FrameDestination* param = ObjectWrap::Unwrap<FrameDestination>(
110+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
111+
dest = param->dest;
112+
}
113+
114+
if (track == "audio") {
115+
me->addAudioDestination(dest);
116+
} else if (track == "video") {
117+
me->addVideoDestination(dest);
118+
} else if (track == "data") {
119+
me->addDataDestination(dest);
120+
}
108121
}
109122

110123
NAN_METHOD(InternalClient::removeDestination) {

source/agent/addons/internalIO/InternalServerWrapper.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,20 @@ NAN_METHOD(InternalServer::addSource) {
9999
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
100100
std::string streamId = std::string(*param0);
101101

102-
FrameSource* param =
103-
ObjectWrap::Unwrap<FrameSource>(
104-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
105-
owt_base::FrameSource* src = param->src;
102+
bool isNanSource(false);
103+
if (info.Length() >= 3) {
104+
isNanSource = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
105+
}
106+
107+
owt_base::FrameSource* src(nullptr);
108+
if (isNanSource) {
109+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
110+
src = param->FrameSource();
111+
} else {
112+
FrameSource* param = ObjectWrap::Unwrap<FrameSource>(
113+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
114+
src = param->src;
115+
}
106116

107117
me->addSource(streamId, src);
108118
}

source/agent/addons/internalIO/InternalServerWrapper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ class InternalServer : public node::ObjectWrap,
4444
static NAN_METHOD(close);
4545

4646
static NAN_METHOD(getListeningPort);
47-
47+
// Arguments:
48+
// type: string, type of the source, "audio", "video" or "data".
49+
// source: A node addon object or NAN object.
50+
// isNanObject: indicates whether `source` is a NAN object.
4851
static NAN_METHOD(addSource);
4952

5053
static NAN_METHOD(removeSource);

source/agent/addons/quic/QuicTransportStream.cc

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
#include "QuicTransportStream.h"
8+
#include "../common/MediaFramePipelineWrapper.h"
89

910
using v8::Function;
1011
using v8::FunctionTemplate;
@@ -113,15 +114,32 @@ NAN_METHOD(QuicTransportStream::close)
113114
NAN_METHOD(QuicTransportStream::addDestination)
114115
{
115116
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
116-
if (info.Length() != 2) {
117+
if (info.Length() > 3) {
117118
Nan::ThrowTypeError("Invalid argument length for addDestination.");
118119
return;
119120
}
120-
// TODO: Check if info[0] is an Nan wrapped object.
121-
auto framePtr = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info[1]->ToObject());
122-
// void* ptr = info[0]->ToObject()->GetAlignedPointerFromInternalField(0);
123-
// auto framePtr=static_cast<owt_base::FrameDestination*>(ptr);
124-
obj->addDataDestination(framePtr);
121+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
122+
std::string track = std::string(*param0);
123+
bool isNanDestination(false);
124+
if (info.Length() == 3) {
125+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
126+
}
127+
owt_base::FrameDestination* dest(nullptr);
128+
if (isNanDestination) {
129+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
130+
dest = param->FrameDestination();
131+
} else {
132+
::FrameDestination* param = node::ObjectWrap::Unwrap<::FrameDestination>(
133+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
134+
dest = param->dest;
135+
}
136+
if (track == "audio") {
137+
obj->addAudioDestination(dest);
138+
} else if (track == "video") {
139+
obj->addVideoDestination(dest);
140+
} else if (track == "data") {
141+
obj->addDataDestination(dest);
142+
}
125143
obj->m_isPiped = true;
126144
}
127145

@@ -153,6 +171,12 @@ void QuicTransportStream::MaybeReadContentSessionId()
153171
m_receivedContentSessionId = true;
154172
m_asyncOnContentSessionId.data = this;
155173
uv_async_send(&m_asyncOnContentSessionId);
174+
for (uint8_t d : m_contentSessionId) {
175+
if (d != 0) {
176+
m_isPiped = true;
177+
break;
178+
}
179+
}
156180
if (m_stream->ReadableBytes() > 0) {
157181
SignalOnData();
158182
}
@@ -215,10 +239,11 @@ void QuicTransportStream::SignalOnData()
215239
ReallocateBuffer(readableBytes);
216240
}
217241
owt_base::Frame frame;
218-
frame.format = owt_base::FRAME_FORMAT_DATA;
242+
frame.format = owt_base::FRAME_FORMAT_I420;
219243
frame.length = readableBytes;
220244
frame.payload = m_buffer;
221245
m_stream->Read(frame.payload, readableBytes);
246+
ELOG_ERROR("Delivery %d bytes.",frame.length);
222247
deliverFrame(frame);
223248
}
224249
}

source/agent/conference/conference.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1418,7 +1418,7 @@ var Conference = function (rpcClient, selfRpcId) {
14181418
}
14191419

14201420
if (subDesc.media.video && !subDesc.media.video.format) {
1421-
subDesc.media.video.format = {codec: 'h264'};
1421+
subDesc.media.video.format = {codec: 'h264_B'};
14221422
}
14231423

14241424
//FIXME: To support codecs other than those in the following list.

source/agent/conference/quicController.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,20 @@ class Transport {
3434

3535
class Operation {
3636
constructor(id, transport, direction, tracks, data) {
37-
if(tracks){
38-
throw new Error('QUIC agent does not support media stream tracks so far.');
39-
}
4037
this.id = id;
4138
this.transport = transport;
4239
this.transportId = transport.id;
4340
this.direction = direction;
41+
this.tracks = tracks;
4442
this.data = data;
4543
this.promise = Promise.resolve();
44+
this.tracks = this.tracks ? this.tracks.map(t => {
45+
if (t.type === 'video') {
46+
t.format = { codec : 'h264', profile : 'B' };
47+
}
48+
return t;
49+
})
50+
: undefined;
4651
}
4752
}
4853

@@ -168,6 +173,7 @@ class QuicController extends EventEmitter {
168173

169174
// Return Promise
170175
terminate(sessionId, direction, reason) {
176+
console.trace();
171177
log.debug(`terminate, sessionId: ${sessionId} direction: ${direction}, ${reason}`);
172178

173179
if (!this.operations.has(sessionId)) {

source/agent/connections.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ module.exports = function Connections () {
124124
if (!dest) {
125125
return Promise.reject({ type : 'failed', reason : 'Destination connection(' + name + ') is not ready' });
126126
}
127-
connections[from].connection.addDestination(name, dest);
127+
let isNanObj=false;
128+
if (dest.constructor.name === 'QuicTransportStream'){
129+
isNanObj=true;
130+
}
131+
connections[from].connection.addDestination(name, dest, isNanObj);
128132
connections[connectionId][name + 'From'] = from;
129133
}
130134
}

source/agent/internalConnectionRouter.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ class StreamSource {
107107
addDestination(track, sink) {
108108
if (!this.dests[track].has(sink.id)) {
109109
this.dests[track].set(sink.id, sink);
110-
this.conn.addDestination(track, sink.conn);
110+
let isNanObject = false;
111+
if (sink instanceof QuicTransportStreamPipeline) {
112+
isNanObject = true;
113+
}
114+
this.conn.addDestination(track, sink.conn, isNanObject);
111115
sink._addSource(track, this);
112116
}
113117
}

source/agent/quic/index.js

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,6 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
213213
}
214214
}
215215
log.debug('subscribe, connectionId:', connectionId, 'connectionType:', connectionType, 'options:', options);
216-
if(!options.data){
217-
log.error('Subscription request does not include data field.');
218-
}
219216
if (router.getConnection(connectionId)) {
220217
return callback('callback', {type: 'failed', reason: 'Connection already exists:'+connectionId});
221218
}

source/agent/quic/webtransport/quicTransportStreamPipeline.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ module.exports = class QuicTransportStreamPipeline {
4646
};
4747

4848
this.receiver = function(kind) {
49-
if (kind !== 'data') {
50-
log.error('Unsupported receiver.');
51-
return null;
52-
}
5349
return this._quicStream;
5450
};
5551

0 commit comments

Comments
 (0)