diff --git a/docs/mdfiles/changelog.md b/docs/mdfiles/changelog.md index 7ddd9a61..2a80d1d6 100644 --- a/docs/mdfiles/changelog.md +++ b/docs/mdfiles/changelog.md @@ -6,6 +6,8 @@ Change Log * Add a new property `rtpTransceivers` to `TransportSettings` and `TransportConstraints`. * Add a new property `peerConnection` to `ConferenceClient`. * The second argument of `ConferenceClient.publish` could be a list of `RTCRtpTransceiver`s. +* Add support to publish a MediaStream over WebTransport. + # 5.0 * Add WebTransport support for conference mode, see [this design doc](../../design/webtransport.md) for detailed information. * All publications and subscriptions for the same conference use the same `PeerConnection`. diff --git a/docs/mdfiles/index.md b/docs/mdfiles/index.md index 1dedfeff..b88d7595 100644 --- a/docs/mdfiles/index.md +++ b/docs/mdfiles/index.md @@ -142,6 +142,8 @@ WebTransport is supported in conference mode as an experimental feature. QUIC ag - [JavaScript SDK design doc for WebTransport support](https://github.com/open-webrtc-toolkit/owt-client-javascript/blob/master/docs/design/webtransport.md) - [QUIC programming guide for OWT server](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/design/quic-programming-guide.md) +Publishing a MediaStream over WebTransport requires an additional worker for I/O. The worker is a standalone ES module, not included in owt.js. As we're moving the SDK from traditional JavaScript script to ES module, there is no plan to support this worker in old browsers. + # 7 Events The JavaScript objects fires events using `Owt.Base.EventDispatchers`. For more detailed events, please refer to the specific class description page. diff --git a/scripts/Gruntfile.js b/scripts/Gruntfile.js index 5672e0c1..c03daeaf 100644 --- a/scripts/Gruntfile.js +++ b/scripts/Gruntfile.js @@ -76,6 +76,10 @@ window.L = L;\n\ watch: true }, }, + worker:{ + src: ['dist/sdk/conference/webtransport/media-worker.js'], + dest: 'dist/sdk/media-worker.js', + }, sinon: { src: ['node_modules/sinon/lib/sinon.js'], dest: 'test/unit/resources/scripts/gen/sinon-browserified.js', @@ -102,7 +106,15 @@ window.L = L;\n\ options: { base: '.', port: 7080, - keepalive: true + keepalive: true, + middleware: function(connect, options, middlewares) { + middlewares.unshift((req, res, next) => { + res.setHeader('Cross-Origin-Embedder-Policy', 'require-corp'); + res.setHeader('Cross-Origin-Opener-Policy', 'same-origin'); + next(); + }); + return middlewares; + } }, }, }, @@ -189,7 +201,8 @@ window.L = L;\n\ {expand: true,cwd:'src/extension/',src:['**'],dest:'dist/',flatten:false}, {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/conference/public/scripts/',flatten:false}, {expand: true,cwd:'dist/samples/conference/public/scripts',src:['rest.js'],dest:'dist/samples/conference/',flatten:false}, - {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false} + {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false}, + {expand: true,cwd: 'dist/sdk/',src: ['media-worker.js'],dest: 'dist/samples/conference/public/scripts/',flatten: false}, ] } }, @@ -267,8 +280,8 @@ window.L = L;\n\ grunt.registerTask('check', ['eslint:src']); grunt.registerTask('prepare', ['browserify:sinon', 'browserify:chai_as_promised']); - grunt.registerTask('pack', ['browserify:dist', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); - grunt.registerTask('dev', ['browserify:dev', 'connect:server']); + grunt.registerTask('pack', ['browserify:dist', 'browserify:worker', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); + grunt.registerTask('dev', ['browserify:dev', 'browserify:worker', 'connect:server']); grunt.registerTask('debug', ['browserify:dev']); grunt.registerTask('default', ['check', 'pack']); }; diff --git a/src/samples/conference/public/quic.html b/src/samples/conference/public/quic.html index 077f6637..8fdb15e8 100644 --- a/src/samples/conference/public/quic.html +++ b/src/samples/conference/public/quic.html @@ -23,20 +23,26 @@

Open WebRTC Toolkit

-

Sample of QuicTransport

+

Sample of WebTransport

-

This sample works with the latest Chrome.

+

This sample works with the Chrome >= 97.

+ + +
+
+
- + + - + diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index 1bd437a0..1397238b 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -2,42 +2,82 @@ // // SPDX-License-Identifier: Apache-2.0 +/* eslint-disable require-jsdoc */ + 'use strict'; let quicChannel = null; let bidirectionalStream = null; -let writeTask; -const conference=new Owt.Conference.ConferenceClient(); +let bidiAudioStream = null; +let writeTask, dataWorker, conferenceId, myId, mixedStream, generatorWriter, + mediaPublication; +const isMedia = true; + +window.Module={}; + +const conference = new Owt.Conference.ConferenceClient( + { + webTransportConfiguration: { + serverCertificateFingerprints: [{ + value: + 'FD:CD:87:EB:92:97:84:FD:D9:E9:C1:9F:AF:57:12:0E:32:AF:0D:C0:58:5F:33:BB:59:4A:2E:6E:C3:18:7A:93', + algorithm: 'sha-256', + }], + serverCertificateHashes: [{ + value: new Uint8Array([ + 0xFD, 0xCD, 0x87, 0xEB, 0x92, 0x97, 0x84, 0xFD, 0xD9, 0xE9, 0xC1, + 0x9F, 0xAF, 0x57, 0x12, 0x0E, 0x32, 0xAF, 0x0D, 0xC0, 0x58, 0x5F, + 0x33, 0xBB, 0x59, 0x4A, 0x2E, 0x6E, 0xC3, 0x18, 0x7A, 0x93 + ]), + algorithm: 'sha-256', + }], + } + }, + '../../../sdk/conference/webtransport'); conference.addEventListener('streamadded', async (event) => { console.log(event.stream); - if (event.stream.source.data) { - const subscription = await conference.subscribe(event.stream); - const reader = subscription.stream.readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('Subscription ends.'); - break; - } - console.log('Received data: '+value); - } + if (event.stream.origin == myId) { + mixStream( + conferenceId, event.stream.id, 'common', + 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); } + // if (event.stream.source.data) { + // const subscription = await conference.subscribe( + // event.stream, + // // {transport:{type: 'quic'}}); + // {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + // const reader = subscription.stream.readable.getReader(); + // while (true) { + // const {value, done} = await reader.read(); + // if (done) { + // console.log('Subscription ends.'); + // break; + // } + // //console.log('Received data: ' + value); + // } + // } }); function updateConferenceStatus(message) { document.getElementById('conference-status').innerHTML += - ('

