Skip to content

Commit

Permalink
Merge pull request #284 from gingernaz/sfu-multiplex-data-channels
Browse files Browse the repository at this point in the history
Add DataChannel multiplex support to PS2
  • Loading branch information
mcottontensor authored Sep 24, 2024
2 parents b96db92 + 01650e8 commit c7653a0
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Common/protobuf/signalling_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ message offer {
optional string playerId = 3;
// Indiates that this offer is coming from an SFU.
optional bool sfu = 4;
//Indicates that the streamer is multiplexing data channels
optional bool multiplex = 5;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion Common/src/Protocol/SignallingProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import * as MessageHelpers from '../Messages/message_helpers';
*/
export class SignallingProtocol extends EventEmitter {
static get SIGNALLING_VERSION(): string {
return '1.2.0';
return '1.2.1';
}

// The transport in use by this protocol object.
Expand Down
167 changes: 163 additions & 4 deletions SFU/sfu_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,133 @@ let signalServer = null;
let mediasoupRouter;
let streamer = null;
let peers = new Map();
let dataRouter;

async function createDataRouter()
{
const MULTIPLEX_MESSAGE_ID = 199; // ID | 2 byte length | PlayerId | Original message
const CHANNEL_RELAY_STATUS_MESSAGE_ID = 198; // ID | 2 byte length | PlayerId | 1 byte flag

if (!mediasoupRouter)
{
console.error('cannot initialize direct transport, router is undefined');
throw new Error('mediasoupRouter is undefined');
}
const transport = await mediasoupRouter.createDirectTransport({maxMessageSize: 262144});

let streamerProducer;
const producers = {};

function createMultiplexHeader(playerId) {
const byteLength = 1 + 2 + playerId.length * 2;
const buffer = Buffer.alloc(byteLength);
let byteOffset = 0;
buffer.writeUInt8(MULTIPLEX_MESSAGE_ID, byteOffset);
byteOffset++;
buffer.writeUInt16LE(playerId.length * 2, byteOffset);
byteOffset += 2;
for (let i = 0; i < playerId.length; i++) {
buffer.writeUInt16LE(playerId.charCodeAt(i), byteOffset);
byteOffset += 2;
}
return buffer;
}

function parseMultiplexHeader(message)
{
const type = message.readUInt8(0);
if (type !== MULTIPLEX_MESSAGE_ID) {
console.log("Received non multiplexed message type [%d]", type);
return {
playerId: ""
};
}
const length = message.readUint16LE(1);
const headerEnd = length + 3;
return {
playerId: new TextDecoder("utf-16").decode(message.subarray(3, headerEnd)),
buffer: message.subarray(headerEnd, message.length)
}
}

function createRelayStatusMessage(playerId, status)
{
const byteLength = 1 + 2 + playerId.length * 2 + 1;
const buffer = Buffer.alloc(byteLength);
let byteOffset = 0;
buffer.writeUInt8(CHANNEL_RELAY_STATUS_MESSAGE_ID, byteOffset);
byteOffset++;
buffer.writeUInt16LE(playerId.length * 2, byteOffset);
byteOffset += 2;
for (let i = 0; i < playerId.length; i++) {
buffer.writeUInt16LE(playerId.charCodeAt(i), byteOffset);
byteOffset += 2;
}
buffer.writeUInt8(status, byteOffset);
return buffer;
}

async function handleStreamer(dataProducer) {
const consumer = await transport.consumeData({dataProducerId: dataProducer.id});
consumer.on('message', (message, ppid) =>
{
const relayMessage = parseMultiplexHeader(message);
if (relayMessage.playerId !== "" && producers.hasOwnProperty(relayMessage.playerId)) {
producers[relayMessage.playerId].send(relayMessage.buffer, 53);
}
});

dataProducer.on('close', () => {
streamerProducer.close();
streamerProducer = undefined;
});

streamerProducer = await transport.produceData({ label: 'streamer-producer' });
return streamerProducer.id;
}

async function handlePlayer(dataProducer, playerId) {
streamerProducer.send(createRelayStatusMessage(playerId, 1), 53);

const consumer = await transport.consumeData({dataProducerId: dataProducer.id});
consumer.on('message', (message, ppid) =>
{
if (streamerProducer) {
const relayMessage = Buffer.concat([createMultiplexHeader(playerId), message]);
streamerProducer.send(relayMessage, 53);
}
});
const playerProducer = await transport.produceData({ label: 'player-producer' });
producers[playerId] = playerProducer;

dataProducer.on('close', () => {
producers[playerId].close();
delete producers[playerId];
streamerProducer.send(createRelayStatusMessage(playerId, 0), 53);
});

return playerProducer.id;
}

return {
handleStreamer,
handlePlayer
};
}

function connectSignalling(server) {
console.log("Connecting to Signalling Server at %s", server);
signalServer = new WebSocket(server);
signalServer.addEventListener("open", _ => { console.log(`Connected to signalling server`); });
signalServer.addEventListener("error", result => { console.log(`Error: ${result.message}`); });
signalServer.addEventListener("message", result => onSignallingMessage(result.data));
signalServer.addEventListener("close", result => {
signalServer.addEventListener("close", result => {
onStreamerDisconnected();
console.log(`Disconnected from signalling server: ${result.code} ${result.reason}`);
console.log("Attempting reconnect to signalling server...");
setTimeout(()=> {
setTimeout(()=> {
connectSignalling(server);
}, 2000);
}, 2000);
});
}

Expand Down Expand Up @@ -69,12 +182,13 @@ async function onStreamerOffer(msg) {
const transport = await createWebRtcTransport("Streamer");
const sdpEndpoint = mediasoupSdp.createSdpEndpoint(transport, mediasoupRouter.rtpCapabilities);
const producers = await sdpEndpoint.processOffer(msg.sdp, msg.scalabilityMode ? msg.scalabilityMode : "L1T1");
const multiplex = msg.multiplex;
const sdpAnswer = sdpEndpoint.createAnswer();
const answer = { type: "answer", sdp: sdpAnswer };

console.log("Sending answer to streamer.");
signalServer.send(JSON.stringify(answer));
streamer = { transport: transport, producers: producers };
streamer = { transport: transport, producers: producers, multiplexChannels: multiplex};
}

function getNextStreamerSCTPId() {
Expand Down Expand Up @@ -153,6 +267,12 @@ async function setupPeerDataChannels(peerId) {
return;
}

if (streamer.multiplexChannels)
{
await setupMultiplexPeerDataChannels(peer);
return;
}

const nextStreamerSCTPStreamId = getNextStreamerSCTPId();
const nextPeerSCTPStreamId = getNextStreamerSCTPId();

Expand Down Expand Up @@ -187,7 +307,31 @@ async function setupPeerDataChannels(peerId) {

}

async function setupMultiplexPeerDataChannels(peer) {
//this will be always 0 as we are using only one producer
const nextPeerSCTPStreamId = peer.transport._getNextSctpStreamId();
peer.peerDataProducer = await peer.transport.produceData({label: 'send-datachannel', sctpStreamParameters: {streamId: nextPeerSCTPStreamId, ordered: true}});

const dataProducerId = await dataRouter.handlePlayer(peer.peerDataProducer, peer.id);
peer.peerDataConsumer = await peer.transport.consumeData({dataProducerId});
//TODO: Why exactly do we need to do this?
peer.transport._sctpStreamIds[nextPeerSCTPStreamId] = 1;
console.log('peerProducerId %s, peerConsumerId %s', peer.peerDataProducer.id, peer.peerDataConsumer.id);

const peerSignal = {
type: 'peerDataChannels',
playerId: peer.id,
sendStreamId: peer.peerDataProducer.sctpStreamParameters.streamId,
recvStreamId: peer.peerDataConsumer.sctpStreamParameters.streamId
};
signalServer.send(JSON.stringify(peerSignal));
}

async function setupStreamerDataChannelsForPeer(peerId) {
if (streamer.multiplexChannels)
{
return;
}

const peer = peers.get(peerId);
if (!peer) {
Expand Down Expand Up @@ -325,6 +469,20 @@ async function onICEStateChange(identifier, iceState) {
console.log("%s ICE state changed to %s", identifier, iceState);

if (identifier == 'Streamer' && iceState == 'completed') {
if (streamer.multiplexChannels) {
const nextStreamerSCTPStreamId = streamer.transport._getNextSctpStreamId();
//this will always be 0 since we are using one producer only
console.log(`Attempting streamer SCTP id=${nextStreamerSCTPStreamId}`);

const producer = await streamer.transport.produceData({
label: 'send-datachannel',
sctpStreamParameters: {streamId: nextStreamerSCTPStreamId, ordered: true}
});
streamer.transport._sctpStreamIds[nextStreamerSCTPStreamId] = 1;
const dataProducerId = await dataRouter.handleStreamer(producer);
const streamerDataConsumer = await streamer.transport.consumeData({dataProducerId});
console.log('Setting up sctp for the streamer, producer sctp id %s, consumer sctp id %s', producer.sctpStreamParameters.streamId, streamerDataConsumer.sctpStreamParameters.streamId);
}
signalServer.send(JSON.stringify({type: 'startStreaming'}));
}
}
Expand Down Expand Up @@ -363,6 +521,7 @@ async function main() {
console.log(config);

mediasoupRouter = await startMediasoup();
dataRouter = await createDataRouter();

connectSignalling(config.signallingURL);
}
Expand Down

0 comments on commit c7653a0

Please sign in to comment.