' + message + '

'); + ('

' + message + '

'); } - function joinConference() { return new Promise((resolve, reject) => { - createToken(undefined, 'user', 'presenter', resp => { - conference.join(resp).then(() => { + createToken(undefined, 'user', 'presenter', token => { + conference.join(token).then((info) => { + conferenceId = info.id; + myId = info.self.id; + for (const stream of info.remoteStreams) { + if (stream.source.video === 'mixed') { + mixedStream = stream; + } + } updateConferenceStatus('Connected to conference server.'); resolve(); }); - }); + }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); }); }; @@ -55,17 +95,26 @@ function createRandomContentSessionId() { return id; } +async function attachReader(stream) { + const reader = stream.readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('Ends.'); + break; + } + console.log('Received data: ' + value); + } +} + async function createSendChannel() { bidirectionalStream = await conference.createSendStream(); - const localStream=new Owt.Base.LocalStream(bidirectionalStream, new Owt.Base.StreamSourceInfo(undefined, undefined,true)); - const publication = await conference.publish(localStream); - console.log(publication); updateConferenceStatus('Created send channel.'); } async function windowOnLoad() { await joinConference(); - await createSendChannel(); + //await createSendChannel(); } async function writeUuid() { @@ -84,7 +133,9 @@ async function writeData() { const encoded = encoder.encode('message', {stream: true}); const writer = bidirectionalStream.writable.getWriter(); await writer.ready; - await writer.write(new ArrayBuffer(2)); + const ab = new Uint8Array(10000); + ab.fill(1, 0); + await writer.write(ab); writer.releaseLock(); return; } @@ -94,16 +145,45 @@ window.addEventListener('load', () => { }); document.getElementById('start-sending').addEventListener('click', async () => { - if (!bidirectionalStream) { - updateConferenceStatus('Stream is not created.'); - return; + if (isMedia) { + const mediaStream = + await navigator.mediaDevices.getUserMedia({audio: true, video: true}); + const localStream = new Owt.Base.LocalStream( + mediaStream, new Owt.Base.StreamSourceInfo('mic', 'camera', undefined)); + mediaPublication = await conference.publish(localStream, { + audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, + video: {codec: 'h264'}, + transport: {type: 'quic'}, + }); + } else { + if (!bidirectionalStream) { + updateConferenceStatus('Stream is not created.'); + return; + } + writeTask = setInterval(writeData, 200); } - await writeUuid(); - writeTask = setInterval(writeData, 2000); updateConferenceStatus('Started sending.'); }); document.getElementById('stop-sending').addEventListener('click', () => { - clearInterval(writeTask); + if (isMedia) { + if (mediaPublication) { + mediaPublication.stop(); + } + } else { + clearInterval(writeTask); + } updateConferenceStatus('Stopped sending.'); }); + +document.getElementById('start-receiving') + .addEventListener('click', async () => { + subscribeMixedStream(); + }); + +async function subscribeMixedStream() { + const subscription = await conference.subscribe( + mixedStream, + {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + document.getElementById('remote-video').srcObject = subscription.stream; +} diff --git a/src/sdk/base/logger.js b/src/sdk/base/logger.js index 2b2ba22e..504c65ed 100644 --- a/src/sdk/base/logger.js +++ b/src/sdk/base/logger.js @@ -26,7 +26,7 @@ // This file is borrowed from lynckia/licode with some modifications. -/* global window */ +/* global console */ 'use strict'; @@ -55,13 +55,13 @@ const Logger = (function() { }; that.log = (...args) => { - window.console.log((new Date()).toISOString(), ...args); + console.log((new Date()).toISOString(), ...args); }; const bindType = function(type) { - if (typeof window.console[type] === 'function') { + if (typeof console[type] === 'function') { return (...args) => { - window.console[type]((new Date()).toISOString(), ...args); + console[type]((new Date()).toISOString(), ...args); }; } else { return that.log; diff --git a/src/sdk/base/publication.js b/src/sdk/base/publication.js index 7ead125e..a1b4e8bc 100644 --- a/src/sdk/base/publication.js +++ b/src/sdk/base/publication.js @@ -195,17 +195,29 @@ export class PublishOptions { // eslint-disable-next-line require-jsdoc constructor(audio, video, transport) { /** - * @member {?Array | ?Array} audio + * @member {?Array | + * ?Array | ?AudioEncoderConfig } audio * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for audio RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. + * @desc Parameters for audio RtpSender when transport's type is 'webrtc' or + * configuration of audio encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-audioencoderconfig|AudioEncoderConfig} */ this.audio = audio; /** - * @member {?Array | ?Array} video + * @member {?Array | + * ?Array | ?VideoEncoderConfig } video * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for video RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. + * @desc Parameters for video RtpSender when transport's type is 'webrtc' or + * configuration of video encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-videoencoderconfig|VideoEncoderConfig} */ this.video = video; /** diff --git a/src/sdk/base/transport.js b/src/sdk/base/transport.js index 3bb5a79a..863e2c39 100644 --- a/src/sdk/base/transport.js +++ b/src/sdk/base/transport.js @@ -29,7 +29,9 @@ export class TransportConstraints { * @member {Array.} type * @instance * @memberof Owt.Base.TransportConstraints - * @desc Transport type for publication and subscription. + * @desc Transport type for publication and subscription. 'quic' is only + * supported in conference mode when WebTransport is supported by client and + * enabled at server side. */ this.type = type; /** diff --git a/src/sdk/conference/channel.js b/src/sdk/conference/channel.js index 168df389..f3e75590 100644 --- a/src/sdk/conference/channel.js +++ b/src/sdk/conference/channel.js @@ -898,6 +898,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher { _createPeerConnection() { if (this.pc) { + Logger.warning('PeerConnection exists.'); return; } diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index 35ee3581..67af63ac 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -16,7 +16,7 @@ import * as StreamModule from '../base/stream.js'; import {Participant} from './participant.js'; import {ConferenceInfo} from './info.js'; import {ConferencePeerConnectionChannel} from './channel.js'; -import {QuicConnection} from './quicconnection.js'; +import {QuicConnection} from './webtransport/connection.js'; import {RemoteMixedStream, ActiveAudioInputChangeEvent, LayoutChangeEvent} from './mixedstream.js'; import * as StreamUtilsModule from './streamutils.js'; @@ -117,9 +117,10 @@ class ConferenceClientConfiguration { // eslint-disable-line no-unused-vars * @extends Owt.Base.EventDispatcher * @constructor * @param {?Owt.Conference.ConferenceClientConfiguration } config Configuration for ConferenceClient. + * @param {string} workerDir Path of the directory for workers shipped with OWT SDK. It could be an relative path to your HTML file or an absolute path. * @param {?Owt.Conference.SioSignaling } signalingImpl Signaling channel implementation for ConferenceClient. SDK uses default signaling channel implementation if this parameter is undefined. Currently, a Socket.IO signaling channel implementation was provided as ics.conference.SioSignaling. However, it is not recommended to directly access signaling channel or customize signaling channel for ConferenceClient as this time. */ -export const ConferenceClient = function(config, signalingImpl) { +export const ConferenceClient = function(config, workerDir, signalingImpl) { Object.setPrototypeOf(this, new EventModule.EventDispatcher()); config = config || {}; const self = this; @@ -160,8 +161,7 @@ export const ConferenceClient = function(config, signalingImpl) { if (notification === 'soac' || notification === 'progress') { if (channels.has(data.id)) { channels.get(data.id).onMessage(notification, data); - } else if (quicTransportChannel && quicTransportChannel - .hasContentSessionId(data.id)) { + } else if (quicTransportChannel) { quicTransportChannel.onMessage(notification, data); } else { Logger.warning('Cannot find a channel for incoming data.'); @@ -439,7 +439,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (typeof WebTransport === 'function' && token.webTransportUrl) { quicTransportChannel = new QuicConnection( token.webTransportUrl, resp.webTransportToken, - createSignalingForChannel(), config.webTransportConfiguration); + createSignalingForChannel(), config.webTransportConfiguration, + workerDir); } const conferenceInfo = new ConferenceInfo( resp.room.id, Array.from(participants.values()), @@ -464,7 +465,7 @@ export const ConferenceClient = function(config, signalingImpl) { * @instance * @desc Publish a LocalStream to conference server. Other participants will be able to subscribe this stream when it is successfully published. * @param {Owt.Base.LocalStream} stream The stream to be published. - * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. + * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. If the type of transport is quic, PublishOptions.audio should be AudioEncoderConfig, and PublishOptions.video should be VideoEncoderConfig. * @param {string[]} videoCodecs Video codec names for publishing. Valid values are 'VP8', 'VP9' and 'H264'. This parameter only valid when the second argument is PublishOptions and options.video is RTCRtpEncodingParameters. Publishing with RTCRtpEncodingParameters is an experimental feature. This parameter is subject to change. * @return {Promise} Returned promise will be resolved with a newly created Publication once specific stream is successfully published, or rejected with a newly created Error if stream is invalid or options cannot be satisfied. Successfully published means PeerConnection is established and server is able to process media data. */ @@ -472,8 +473,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (!(stream instanceof StreamModule.LocalStream)) { return Promise.reject(new ConferenceError('Invalid stream.')); } - if (stream.source.data) { - return quicTransportChannel.publish(stream); + if (options?.transport?.type === 'quic') { + return quicTransportChannel.publish(stream, options, videoCodecs); } if (publishChannels.has(stream.mediaStream.id)) { return Promise.reject(new ConferenceError( @@ -501,13 +502,16 @@ export const ConferenceClient = function(config, signalingImpl) { 'Invalid source info. A remote stream is either a data stream or ' + 'a media stream.')); } + } + if (options?.transport?.type === 'quic') { if (quicTransportChannel) { - return quicTransportChannel.subscribe(stream); + return quicTransportChannel.subscribe(stream, options); } else { return Promise.reject(new TypeError('WebTransport is not supported.')); } + } else { + return peerConnectionChannel.subscribe(stream, options); } - return peerConnectionChannel.subscribe(stream, options); }; /** @@ -555,5 +559,9 @@ export const ConferenceClient = function(config, signalingImpl) { } return quicTransportChannel.createSendStream(); }; + + this.datagramReader = function() { + return quicTransportChannel.datagramReader(); + }; } }; diff --git a/src/sdk/conference/quicconnection.js b/src/sdk/conference/quicconnection.js deleted file mode 100644 index 638c4963..00000000 --- a/src/sdk/conference/quicconnection.js +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright (C) <2018> Intel Corporation -// -// SPDX-License-Identifier: Apache-2.0 - -/* eslint-disable require-jsdoc */ -/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder */ - -'use strict'; - -import Logger from '../base/logger.js'; -import {EventDispatcher} from '../base/event.js'; -import {Publication} from '../base/publication.js'; -import {Subscription} from './subscription.js'; -import {Base64} from '../base/base64.js'; - -/** - * @class QuicConnection - * @classDesc A channel for a QUIC transport between client and conference - * server. - * @hideconstructor - * @private - */ -export class QuicConnection extends EventDispatcher { - // `tokenString` is a base64 string of the token object. It's in the return - // value of `ConferenceClient.join`. - constructor(url, tokenString, signaling, webTransportOptions) { - super(); - this._tokenString = tokenString; - this._token = JSON.parse(Base64.decodeBase64(tokenString)); - this._signaling = signaling; - this._ended = false; - this._quicStreams = new Map(); // Key is publication or subscription ID. - this._quicTransport = new WebTransport(url, webTransportOptions); - this._subscribePromises = new Map(); // Key is subscription ID. - this._transportId = this._token.transportId; - this._initReceiveStreamReader(); - } - - /** - * @function onMessage - * @desc Received a message from conference portal. Defined in client-server - * protocol. - * @param {string} notification Notification type. - * @param {object} message Message received. - * @private - */ - onMessage(notification, message) { - switch (notification) { - case 'progress': - if (message.status === 'soac') { - this._soacHandler(message.data); - } else if (message.status === 'ready') { - this._readyHandler(); - } else if (message.status === 'error') { - this._errorHandler(message.data); - } - break; - case 'stream': - this._onStreamEvent(message); - break; - default: - Logger.warning('Unknown notification from MCU.'); - } - } - - async init() { - await this._authenticate(this._tokenString); - } - - async _initReceiveStreamReader() { - const receiveStreamReader = - this._quicTransport.incomingBidirectionalStreams.getReader(); - Logger.info('Reader: ' + receiveStreamReader); - let receivingDone = false; - while (!receivingDone) { - const {value: receiveStream, done: readingReceiveStreamsDone} = - await receiveStreamReader.read(); - Logger.info('New stream received'); - if (readingReceiveStreamsDone) { - receivingDone = true; - break; - } - const chunkReader = receiveStream.readable.getReader(); - const {value: uuid, done: readingChunksDone} = await chunkReader.read(); - if (readingChunksDone) { - Logger.error('Stream closed unexpectedly.'); - return; - } - if (uuid.length != 16) { - Logger.error('Unexpected length for UUID.'); - return; - } - chunkReader.releaseLock(); - const subscriptionId = this._uint8ArrayToUuid(uuid); - this._quicStreams.set(subscriptionId, receiveStream); - if (this._subscribePromises.has(subscriptionId)) { - const subscription = - this._createSubscription(subscriptionId, receiveStream); - this._subscribePromises.get(subscriptionId).resolve(subscription); - } - } - } - - _createSubscription(id, receiveStream) { - // TODO: Incomplete subscription. - const subscription = new Subscription(id, () => { - receiveStream.abortReading(); - }); - subscription.stream = receiveStream; - return subscription; - } - - async _authenticate(token) { - await this._quicTransport.ready; - const quicStream = await this._quicTransport.createBidirectionalStream(); - const chunkReader = quicStream.readable.getReader(); - const writer = quicStream.writable.getWriter(); - await writer.ready; - // 128 bit of zero indicates this is a stream for signaling. - writer.write(new Uint8Array(16)); - // Send token as described in - // https://github.com/open-webrtc-toolkit/owt-server/blob/20e8aad216cc446095f63c409339c34c7ee770ee/doc/design/quic-transport-payload-format.md. - const encoder = new TextEncoder(); - const encodedToken = encoder.encode(token); - writer.write(Uint32Array.of(encodedToken.length)); - writer.write(encodedToken); - // Server returns transport ID as an ack. Ignore it here. - await chunkReader.read(); - Logger.info('Authentication success.'); - } - - async createSendStream() { - await this._quicTransport.ready; - const quicStream = await this._quicTransport.createBidirectionalStream(); - return quicStream; - } - - async createSendStream1(sessionId) { - Logger.info('Create stream.'); - await this._quicTransport.ready; - // TODO: Potential failure because of publication stream is created faster - // than signaling stream(created by the 1st call to initiatePublication). - const publicationId = await this._initiatePublication(); - const quicStream = await this._quicTransport.createSendStream(); - const writer = quicStream.writable.getWriter(); - await writer.ready; - writer.write(this._uuidToUint8Array(publicationId)); - writer.releaseLock(); - this._quicStreams.set(publicationId, quicStream); - return quicStream; - } - - async publish(stream) { - // TODO: Avoid a stream to be published twice. The first 16 bit data send to - // server must be it's publication ID. - // TODO: Potential failure because of publication stream is created faster - // than signaling stream(created by the 1st call to initiatePublication). - const publicationId = await this._initiatePublication(); - const quicStream = stream.stream; - const writer = quicStream.writable.getWriter(); - await writer.ready; - writer.write(this._uuidToUint8Array(publicationId)); - writer.releaseLock(); - Logger.info('publish id'); - this._quicStreams.set(publicationId, quicStream); - const publication = new Publication(publicationId, () => { - this._signaling.sendSignalingMessage('unpublish', {id: publication}) - .catch((e) => { - Logger.warning('MCU returns negative ack for unpublishing, ' + e); - }); - } /* TODO: getStats, mute, unmute is not implemented */); - return publication; - } - - hasContentSessionId(id) { - return this._quicStreams.has(id); - } - - _uuidToUint8Array(uuidString) { - if (uuidString.length != 32) { - throw new TypeError('Incorrect UUID.'); - } - const uuidArray = new Uint8Array(16); - for (let i = 0; i < 16; i++) { - uuidArray[i] = parseInt(uuidString.substring(i * 2, i * 2 + 2), 16); - } - return uuidArray; - } - - _uint8ArrayToUuid(uuidBytes) { - let s = ''; - for (const hex of uuidBytes) { - const str = hex.toString(16); - s += str.padStart(2, '0'); - } - return s; - } - - subscribe(stream) { - const p = new Promise((resolve, reject) => { - this._signaling - .sendSignalingMessage('subscribe', { - media: null, - data: {from: stream.id}, - transport: {type: 'quic', id: this._transportId}, - }) - .then((data) => { - if (this._quicStreams.has(data.id)) { - // QUIC stream created before signaling returns. - const subscription = this._createSubscription( - data.id, this._quicStreams.get(data.id)); - resolve(subscription); - } else { - this._quicStreams.set(data.id, null); - // QUIC stream is not created yet, resolve promise after getting - // QUIC stream. - this._subscribePromises.set( - data.id, {resolve: resolve, reject: reject}); - } - }); - }); - return p; - } - - _readAndPrint() { - this._quicStreams[0].waitForReadable(5).then(() => { - const data = new Uint8Array(this._quicStreams[0].readBufferedAmount); - this._quicStreams[0].readInto(data); - Logger.info('Read data: ' + data); - this._readAndPrint(); - }); - } - - async _initiatePublication() { - const data = await this._signaling.sendSignalingMessage('publish', { - media: null, - data: true, - transport: {type: 'quic', id: this._transportId}, - }); - if (this._transportId !== data.transportId) { - throw new Error('Transport ID not match.'); - } - return data.id; - } - - _readyHandler() { - // Ready message from server is useless for QuicStream since QuicStream has - // its own status. Do nothing here. - } -} diff --git a/src/sdk/conference/subscription.js b/src/sdk/conference/subscription.js index 16676f24..84ff99f7 100644 --- a/src/sdk/conference/subscription.js +++ b/src/sdk/conference/subscription.js @@ -104,7 +104,7 @@ export class AudioSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.AudioSubscriptionConstraints - * @desc Codecs accepted. If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; } @@ -124,7 +124,7 @@ export class VideoSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.VideoSubscriptionConstraints - * @desc Codecs accepted. If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; /** diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js new file mode 100644 index 00000000..fdfcd668 --- /dev/null +++ b/src/sdk/conference/webtransport/connection.js @@ -0,0 +1,629 @@ +// Copyright (C) <2018> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global Promise, Map, WebTransport, WebTransportBidirectionalStream, + Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor, + MediaStreamTrackGenerator, proto */ + +'use strict'; + +import Logger from '../../base/logger.js'; +import {EventDispatcher} from '../../base/event.js'; +import {Publication} from '../../base/publication.js'; +import {Subscription} from '../subscription.js'; +import {Base64} from '../../base/base64.js'; + +const uuidByteLength = 16; + +/** + * @class QuicConnection + * @classDesc A channel for a QUIC transport between client and conference + * server. + * @hideconstructor + * @private + */ +export class QuicConnection extends EventDispatcher { + // `tokenString` is a base64 string of the token object. It's in the return + // value of `ConferenceClient.join`. + constructor(url, tokenString, signaling, webTransportOptions, workerDir) { + super(); + this._tokenString = tokenString; + this._token = JSON.parse(Base64.decodeBase64(tokenString)); + this._signaling = signaling; + this._ended = false; + // Key is publication or subscription ID, value is a list of streams. + this._quicDataStreams = new Map(); + // Key is MediaStreamTrack ID, value is a bidirectional stream. + this._quicMediaStreamTracks = new Map(); + this._quicTransport = new WebTransport(url, webTransportOptions); + this._subscribePromises = new Map(); // Key is subscription ID. + this._subscribeOptions = new Map(); // Key is subscription ID. + this._subscriptionInfoReady = + new Map(); // Key is subscription ID, value is a promise. + // Key is subscription ID, value is an object with audio and video RTP + // configs. + this._rtpConfigs = new Map(); + this._transportId = this._token.transportId; + this._initReceiveStreamReader(); + this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); + this._initHandlersForWorker(); + // Key is subscription ID, value is a MediaStreamTrackGenerator writer. + this._mstVideoGeneratorWriters = new Map(); + this._initRtpModule(); + this._initDatagramReader(); + } + + /** + * @function onMessage + * @desc Received a message from conference portal. Defined in client-server + * protocol. + * @param {string} notification Notification type. + * @param {object} message Message received. + * @private + */ + onMessage(notification, message) { + switch (notification) { + case 'progress': + if (message.status === 'soac') { + this._soacHandler(message.data); + } else if (message.status === 'ready') { + this._readyHandler(); + } else if (message.status === 'error') { + this._errorHandler(message.data); + } else if (message.status === 'rtp') { + this._rtpHandler(message); + } + break; + case 'stream': + this._onStreamEvent(message); + break; + default: + Logger.warning('Unknown notification from MCU.'); + } + } + + async init() { + await this._authenticate(this._tokenString); + } + + _initRtpModule() { + this._worker.postMessage(['init-rtp']); + } + + async _initReceiveStreamReader() { + const receiveStreamReader = + this._quicTransport.incomingBidirectionalStreams.getReader(); + let receivingDone = false; + while (!receivingDone) { + const {value: receiveStream, done: readingReceiveStreamsDone} = + await receiveStreamReader.read(); + Logger.debug('New stream received.'); + const subscriptionIdBytes = new Uint8Array(uuidByteLength); + let subscriptionIdBytesOffset = 0; + const trackIdBytes = new Uint8Array(uuidByteLength); + let trackIdBytesOffset = 0; + if (readingReceiveStreamsDone) { + receivingDone = true; + break; + } + // Use BYOB reader when it's supported to avoid copy. See + // https://github.com/w3c/webtransport/issues/131. Issue tracker: + // https://crbug.com/1182905. + const chunkReader = receiveStream.readable.getReader(); + const readingChunksDone = false; + let readingHeaderDone = false; + let mediaStream = false; + let subscriptionId; + while (!readingChunksDone && !readingHeaderDone) { + const {value, done: readingChunksDone} = await chunkReader.read(); + let valueOffset = 0; + if (subscriptionIdBytesOffset < uuidByteLength) { + const copyLength = Math.min( + uuidByteLength - subscriptionIdBytesOffset, + value.byteLength - valueOffset); + subscriptionIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + subscriptionIdBytesOffset); + subscriptionIdBytesOffset += copyLength; + valueOffset += copyLength; + if (subscriptionIdBytesOffset < uuidByteLength) { + continue; + } + subscriptionId = + this._uint8ArrayToUuid(new Uint8Array(subscriptionIdBytes)); + if (!this._subscribeOptions.has(subscriptionId)) { + Logger.debug('Subscribe options is not ready.'); + const p = new Promise((resolve) => { + this._subscriptionInfoReady.set(subscriptionId, resolve); + }); + await p; + this._subscriptionInfoReady.delete(subscriptionId); + } + const subscribeOptions = this._subscribeOptions.get(subscriptionId); + if (subscribeOptions.audio || subscribeOptions.video) { + mediaStream = true; + } + if (!mediaStream) { + readingHeaderDone = true; + if (copyLength < value.byteLength) { + Logger.warning( + 'Potential data lose. Expect to be fixed when BYOB reader ' + + 'is supported.'); + } + continue; + } + } + if (valueOffset < value.byteLength) { + const copyLength = Math.min( + uuidByteLength - trackIdBytesOffset, + value.byteLength - valueOffset); + trackIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + trackIdBytesOffset); + trackIdBytesOffset += copyLength; + valueOffset += copyLength; + if (trackIdBytesOffset < uuidByteLength) { + continue; + } + const trackId = this._uint8ArrayToUuid(trackIdBytes); + Logger.debug(`WebTransport stream for subscription ID ${ + subscriptionId} and track ID ${ + trackId} is ready to receive data.`); + } + if (readingChunksDone) { + Logger.error('Stream closed unexpectedly.'); + return; + } + } + chunkReader.releaseLock(); + this._quicDataStreams.set(subscriptionId, [receiveStream]); + if (this._subscribePromises.has(subscriptionId)) { + const subscription = + this._createSubscription(subscriptionId, receiveStream); + this._subscribePromises.get(subscriptionId).resolve(subscription); + } + } + } + + async _initDatagramReader() { + const datagramReader = this._quicTransport.datagrams.readable.getReader(); + let receivingDone = false; + while (!receivingDone) { + const {value: datagram, done: readingDatagramsDone} = + await datagramReader.read(); + this._worker.postMessage(['rtp-packet', datagram]); + if (readingDatagramsDone) { + receivingDone = true; + } + } + } + + _createSubscription(id, receiveStream) { + // TODO: Incomplete subscription. + const subscription = new Subscription(id, () => { + receiveStream.abortReading(); + }); + subscription.stream = receiveStream; + return subscription; + } + + async _authenticate(token) { + await this._quicTransport.ready; + const quicStream = await this._quicTransport.createBidirectionalStream(); + const chunkReader = quicStream.readable.getReader(); + const writer = quicStream.writable.getWriter(); + await writer.ready; + // 128 bit of zero indicates this is a stream for signaling. + writer.write(new Uint8Array(16)); + // Send token as described in + // https://github.com/open-webrtc-toolkit/owt-server/blob/20e8aad216cc446095f63c409339c34c7ee770ee/doc/design/quic-transport-payload-format.md. + const encoder = new TextEncoder(); + const encodedToken = encoder.encode(token); + writer.write(Uint32Array.of(encodedToken.length)); + writer.write(encodedToken); + // Server returns transport ID as an ack. Ignore it here. + await chunkReader.read(); + Logger.info('Authentication success.'); + } + + async createSendStream() { + await this._quicTransport.ready; + const quicStream = await this._quicTransport.createBidirectionalStream(); + return quicStream; + } + + async bindFeedbackReader(stream, publicationId) { + // The receiver side of a publication stream starts with a UUID of + // publication ID, then each feedback message has a 4 bytes header indicates + // its length, and followed by protobuf encoded body. + const feedbackChunkReader = stream.readable.getReader(); + let feedbackChunksDone = false; + let publicationIdOffset = 0; + const headerSize=4; + const header = new Uint8Array(headerSize); + let headerOffset = 0; + let bodySize = 0; + let bodyOffset = 0; + let bodyBytes; + while (!feedbackChunksDone) { + let valueOffset=0; + const {value, done} = await feedbackChunkReader.read(); + Logger.debug(value); + while (valueOffset < value.byteLength) { + if (publicationIdOffset < uuidByteLength) { + // TODO: Check publication ID matches. For now, we just skip this ID. + const readLength = + Math.min(uuidByteLength - publicationIdOffset, value.byteLength); + valueOffset += readLength; + publicationIdOffset += readLength; + } + if (headerOffset < headerSize) { + // Read header. + const copyLength = Math.min( + headerSize - headerOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + header.set( + value.subarray(valueOffset, valueOffset + copyLength), + headerOffset); + headerOffset += copyLength; + valueOffset += copyLength; + if (headerOffset < headerSize) { + continue; + } + bodySize = 0; + bodyOffset = 0; + for (let i = 0; i < headerSize; i++) { + bodySize += (header[i] << ((headerSize - 1 - i) * 8)); + } + bodyBytes = new Uint8Array(bodySize); + Logger.debug('Body size ' + bodySize); + } + if (bodyOffset < bodySize) { + const copyLength = + Math.min(bodySize - bodyOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + Logger.debug('Bytes for body: '+copyLength); + bodyBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + bodyOffset); + bodyOffset += copyLength; + valueOffset += copyLength; + if (valueOffset < bodySize) { + continue; + } + // Decode body. + const feedback = + proto.owt.protobuf.Feedback.deserializeBinary(bodyBytes); + this.handleFeedback(feedback, publicationId); + } + } + if (done) { + feedbackChunksDone = true; + break; + } + } + } + + async handleFeedback(feedback, publicationId) { + Logger.debug( + 'Key frame request type: ' + + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST); + if (feedback.getType() === + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST) { + this._worker.postMessage( + ['rtcp-feedback', ['key-frame-request', publicationId]]); + } else { + Logger.warning('Unrecognized feedback type ' + feedback.getType()); + } + } + + async publish(stream, options) { + // TODO: Avoid a stream to be published twice. The first 16 bit data send to + // server must be it's publication ID. + // TODO: Potential failure because of publication stream is created faster + // than signaling stream(created by the 1st call to initiatePublication). + const publicationId = await this._initiatePublication(stream, options); + const quicStreams = []; + if (stream.stream instanceof WebTransportBidirectionalStream) { + quicStreams.push(stream.stream); + this._quicDataStreams.set(publicationId, stream.streams); + } else if (stream.stream instanceof MediaStream) { + if (typeof MediaStreamTrackProcessor === 'undefined') { + throw new TypeError( + 'MediaStreamTrackProcessor is not supported by your browser.'); + } + for (const track of stream.stream.getTracks()) { + const quicStream = + await this._quicTransport.createBidirectionalStream(); + this.bindFeedbackReader(quicStream, publicationId); + this._quicMediaStreamTracks.set(track.id, quicStream); + quicStreams.push(quicStream); + } + } else { + throw new TypeError('Invalid stream.'); + } + for (const quicStream of quicStreams) { + const writer = quicStream.writable.getWriter(); + await writer.ready; + writer.write(this._uuidToUint8Array(publicationId)); + writer.releaseLock(); + } + if (stream.stream instanceof MediaStream) { + for (const track of stream.stream.getTracks()) { + let encoderConfig; + if (track.kind === 'audio') { + encoderConfig = { + codec: 'opus', + numberOfChannels: 1, + sampleRate: 48000, + }; + } else if (track.kind === 'video') { + encoderConfig = { + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }; + } + const quicStream = this._quicMediaStreamTracks.get(track.id); + const processor = new MediaStreamTrackProcessor(track); + this._worker.postMessage( + [ + 'media-sender', + [ + publicationId, + track.id, + track.kind, + processor.readable, + quicStream.writable, + encoderConfig, + ], + ], + [processor.readable, quicStream.writable]); + } + } + const publication = new Publication(publicationId, () => { + this._signaling.sendSignalingMessage('unpublish', {id: publication}) + .catch((e) => { + Logger.warning( + 'Server returns negative ack for unpublishing, ' + e); + }); + } /* TODO: getStats, mute, unmute is not implemented */); + return publication; + } + + hasContentSessionId(id) { + return this._quicDataStreams.has(id); + } + + _uuidToUint8Array(uuidString) { + if (uuidString.length != 32) { + throw new TypeError('Incorrect UUID.'); + } + const uuidArray = new Uint8Array(16); + for (let i = 0; i < 16; i++) { + uuidArray[i] = parseInt(uuidString.substring(i * 2, i * 2 + 2), 16); + } + return uuidArray; + } + + _uint8ArrayToUuid(uuidBytes) { + let s = ''; + for (const hex of uuidBytes) { + const str = hex.toString(16); + s += str.padStart(2, '0'); + } + return s; + } + + async subscribe(stream, options) { + // TODO: Combine this with channel.js. + if (options === undefined) { + options = { + audio: !!stream.settings.audio, + video: !!stream.settings.video, + }; + } + if (typeof options !== 'object') { + return Promise.reject(new TypeError('Options should be an object.')); + } + if (options.audio === undefined) { + options.audio = !!stream.settings.audio; + } + if (options.video === undefined) { + options.video = !!stream.settings.video; + } + let mediaOptions; + let dataOptions; + if (options.audio || options.video) { + mediaOptions = {tracks: []}; + dataOptions = undefined; + if (options.audio) { + const trackOptions = {type: 'audio', from: stream.id}; + if (typeof options.audio !== 'object' || + !Array.isArray(options.audio.codecs) || + options.audio.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Audio codec is expect to be a list with one item.')); + } + mediaOptions.tracks.push(trackOptions); + } + if (options.video) { + const trackOptions = {type: 'video', from: stream.id}; + if (typeof options.video !== 'object' || + !Array.isArray(options.video.codecs) || + options.video.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Video codec is expect to be a list with one item.')); + } + if (options.video.resolution || options.video.frameRate || + (options.video.bitrateMultiplier && + options.video.bitrateMultiplier !== 1) || + options.video.keyFrameInterval) { + trackOptions.parameters = { + resolution: options.video.resolution, + framerate: options.video.frameRate, + bitrate: options.video.bitrateMultiplier ? + 'x' + options.video.bitrateMultiplier.toString() : + undefined, + keyFrameInterval: options.video.keyFrameInterval, + }; + } + mediaOptions.tracks.push(trackOptions); + } + } else { + // Data stream. + mediaOptions = null; + dataOptions = {from: stream.id}; + } + const p = new Promise((resolve, reject) => { + this._signaling + .sendSignalingMessage('subscribe', { + media: mediaOptions, + data: dataOptions, + transport: {type: 'quic', id: this._transportId}, + }) + .then((data) => { + this._subscribeOptions.set(data.id, options); + if (dataOptions) { + // A WebTransport stream is associated with a subscription for + // data. + if (this._quicDataStreams.has(data.id)) { + // QUIC stream created before signaling returns. + // TODO: Update subscription to accept list of QUIC streams. + const subscription = this._createSubscription( + data.id, this._quicDataStreams.get(data.id)[0]); + resolve(subscription); + } else { + this._quicDataStreams.set(data.id, null); + // QUIC stream is not created yet, resolve promise after getting + // QUIC stream. + this._subscribePromises.set( + data.id, {resolve: resolve, reject: reject}); + } + } else { + // A MediaStream is associated with a subscription for media. + // Media packets are received over WebTransport datagram. + const generators = []; + for (const track of mediaOptions.tracks) { + const generator = + new MediaStreamTrackGenerator({kind: track.type}); + generators.push(generator); + // TODO: Update key with the correct SSRC. + this._mstVideoGeneratorWriters.set( + data.id, generator.writable.getWriter()); + } + const mediaStream = new MediaStream(generators); + const subscription = + this._createSubscription(data.id, mediaStream); + this._worker.postMessage([ + 'add-subscription', + [ + subscription.id, options, + this._rtpConfigs.get(subscription.id), + ], + ]); + resolve(subscription); + } + if (this._subscriptionInfoReady.has(data.id)) { + this._subscriptionInfoReady.get(data.id)(); + } + }); + }); + return p; + } + + async _initiatePublication(stream, options) { + const media = {tracks: []}; + if (stream.source.audio) { + if (!options.audio) { + throw new TypeError( + 'Options for audio is missing. Publish audio track with ' + + 'WebTransport must have AudioEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.audio, + type: 'audio', + format: { + codec: options.audio.codec, + sampleRate: options.audio.sampleRate, + channelNum: options.audio.numberOfChannels, + }, + }; + media.tracks.push(track); + } + if (stream.source.video) { + if (!options.video) { + throw new TypeError( + 'Options for audio is missing. Publish video track with ' + + 'WebTransport must have VideoEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.video, + type: 'video', + // TODO: convert from MIME type to the format required by server. + format: { + codec: 'h264', + profile: 'B', + }, + }; + media.tracks.push(track); + } + const resp = await this._signaling.sendSignalingMessage('publish', { + media: stream.source.data ? null : media, + data: stream.source.data, + transport: {type: 'quic', id: this._transportId}, + }); + if (this._transportId !== resp.transportId) { + throw new Error('Transport ID not match.'); + } + return resp.id; + } + + _readyHandler() { + // Ready message from server is useless for QuicStream since QuicStream has + // its own status. Do nothing here. + } + + _rtpHandler(message) { + Logger.debug(`RTP config: ${JSON.stringify(message.data)}.`); + this._rtpConfigs.set(message.id, message.data); + } + + datagramReader() { + return this._quicTransport.datagrams.readable.getReader(); + } + + async _sendRtcp(buffer) { + const writer = this._quicTransport.datagrams.writable.getWriter(); + await writer.ready; + writer.write(buffer); + writer.releaseLock(); + } + + _initHandlersForWorker() { + this._worker.onmessage = ((e) => { + const [command, args] = e.data; + switch (command) { + case 'video-frame': + this._mstVideoGeneratorWriters.get(args[0]).write(args[1]); + break; + case 'rtcp-packet': + this._sendRtcp(...args); + break; + default: + Logger.warn('Unrecognized command ' + command); + } + }); + } +} diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js new file mode 100644 index 00000000..85cf358a --- /dev/null +++ b/src/sdk/conference/webtransport/media-worker.js @@ -0,0 +1,242 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer, + Uint8Array, DataView, console, EncodedVideoChunk, postMessage */ + +// TODO: Use relative path instead. +import initModule from '/src/samples/conference/public/scripts/owt.js'; + +// Key is MediaStreamTrack ID, value is AudioEncoder or VideoEncoder. +const encoders = new Map(); +// Key is MediaStreamTrack ID, value is WritableStreamDefaultWriter. +const writers = new Map(); +// Key is publication ID, value is bool indicates whether key frame is requested +// for its video track. +const keyFrameRequested = new Map(); + +let wasmModule; +let mediaSession; +// Key is SSRC, value is an RTP receiver. +const rtpReceivers = new Map(); +let frameBuffer; +let videoDecoder; +// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. +const sizePrefix = 4; + +// Timestamp of the first frame. +let startTime = 0; + +/* Messages it accepts: + * media-sender: [Publication ID, MediaStreamTrack ID, MediaStreamTrack kind, + * MediaStreamTrackProcessor readable, WebTransportStream writable, + * AudioEncoderConfig/VideoEncoderConfig] + */ +// eslint-disable-next-line no-undef +onmessage = async (e) => { + const [command, args] = e.data; + switch (command) { + case 'media-sender': + initMediaSender(...args); + break; + case 'rtcp-feedback': + await handleFeedback(...args); + break; + case 'init-rtp': + await initRtpModule(); + break; + case 'rtp-packet': + await handleRtpPacket(args); + break; + case 'add-subscription': + addNewSubscription(...args); + break; + default: + console.warn('Unrecognized command ' + command); + } +}; + +async function initMediaSender( + publicationId, trackId, trackKind, trackReadable, sendStreamWritable, + config) { + let encoder; + const writer = sendStreamWritable.getWriter(); + if (trackKind === 'audio') { + encoder = initAudioEncoder(config, writer); + } else { // Video. + encoder = initVideoEncoder(config, writer); + keyFrameRequested[publicationId] = false; + } + encoders.set(trackId, encoder); + writers.set(trackId, writer); + readMediaData(trackReadable, encoder, publicationId); + writeTrackId(trackKind, writer); +} + +async function initRtpModule() { + wasmModule = await fetchWasm(); +} + +async function fetchWasm() { + const owtWasmModule = {}; + initModule(owtWasmModule); + await owtWasmModule.ready; + return owtWasmModule; +} + +async function handleFeedback(feedback, publicationId) { + if (feedback === 'key-frame-request') { + console.log('Setting key frame request flag.'); + keyFrameRequested[publicationId] = true; + } +} + +async function videoOutput(writer, chunk, metadata) { + // TODO: Combine audio and video output callback. + if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function videoError(error) { + console.error('Video encode error: ' + error.message); +} + +async function audioOutput(writer, chunk, metadata) { + if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function audioError(error) { + console.error(`Audio encode error: ${error.message}`); +} + +async function writeTrackId(kind, writer) { + const id = new Uint8Array(16); + id[15] = (kind === 'audio' ? 1 : 2); + await writer.ready; + writer.write(id); +} + +function initAudioEncoder(config, writer) { + const audioEncoder = new AudioEncoder( + {output: audioOutput.bind(null, writer), error: audioError}); + // TODO: Respect config. + audioEncoder.configure( + {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); + return audioEncoder; +} + +function initVideoEncoder(config, writer) { + const videoEncoder = new VideoEncoder( + {output: videoOutput.bind(null, writer), error: videoError}); + // TODO: Respect config. + videoEncoder.configure({ + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }); + return videoEncoder; +} + +function initVideoDecoder(subscriptionId) { + videoDecoder = new VideoDecoder({ + output: videoFrameOutputCallback.bind(null, subscriptionId), + error: webCodecsErrorCallback, + }); + videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); +} + +function videoFrameOutputCallback(subscriptionId, frame) { + // eslint-disable-next-line no-undef + postMessage(['video-frame', [subscriptionId, frame]], [frame]); + frame.close(); +} + +function webCodecsErrorCallback(error) { + console.warn('error: ' + error.message); +} + +// Read data from media track. +async function readMediaData(trackReadable, encoder, publicationId) { + const reader = trackReadable.getReader(); + // eslint-disable-next-line no-constant-condition + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.debug('MediaStream ends.'); + break; + } + if (keyFrameRequested.get(publicationId)) { + console.debug(typeof encoder + ' encode a key frame.'); + encoder.encode(value, {keyFrame: true}); + keyFrameRequested[publicationId] = false; + } else { + encoder.encode(value); + } + value.close(); + } +} + +function getSsrc(packet) { + // SSRC starts from the 65th bit, in network order. + return new DataView(packet.buffer).getUint32(8, false); +} + +async function handleRtpPacket(packet) { + const ssrc = getSsrc(packet); + const buffer = wasmModule._malloc(packet.byteLength); + wasmModule.writeArrayToMemory(packet, buffer); + if (!rtpReceivers.has(ssrc)) { + console.log('RTP receiver not found.'); + return; + } + rtpReceivers.get(ssrc).onRtpPacket(buffer, packet.byteLength); +} + +function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { + // TODO: Audio is not supported yet, ignore the audio part. + initVideoDecoder(subscriptionId); + const videoSsrc = rtpConfig.video.ssrc; + if (rtpReceivers.has(videoSsrc)) { + console.error(`RTP receiver for SSRC ${videoSsrc} exits.`); + } + if (!mediaSession) { + mediaSession = new wasmModule.MediaSession(); + mediaSession.setRtcpCallback((packet) => { + console.log('RTCP callback.'); + const buffer = new Uint8Array(packet); + postMessage(['rtcp-packet', [buffer.buffer]], [buffer.buffer]); + }); + } + const rtpReceiver = mediaSession.createRtpVideoReceiver(videoSsrc); + rtpReceivers.set(videoSsrc, rtpReceiver); + rtpReceiver.setCompleteFrameCallback((frame, isKeyFrame) => { + if (startTime === 0) { + startTime = Date.now()*1000; + } + videoDecoder.decode(new EncodedVideoChunk({ + timestamp: Date.now() * 1000 - startTime, + data: frame, + type: isKeyFrame ? 'key' : 'delta', + })); + }); +} diff --git a/src/sdk/conference/webtransport/receive-stream-worker.js b/src/sdk/conference/webtransport/receive-stream-worker.js new file mode 100644 index 00000000..89e60a45 --- /dev/null +++ b/src/sdk/conference/webtransport/receive-stream-worker.js @@ -0,0 +1,76 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioDecoder, postMessage */ + +// TODO: Enable ESLint for this file. +/* eslint-disable */ + +'use strict'; + +import Logger from '../../base/logger.js'; + +let audioDecoder; + +onmessage = (e) => { + // ['init', SubscribeOptions, WebTransportStream, TrackKind]. + if (e.data[0] === 'init') { + _initWebCodecs(e.data[1], e.data[3]); + _handleReceiveStream(e.data[2]); + } +}; + +const audioDecoderConfig = { + codec: 'opus', + sampleRate: 48000, + numberOfChannels: 2, +}; + +function audioDecoderOutput(audioFrame) { + const audioBuffer = { + numberOfChannels: audioFrame.buffer.numberOfChannels, + sampleRate: audioFrame.buffer.sampleRate, + length: audioFrame.buffer.length, + duration: audioFrame.buffer.duration, + channelData: [], + }; + for (let i = 0; i < audioFrame.buffer.numberOfChannels; i++) { + audioBuffer.channelData.push(audioFrame.buffer.getChannelData(i)); + } + postMessage(['audio-frame', audioBuffer]); +} + +function audioDecoderError(error) { + Logger.warn('Audio decoder failed to decode. Error: ' + error); +} + +async function _initWebCodecs(options, trackKind) { + if (trackKind !== 'audio') { + Logger.error( + 'Receiving ' + trackKind + ' over WebTransport is not supported.'); + return; + } + if (options.audio) { + Logger.error('No options for audio.'); + return; + } + audioDecoder = + new AudioDecoder({output: audioDecoderOutput, error: audioDecoderError}); + audioDecoder.configure(audioDecoderConfig); +} + +async function _handleReceiveStream(stream) { + const reader = stream.readable.getReader(); + let readingDone = false; + while (!readingDone) { + const {value, done: finished} = await reader.read(); + if (finished) { + readingDone = true; + } + // TODO: Read audio frame header. + // Implement it when BYOB reader is implemented in Chrome to reduce buffer + // copy. + } +} \ No newline at end of file diff --git a/test/unit/config/karma.config.js b/test/unit/config/karma.config.js index 70d79bf4..62fd871b 100644 --- a/test/unit/config/karma.config.js +++ b/test/unit/config/karma.config.js @@ -46,7 +46,7 @@ module.exports = function (config) { pattern: './test/unit/resources/scripts/gen/sinon-browserified.js' }, { - pattern: './src/sdk/!(rest)/*.js', + pattern: './src/sdk/!(rest)/**/*.js', type: 'module' }, {