From 371727343846839519014a25a28133ec057de234 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Fri, 13 Aug 2021 12:51:51 +0800 Subject: [PATCH 01/11] Publish media data with WebTransport. --- .../conference/public/scripts/media-worker.js | 75 ++++++ src/samples/conference/public/scripts/quic.js | 77 ++++-- src/sdk/base/publication.js | 20 +- src/sdk/base/transport.js | 4 +- src/sdk/conference/client.js | 15 +- src/sdk/conference/subscription.js | 4 +- .../connection.js} | 219 ++++++++++++++++-- .../webtransport/receive-stream-worker.js | 73 ++++++ 8 files changed, 436 insertions(+), 51 deletions(-) create mode 100644 src/samples/conference/public/scripts/media-worker.js rename src/sdk/conference/{quicconnection.js => webtransport/connection.js} (50%) create mode 100644 src/sdk/conference/webtransport/receive-stream-worker.js diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js new file mode 100644 index 00000000..9bcc1df4 --- /dev/null +++ b/src/samples/conference/public/scripts/media-worker.js @@ -0,0 +1,75 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global VideoEncoder */ + +let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter; +// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. 
+const sizePrefix = 4; + +onmessage = (e) => { + if (e.data[0] === 'video-source') { + readVideoData(e.data[1]); + } else if (e.data[0] === 'send-stream') { + bidirectionalStreamWritable = e.data[1]; + sendStreamWriter = bidirectionalStreamWritable.getWriter(); + writeTrackId(); + initVideoEncoder(); + } +}; + +async function videoOutput(chunk, metadata) { + if (bidirectionalStreamWritable) { + if (!frameBuffer || + frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = + new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await sendStreamWriter.ready; + await sendStreamWriter.write(dataView); + console.log('Write a frame.'); + } +} + +function videoError(error) { + console.log('Encode error, ' + error); +} + +async function writeTrackId() { + const id = new Uint8Array(16); + id[16] = 2; + await sendStreamWriter.ready; + sendStreamWriter.write(id); +} + +function initVideoEncoder() { + videoEncoder = new VideoEncoder({output: videoOutput, error: videoError}); + videoEncoder.configure({ + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }); +} + +// Read data from video track. 
+async function readVideoData(readable) { + const reader = readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('MediaStream ends.'); + break; + } + videoEncoder.encode(value); + value.close(); + } +} \ No newline at end of file diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index 1bd437a0..3d0d45a4 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -2,16 +2,34 @@ // // SPDX-License-Identifier: Apache-2.0 +/* eslint-disable require-jsdoc */ + 'use strict'; let quicChannel = null; let bidirectionalStream = null; -let writeTask; -const conference=new Owt.Conference.ConferenceClient(); +let writeTask, mediaStream, mediaWorker, conferenceId, myId; + +const conference = new Owt.Conference.ConferenceClient({ + webTransportConfiguration: { + serverCertificateFingerprints: [{ + value: + 'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6', + algorithm: 'sha-256', + }] + } +}); conference.addEventListener('streamadded', async (event) => { console.log(event.stream); - if (event.stream.source.data) { - const subscription = await conference.subscribe(event.stream); + if (event.stream.origin == myId) { + mixStream( + conferenceId, event.stream.id, 'common', + 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); + } + if (event.stream.source.data || event.stream.source.video) { + const subscription = await conference.subscribe( + event.stream, + {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); const reader = subscription.stream.readable.getReader(); while (true) { const {value, done} = await reader.read(); @@ -19,25 +37,27 @@ conference.addEventListener('streamadded', async (event) => { console.log('Subscription ends.'); break; } - console.log('Received data: '+value); + console.log('Received data: ' + value); } } }); function 
updateConferenceStatus(message) { document.getElementById('conference-status').innerHTML += - ('

' + message + '

'); + ('

' + message + '

'); } function joinConference() { return new Promise((resolve, reject) => { - createToken(undefined, 'user', 'presenter', resp => { - conference.join(resp).then(() => { + createToken(undefined, 'user', 'presenter', token => { + conference.join(token).then((info) => { + conferenceId = info.id; + myId = info.self.id; updateConferenceStatus('Connected to conference server.'); resolve(); }); - }); + }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); }); }; @@ -55,10 +75,31 @@ function createRandomContentSessionId() { return id; } +async function attachReader(stream) { + const reader = stream.readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('Ends.'); + break; + } + console.log('Received data: ' + value); + } +} + async function createSendChannel() { bidirectionalStream = await conference.createSendStream(); - const localStream=new Owt.Base.LocalStream(bidirectionalStream, new Owt.Base.StreamSourceInfo(undefined, undefined,true)); - const publication = await conference.publish(localStream); + const localStream = new Owt.Base.LocalStream( + bidirectionalStream, + new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); + attachReader(bidirectionalStream); + const publication = await conference.publish( + localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); + // const localStream = new Owt.Base.LocalStream( + // bidirectionalStream, + // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + // const publication = await conference.publish( + // localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -79,6 +120,16 @@ async function writeUuid() { return; } +async function writeVideoData() { + mediaStream = await navigator.mediaDevices.getUserMedia({video: true}); + const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); + mediaWorker = new Worker('./scripts/media-worker.js'); + 
mediaWorker.postMessage(['video-source', track.readable], [track.readable]); + mediaWorker.postMessage( + ['send-stream', bidirectionalStream.writable], + [bidirectionalStream.writable]); +} + async function writeData() { const encoder = new TextEncoder(); const encoded = encoder.encode('message', {stream: true}); @@ -98,8 +149,8 @@ document.getElementById('start-sending').addEventListener('click', async () => { updateConferenceStatus('Stream is not created.'); return; } - await writeUuid(); - writeTask = setInterval(writeData, 2000); + writeVideoData(); + // writeTask = setInterval(writeData, 2000); updateConferenceStatus('Started sending.'); }); diff --git a/src/sdk/base/publication.js b/src/sdk/base/publication.js index 7ead125e..a1b4e8bc 100644 --- a/src/sdk/base/publication.js +++ b/src/sdk/base/publication.js @@ -195,17 +195,29 @@ export class PublishOptions { // eslint-disable-next-line require-jsdoc constructor(audio, video, transport) { /** - * @member {?Array | ?Array} audio + * @member {?Array | + * ?Array | ?AudioEncoderConfig } audio * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for audio RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. + * @desc Parameters for audio RtpSender when transport's type is 'webrtc' or + * configuration of audio encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-audioencoderconfig|AudioEncoderConfig} */ this.audio = audio; /** - * @member {?Array | ?Array} video + * @member {?Array | + * ?Array | ?VideoEncoderConfig } video * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for video RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. 
+ * @desc Parameters for video RtpSender when transport's type is 'webrtc' or + * configuration of video encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-videoencoderconfig|VideoEncoderConfig} */ this.video = video; /** diff --git a/src/sdk/base/transport.js b/src/sdk/base/transport.js index 3bb5a79a..863e2c39 100644 --- a/src/sdk/base/transport.js +++ b/src/sdk/base/transport.js @@ -29,7 +29,9 @@ export class TransportConstraints { * @member {Array.} type * @instance * @memberof Owt.Base.TransportConstraints - * @desc Transport type for publication and subscription. + * @desc Transport type for publication and subscription. 'quic' is only + * supported in conference mode when WebTransport is supported by client and + * enabled at server side. */ this.type = type; /** diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index 35ee3581..91dbd3a8 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -16,7 +16,7 @@ import * as StreamModule from '../base/stream.js'; import {Participant} from './participant.js'; import {ConferenceInfo} from './info.js'; import {ConferencePeerConnectionChannel} from './channel.js'; -import {QuicConnection} from './quicconnection.js'; +import {QuicConnection} from './webtransport/connection.js'; import {RemoteMixedStream, ActiveAudioInputChangeEvent, LayoutChangeEvent} from './mixedstream.js'; import * as StreamUtilsModule from './streamutils.js'; @@ -464,7 +464,7 @@ export const ConferenceClient = function(config, signalingImpl) { * @instance * @desc Publish a LocalStream to conference server. Other participants will be able to subscribe this stream when it is successfully published. * @param {Owt.Base.LocalStream} stream The stream to be published. 
- * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. + * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. If the type of transport is quic, PublishOptions.audio should be AudioEncoderConfig, and PublishOptions.video should be VideoEncoderConfig. * @param {string[]} videoCodecs Video codec names for publishing. Valid values are 'VP8', 'VP9' and 'H264'. This parameter only valid when the second argument is PublishOptions and options.video is RTCRtpEncodingParameters. Publishing with RTCRtpEncodingParameters is an experimental feature. This parameter is subject to change. * @return {Promise} Returned promise will be resolved with a newly created Publication once specific stream is successfully published, or rejected with a newly created Error if stream is invalid or options cannot be satisfied. Successfully published means PeerConnection is established and server is able to process media data. 
*/ @@ -472,8 +472,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (!(stream instanceof StreamModule.LocalStream)) { return Promise.reject(new ConferenceError('Invalid stream.')); } - if (stream.source.data) { - return quicTransportChannel.publish(stream); + if (options?.transport?.type === 'quic') { + return quicTransportChannel.publish(stream, options, videoCodecs); } if (publishChannels.has(stream.mediaStream.id)) { return Promise.reject(new ConferenceError( @@ -501,13 +501,16 @@ export const ConferenceClient = function(config, signalingImpl) { 'Invalid source info. A remote stream is either a data stream or ' + 'a media stream.')); } + } + if (options?.transport?.type === 'quic') { if (quicTransportChannel) { - return quicTransportChannel.subscribe(stream); + return quicTransportChannel.subscribe(stream, options); } else { return Promise.reject(new TypeError('WebTransport is not supported.')); } + } else { + return peerConnectionChannel.subscribe(stream, options); } - return peerConnectionChannel.subscribe(stream, options); }; /** diff --git a/src/sdk/conference/subscription.js b/src/sdk/conference/subscription.js index 16676f24..84ff99f7 100644 --- a/src/sdk/conference/subscription.js +++ b/src/sdk/conference/subscription.js @@ -104,7 +104,7 @@ export class AudioSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.AudioSubscriptionConstraints - * @desc Codecs accepted. If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; } @@ -124,7 +124,7 @@ export class VideoSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.VideoSubscriptionConstraints - * @desc Codecs accepted. 
If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; /** diff --git a/src/sdk/conference/quicconnection.js b/src/sdk/conference/webtransport/connection.js similarity index 50% rename from src/sdk/conference/quicconnection.js rename to src/sdk/conference/webtransport/connection.js index 638c4963..61e2b5d2 100644 --- a/src/sdk/conference/quicconnection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -3,15 +3,18 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder */ +/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder, + * ArrayBuffer */ 'use strict'; -import Logger from '../base/logger.js'; -import {EventDispatcher} from '../base/event.js'; -import {Publication} from '../base/publication.js'; -import {Subscription} from './subscription.js'; -import {Base64} from '../base/base64.js'; +import Logger from '../../base/logger.js'; +import {EventDispatcher} from '../../base/event.js'; +import {Publication} from '../../base/publication.js'; +import {SubscribeOptions, Subscription} from '../subscription.js'; +import {Base64} from '../../base/base64.js'; + +const uuidByteLength = 16; /** * @class QuicConnection @@ -32,6 +35,9 @@ export class QuicConnection extends EventDispatcher { this._quicStreams = new Map(); // Key is publication or subscription ID. this._quicTransport = new WebTransport(url, webTransportOptions); this._subscribePromises = new Map(); // Key is subscription ID. + this._subscribeOptions = new Map(); // Key is subscription ID. + this._subscriptionInfoReady = + new Map(); // Key is subscription ID, value is a promise. 
this._transportId = this._token.transportId; this._initReceiveStreamReader(); } @@ -76,22 +82,84 @@ export class QuicConnection extends EventDispatcher { const {value: receiveStream, done: readingReceiveStreamsDone} = await receiveStreamReader.read(); Logger.info('New stream received'); + const subscriptionIdBytes = new Uint8Array(uuidByteLength); + let subscriptionIdBytesOffset = 0; + const trackIdBytes = new Uint8Array(uuidByteLength); + let trackIdBytesOffset = 0; if (readingReceiveStreamsDone) { receivingDone = true; break; } + // Use BYOB reader when it's supported to avoid copy. See + // https://github.com/w3c/webtransport/issues/131. Issue tracker: + // https://crbug.com/1182905. const chunkReader = receiveStream.readable.getReader(); - const {value: uuid, done: readingChunksDone} = await chunkReader.read(); - if (readingChunksDone) { - Logger.error('Stream closed unexpectedly.'); - return; - } - if (uuid.length != 16) { - Logger.error('Unexpected length for UUID.'); - return; + let readingChunksDone = false; + let readingHeaderDone = false; + let mediaStream = false; + let subscriptionId; + while (!readingChunksDone && !readingHeaderDone) { + const {value, done: readingChunksDone} = await chunkReader.read(); + let valueOffset = 0; + if (subscriptionIdBytesOffset < uuidByteLength) { + const copyLength = Math.min( + uuidByteLength - subscriptionIdBytesOffset, + value.byteLength - valueOffset); + subscriptionIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + subscriptionIdBytesOffset); + subscriptionIdBytesOffset += copyLength; + valueOffset += copyLength; + if (subscriptionIdBytesOffset < uuidByteLength) { + continue; + } + subscriptionId = + this._uint8ArrayToUuid(new Uint8Array(subscriptionIdBytes)); + if (!this._subscribeOptions.has(subscriptionId)) { + Logger.debug('Subscribe options is not ready.'); + const p = new Promise((resolve) => { + this._subscriptionInfoReady.set(subscriptionId, resolve); + }); + await p; + 
this._subscriptionInfoReady.delete(subscriptionId); + } + const subscribeOptions = this._subscribeOptions.get(subscriptionId); + if (subscribeOptions.audio || subscribeOptions.video) { + mediaStream = true; + } + if (!mediaStream) { + readingHeaderDone = true; + if (copyLength < value.byteLength) { + Logger.warning( + 'Potential data lose. Expect to be fixed when BYOB reader ' + + 'is supported.'); + } + continue; + } + } + if (valueOffset < value.byteLength) { + const copyLength = Math.min( + uuidByteLength - trackIdBytesOffset, + value.byteLength - valueOffset); + trackIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + trackIdBytesOffset); + trackIdBytesOffset += copyLength; + valueOffset += copyLength; + if (trackIdBytesOffset < uuidByteLength) { + continue; + } + const trackId = this._uint8ArrayToUuid(trackIdBytes); + Logger.debug(`WebTransport stream for subscription ID ${ + subscriptionId} and track ID ${ + trackId} is ready to receive data.`); + } + if (readingChunksDone) { + Logger.error('Stream closed unexpectedly.'); + return; + } } chunkReader.releaseLock(); - const subscriptionId = this._uint8ArrayToUuid(uuid); this._quicStreams.set(subscriptionId, receiveStream); if (this._subscribePromises.has(subscriptionId)) { const subscription = @@ -150,23 +218,23 @@ export class QuicConnection extends EventDispatcher { return quicStream; } - async publish(stream) { + async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. // TODO: Potential failure because of publication stream is created faster // than signaling stream(created by the 1st call to initiatePublication). 
- const publicationId = await this._initiatePublication(); + const publicationId = await this._initiatePublication(stream, options); const quicStream = stream.stream; const writer = quicStream.writable.getWriter(); await writer.ready; writer.write(this._uuidToUint8Array(publicationId)); writer.releaseLock(); - Logger.info('publish id'); this._quicStreams.set(publicationId, quicStream); const publication = new Publication(publicationId, () => { this._signaling.sendSignalingMessage('unpublish', {id: publication}) .catch((e) => { - Logger.warning('MCU returns negative ack for unpublishing, ' + e); + Logger.warning( + 'Server returns negative ack for unpublishing, ' + e); }); } /* TODO: getStats, mute, unmute is not implemented */); return publication; @@ -196,15 +264,76 @@ export class QuicConnection extends EventDispatcher { return s; } - subscribe(stream) { + async subscribe(stream, options) { + // TODO: Combine this with channel.js. + if (options === undefined) { + options = { + audio: !!stream.settings.audio, + video: !!stream.settings.video, + }; + } + if (typeof options !== 'object') { + return Promise.reject(new TypeError('Options should be an object.')); + } + if (options.audio === undefined) { + options.audio = !!stream.settings.audio; + } + if (options.video === undefined) { + options.video = !!stream.settings.video; + } + let mediaOptions; + let dataOptions; + if (options.audio || options.video) { + mediaOptions = {tracks: []}; + dataOptions = undefined; + if (options.audio) { + const trackOptions = {type: 'audio', from: stream.id}; + if (typeof options.audio !== 'object' || + !Array.isArray(options.audio.codecs) || + options.audio.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Audio codec is expect to be a list with one item.')); + } + mediaOptions.tracks.push(trackOptions); + } + if (options.video) { + const trackOptions = {type: 'video', from: stream.id}; + if (typeof options.video !== 'object' || + !Array.isArray(options.video.codecs) 
|| + options.video.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Video codec is expect to be a list with one item.')); + } + if (options.video.resolution || options.video.frameRate || + (options.video.bitrateMultiplier && + options.video.bitrateMultiplier !== 1) || + options.video.keyFrameInterval) { + trackOptions.parameters = { + resolution: options.video.resolution, + framerate: options.video.frameRate, + bitrate: options.video.bitrateMultiplier ? + 'x' + options.video.bitrateMultiplier.toString() : + undefined, + keyFrameInterval: options.video.keyFrameInterval, + }; + } + mediaOptions.tracks.push(trackOptions); + } + } else { + // Data stream. + mediaOptions = null; + dataOptions = {from: stream.id}; + } const p = new Promise((resolve, reject) => { this._signaling .sendSignalingMessage('subscribe', { - media: null, - data: {from: stream.id}, + media: mediaOptions, + data: dataOptions, transport: {type: 'quic', id: this._transportId}, }) .then((data) => { + this._subscribeOptions.set(data.id, options); + Logger.debug('Subscribe info is set.'); if (this._quicStreams.has(data.id)) { // QUIC stream created before signaling returns. const subscription = this._createSubscription( @@ -217,6 +346,9 @@ export class QuicConnection extends EventDispatcher { this._subscribePromises.set( data.id, {resolve: resolve, reject: reject}); } + if (this._subscriptionInfoReady.has(data.id)) { + this._subscriptionInfoReady.get(data.id)(); + } }); }); return p; @@ -231,10 +363,47 @@ export class QuicConnection extends EventDispatcher { }); } - async _initiatePublication() { + async _initiatePublication(stream, options) { + const media = {tracks: []}; + if (stream.source.audio) { + if (!options.audio) { + throw new TypeError( + 'Options for audio is missing. 
Publish audio track with ' + + 'WebTransport must have AudioEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.audio, + type: 'audio', + format: { + codec: options.audio.codec, + sampleRate: options.audio.sampleRate, + channelNum: options.audio.numberOfChannels, + }, + }; + media.tracks.push(track); + } + if (stream.source.video) { + if (!options.video) { + throw new TypeError( + 'Options for audio is missing. Publish video track with ' + + 'WebTransport must have VideoEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.video, + type: 'video', + // TODO: convert from MIME type to the format required by server. + format: { + codec: 'h264', + profile: 'B', + }, + }; + media.tracks.push(track); + } const data = await this._signaling.sendSignalingMessage('publish', { - media: null, - data: true, + media: stream.source.data ? null : media, + data: stream.source.data, transport: {type: 'quic', id: this._transportId}, }); if (this._transportId !== data.transportId) { diff --git a/src/sdk/conference/webtransport/receive-stream-worker.js b/src/sdk/conference/webtransport/receive-stream-worker.js new file mode 100644 index 00000000..256f1244 --- /dev/null +++ b/src/sdk/conference/webtransport/receive-stream-worker.js @@ -0,0 +1,73 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioDecoder, postMessage */ + +'use strict'; + +import Logger from '../../base/logger.js'; + +let audioDecoder; + +onmessage = (e) => { + // ['init', SubscribeOptions, WebTransportStream, TrackKind]. 
+ if (e.data[0] === 'init') { + _initWebCodecs(e.data[1], e.data[3]); + _handleReceiveStream(e.data[2]); + } +}; + +const audioDecoderConfig = { + codec: 'opus', + sampleRate: 48000, + numberOfChannels: 2, +}; + +function audioDecoderOutput(audioFrame) { + const audioBuffer = { + numberOfChannels: audioFrame.buffer.numberOfChannels, + sampleRate: audioFrame.buffer.sampleRate, + length: audioFrame.buffer.length, + duration: audioFrame.buffer.duration, + channelData: [], + }; + for (let i = 0; i < audioFrame.buffer.numberOfChannels; i++) { + audioBuffer.channelData.push(audioFrame.buffer.getChannelData(i)); + } + postMessage(['audio-frame', audioBuffer]); +} + +function audioDecoderError(error) { + Logger.warn('Audio decoder failed to decode. Error: ' + error); +} + +async function _initWebCodecs(options, trackKind) { + if (trackKind !== 'audio') { + Logger.error( + 'Receiving ' + trackKind + ' over WebTransport is not supported.'); + return; + } + if (options.audio) { + Logger.error('No options for audio.'); + return; + } + audioDecoder = + new AudioDecoder({output: audioDecoderOutput, error: audioDecoderError}); + audioDecoder.configure(audioDecoderConfig); +} + +async function _handleReceiveStream(stream) { + const reader = stream.readable.getReader(); + let readingDone = false; + while (!readingDone) { + const {value, done: finished} = await reader.read(); + if (finished) { + readingDone = true; + } + // TODO: Read audio frame header. + // Implement it when BYOB reader is implemented in Chrome to reduce buffer + // copy. + } +} \ No newline at end of file From e7ab0c612f310ca16590f64767b298d696751e16 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Wed, 13 Oct 2021 14:28:31 +0800 Subject: [PATCH 02/11] Receive mixed stream with WebTransport datagram. After RTP packets are received, it uses WebRTC RTP depacketizer for depacketizing, WebCodecs for decoding and MediaStreamGenerator for rendering. 
--- src/samples/conference/public/quic.html | 10 +- .../conference/public/scripts/media-worker.js | 50 +++++- src/samples/conference/public/scripts/quic.js | 144 ++++++++++++++---- src/sdk/conference/client.js | 4 + src/sdk/conference/webtransport/connection.js | 25 ++- 5 files changed, 192 insertions(+), 41 deletions(-) diff --git a/src/samples/conference/public/quic.html b/src/samples/conference/public/quic.html index 077f6637..300ce4d1 100644 --- a/src/samples/conference/public/quic.html +++ b/src/samples/conference/public/quic.html @@ -30,13 +30,19 @@

Sample of QuicTransport

+ + +
+
+
- + + - + diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js index 9bcc1df4..3b8c97ff 100644 --- a/src/samples/conference/public/scripts/media-worker.js +++ b/src/samples/conference/public/scripts/media-worker.js @@ -3,9 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global VideoEncoder */ +/* global VideoEncoder, VideoDecoder, EncodedVideoChunk */ -let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter; +let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter, + mediaSession, datagramReceiver, videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. const sizePrefix = 4; @@ -16,7 +17,15 @@ onmessage = (e) => { bidirectionalStreamWritable = e.data[1]; sendStreamWriter = bidirectionalStreamWritable.getWriter(); writeTrackId(); - initVideoEncoder(); + // initVideoEncoder(); + } else if (e.data[0] === 'datagram-receiver') { + datagramReceiver = e.data[1]; + } else if (e.data[0] === 'encoded-video-frame') { + if (videoDecoder.state === 'closed') { + return; + } + videoDecoder.decode(new EncodedVideoChunk( + {timestamp: Date.now(), data: e.data[1], type: 'key'})); } }; @@ -60,6 +69,22 @@ function initVideoEncoder() { }); } +function initVideoDecoder() { + videoDecoder = new VideoDecoder({ + output: videoFrameOutputCallback, + error: webCodecsErrorCallback, + }); + videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); +} + +function videoFrameOutputCallback(frame) { + postMessage(['video-frame', frame], [frame]); +} + +function webCodecsErrorCallback(error) { + console.log('error: ' + error.message); +} + // Read data from video track. 
async function readVideoData(readable) { const reader = readable.getReader(); @@ -72,4 +97,21 @@ async function readVideoData(readable) { videoEncoder.encode(value); value.close(); } -} \ No newline at end of file +} + +async function fetchWasm() { + const Module={}; + Module['instantiateWasm'] = async (imports, successCallback) => { + const response = await fetch('./owt.wasm'); + const buffer = await response.arrayBuffer(); + const module=new WebAssembly.Module(buffer); + const instance = await WebAssembly.instantiate(module, imports); + successCallback(instance, module); + return {}; + }; + // Module['wasmModule']=new WebAssembly.Module(buffer); + importScripts('./owt.js'); + console.log('Got wasm binary.'); +} + +initVideoDecoder(); diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index 3d0d45a4..ff8de67e 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -8,15 +8,25 @@ let quicChannel = null; let bidirectionalStream = null; -let writeTask, mediaStream, mediaWorker, conferenceId, myId; +let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; + +window.Module={}; const conference = new Owt.Conference.ConferenceClient({ webTransportConfiguration: { serverCertificateFingerprints: [{ value: - 'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6', + '59:74:C6:C5:2C:D8:E8:18:A9:D2:14:77:ED:94:89:87:DF:83:BA:B3:96:4C:4C:0B:B8:D3:22:58:11:55:67:1A', + algorithm: 'sha-256', + }], + serverCertificateHashes: [{ + value: new Uint8Array([ + 0x59, 0x74, 0xC6, 0xC5, 0x2C, 0xD8, 0xE8, 0x18, 0xA9, 0xD2, 0x14, + 0x77, 0xED, 0x94, 0x89, 0x87, 0xDF, 0x83, 0xBA, 0xB3, 0x96, 0x4C, + 0x4C, 0x0B, 0xB8, 0xD3, 0x22, 0x58, 0x11, 0x55, 0x67, 0x1A + ]), algorithm: 'sha-256', - }] + }], } }); conference.addEventListener('streamadded', async (event) => { @@ -26,20 +36,21 @@ 
conference.addEventListener('streamadded', async (event) => { conferenceId, event.stream.id, 'common', 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); } - if (event.stream.source.data || event.stream.source.video) { - const subscription = await conference.subscribe( - event.stream, - {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); - const reader = subscription.stream.readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('Subscription ends.'); - break; - } - console.log('Received data: ' + value); - } - } + // if (event.stream.source.data) { + // const subscription = await conference.subscribe( + // event.stream, + // // {transport:{type: 'quic'}}); + // {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + // const reader = subscription.stream.readable.getReader(); + // while (true) { + // const {value, done} = await reader.read(); + // if (done) { + // console.log('Subscription ends.'); + // break; + // } + // //console.log('Received data: ' + value); + // } + // } }); function updateConferenceStatus(message) { @@ -47,6 +58,15 @@ function updateConferenceStatus(message) { ('

' + message + '

'); } +function initWorker() { + mediaWorker = new Worker('./scripts/media-worker.js'); + mediaWorker.onmessage=((e) => { + if (e.data[0] === 'video-frame') { + generatorWriter.write(e.data[1]); + //console.log(e.data[1]); + } + }); +} function joinConference() { return new Promise((resolve, reject) => { @@ -54,7 +74,13 @@ function joinConference() { conference.join(token).then((info) => { conferenceId = info.id; myId = info.self.id; + for (const stream of info.remoteStreams) { + if (stream.source.video === 'mixed') { + mixedStream = stream; + } + } updateConferenceStatus('Connected to conference server.'); + initWorker(); resolve(); }); }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); @@ -89,17 +115,17 @@ async function attachReader(stream) { async function createSendChannel() { bidirectionalStream = await conference.createSendStream(); - const localStream = new Owt.Base.LocalStream( - bidirectionalStream, - new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); - attachReader(bidirectionalStream); - const publication = await conference.publish( - localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); // const localStream = new Owt.Base.LocalStream( // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + // new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); + // attachReader(bidirectionalStream); // const publication = await conference.publish( - // localStream, {transport: {type: 'quic'}}); + // localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); + const localStream = new Owt.Base.LocalStream( + bidirectionalStream, + new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + const publication = await conference.publish( + localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -123,7 +149,6 @@ async function writeUuid() { async function writeVideoData() { mediaStream = await 
navigator.mediaDevices.getUserMedia({video: true}); const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - mediaWorker = new Worker('./scripts/media-worker.js'); mediaWorker.postMessage(['video-source', track.readable], [track.readable]); mediaWorker.postMessage( ['send-stream', bidirectionalStream.writable], @@ -135,13 +160,16 @@ async function writeData() { const encoded = encoder.encode('message', {stream: true}); const writer = bidirectionalStream.writable.getWriter(); await writer.ready; - await writer.write(new ArrayBuffer(2)); + const ab=new Uint8Array(10000); + ab.fill(1, 0); + await writer.write(ab); writer.releaseLock(); return; } window.addEventListener('load', () => { windowOnLoad(); + fetchWasm(); }); document.getElementById('start-sending').addEventListener('click', async () => { @@ -149,8 +177,8 @@ document.getElementById('start-sending').addEventListener('click', async () => { updateConferenceStatus('Stream is not created.'); return; } - writeVideoData(); - // writeTask = setInterval(writeData, 2000); + //writeVideoData(); + writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); @@ -158,3 +186,61 @@ document.getElementById('stop-sending').addEventListener('click', () => { clearInterval(writeTask); updateConferenceStatus('Stopped sending.'); }); + +document.getElementById('start-receiving') + .addEventListener('click', async () => { + const video=document.getElementById('remote-video'); + const generator = new MediaStreamTrackGenerator({kind: 'video'}); + generatorWriter=generator.writable.getWriter(); + video.srcObject = new MediaStream([generator]); + const reader = conference.datagramReader(); + const ms = new Module.MediaSession(); + const receiver = ms.createRtpVideoReceiver(); + receiver.setCompleteFrameCallback((frame) => { + const copiedFrame = frame.slice(0); + mediaWorker.postMessage( + ['encoded-video-frame', copiedFrame], [copiedFrame.buffer]); + }); + subscribeMixedStream(); + 
while (true) { + const received = await reader.read(); + const buffer = Module._malloc(received.value.byteLength); + Module.writeArrayToMemory(received.value, buffer); + receiver.onRtpPacket(buffer, received.value.byteLength); + } + }); + +async function fetchWasm() { + Module['instantiateWasm'] = async (imports, successCallback) => { + const response = await fetch('scripts/owt.wasm'); + const buffer = await response.arrayBuffer(); + const module=await WebAssembly.compile(buffer); + const instance = await WebAssembly.instantiate(module, imports); + successCallback(instance, module); + return {}; + }; + const scriptPromise = new Promise((resolve, reject) => { + const script = document.createElement('script'); + document.body.appendChild(script); + script.onload = resolve; + script.onerror = reject; + script.async = true; + script.src = 'scripts/owt.js'; + }); + await scriptPromise; +} + +async function subscribeMixedStream() { + const subscription = await conference.subscribe( + mixedStream, + {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + const reader = subscription.stream.readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('Subscription ends.'); + break; + } + // console.log('Received data: ' + value); + } +} diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index 91dbd3a8..a5f1729f 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -558,5 +558,9 @@ export const ConferenceClient = function(config, signalingImpl) { } return quicTransportChannel.createSendStream(); }; + + this.datagramReader = function() { + return quicTransportChannel.datagramReader(); + }; } }; diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 61e2b5d2..51520f07 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -40,6 +40,7 @@ export 
class QuicConnection extends EventDispatcher { new Map(); // Key is subscription ID, value is a promise. this._transportId = this._token.transportId; this._initReceiveStreamReader(); + //this._initDatagrams(); } /** @@ -73,6 +74,14 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } + async _initDatagrams() { + const datagramReader = this._quicTransport.datagrams.readable.getReader(); + while (true) { + const value = await datagramReader.read(); + console.log(value); + } + } + async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); @@ -275,12 +284,12 @@ export class QuicConnection extends EventDispatcher { if (typeof options !== 'object') { return Promise.reject(new TypeError('Options should be an object.')); } - if (options.audio === undefined) { - options.audio = !!stream.settings.audio; - } - if (options.video === undefined) { - options.video = !!stream.settings.video; - } + // if (options.audio === undefined) { + // options.audio = !!stream.settings.audio; + // } + // if (options.video === undefined) { + // options.video = !!stream.settings.video; + // } let mediaOptions; let dataOptions; if (options.audio || options.video) { @@ -416,4 +425,8 @@ export class QuicConnection extends EventDispatcher { // Ready message from server is useless for QuicStream since QuicStream has // its own status. Do nothing here. } + + datagramReader() { + return this._quicTransport.datagrams.readable.getReader(); + } } From c531a9732bb14268ef38d0e4faf251a4fd52518a Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Tue, 19 Oct 2021 16:40:40 +0800 Subject: [PATCH 03/11] Send audio over WebTransport streams. 
--- .../conference/public/scripts/media-worker.js | 85 ++++++++++++++----- src/samples/conference/public/scripts/quic.js | 51 +++++++---- 2 files changed, 96 insertions(+), 40 deletions(-) diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js index 3b8c97ff..ab0c48b4 100644 --- a/src/samples/conference/public/scripts/media-worker.js +++ b/src/samples/conference/public/scripts/media-worker.js @@ -3,21 +3,30 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global VideoEncoder, VideoDecoder, EncodedVideoChunk */ +/* global AudioEncoder, EncodedAudioChunk, VideoEncoder, VideoDecoder, + * EncodedVideoChunk */ -let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter, - mediaSession, datagramReceiver, videoDecoder; +let videoBidiStreamWritable, audioEncoder, videoEncoder, frameBuffer, + audioSendStreamWriter, videoSendStreamWriter, mediaSession, + datagramReceiver, videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. 
const sizePrefix = 4; onmessage = (e) => { - if (e.data[0] === 'video-source') { - readVideoData(e.data[1]); - } else if (e.data[0] === 'send-stream') { - bidirectionalStreamWritable = e.data[1]; - sendStreamWriter = bidirectionalStreamWritable.getWriter(); - writeTrackId(); - // initVideoEncoder(); + if (e.data[0] === 'audio-source') { + readMediaData(e.data[1], 'audio'); + } else if (e.data[0] === 'video-source') { + readMediaData(e.data[1], 'video'); + } else if (e.data[0] === 'send-stream-audio') { + const audioBidiStreamWritable = e.data[1]; + audioSendStreamWriter = audioBidiStreamWritable.getWriter(); + writeTrackId('audio', audioSendStreamWriter); + initAudioEncoder(); + } else if (e.data[0] === 'send-stream-video') { + videoBidiStreamWritable = e.data[1]; + videoSendStreamWriter = videoBidiStreamWritable.getWriter(); + writeTrackId('video', videoSendStreamWriter); + initVideoEncoder(); } else if (e.data[0] === 'datagram-receiver') { datagramReceiver = e.data[1]; } else if (e.data[0] === 'encoded-video-frame') { @@ -30,7 +39,8 @@ onmessage = (e) => { }; async function videoOutput(chunk, metadata) { - if (bidirectionalStreamWritable) { + return; + if (videoBidiStreamWritable) { if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); @@ -40,21 +50,48 @@ async function videoOutput(chunk, metadata) { const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); dataView.setUint32(0, chunk.byteLength); - await sendStreamWriter.ready; - await sendStreamWriter.write(dataView); - console.log('Write a frame.'); + await videoSendStreamWriter.ready; + await videoSendStreamWriter.write(dataView); } } function videoError(error) { - console.log('Encode error, ' + error); + console.log('Video encode error, ' + error.message); } -async function writeTrackId() { +async function audioOutput(chunk, metadata) { + if (audioSendStreamWriter) { + if (!frameBuffer || + 
frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = + new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await audioSendStreamWriter.ready; + await audioSendStreamWriter.write(dataView); + console.log('Wrote an audio frame. '+chunk.byteLength); + } +} + +function audioError(error) { + console.log(`Audio encode error: ${error.message}`); +} + +async function writeTrackId(kind, writer) { const id = new Uint8Array(16); - id[16] = 2; - await sendStreamWriter.ready; - sendStreamWriter.write(id); + id[15] = (kind === 'audio' ? 1 : 2); + await writer.ready; + writer.write(id); + console.log('Wrote track ID for '+kind); +} + +function initAudioEncoder() { + audioEncoder = new AudioEncoder({output: audioOutput, error: audioError}); + audioEncoder.configure( + {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); } function initVideoEncoder() { @@ -85,8 +122,8 @@ function webCodecsErrorCallback(error) { console.log('error: ' + error.message); } -// Read data from video track. -async function readVideoData(readable) { +// Read data from media track. 
+async function readMediaData(readable, kind) { const reader = readable.getReader(); while (true) { const {value, done} = await reader.read(); @@ -94,7 +131,11 @@ async function readVideoData(readable) { console.log('MediaStream ends.'); break; } - videoEncoder.encode(value); + if (kind === 'audio') { + audioEncoder.encode(value); + } else if (kind === 'video') { + videoEncoder.encode(value); + } value.close(); } } diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index ff8de67e..cb4cf8ee 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -8,6 +8,7 @@ let quicChannel = null; let bidirectionalStream = null; +let bidiAudioStream = null; let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; window.Module={}; @@ -114,18 +115,22 @@ async function attachReader(stream) { } async function createSendChannel() { - bidirectionalStream = await conference.createSendStream(); + //bidirectionalStream = await conference.createSendStream(); + bidiAudioStream = await conference.createSendStream(); + const localStream = new Owt.Base.LocalStream( + bidiAudioStream, + new Owt.Base.StreamSourceInfo('mic', undefined, undefined)); + attachReader(bidiAudioStream); + const publication = await conference.publish(localStream, { + audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, + video: false, + transport: {type: 'quic'}, + }); // const localStream = new Owt.Base.LocalStream( // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); - // attachReader(bidirectionalStream); + // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); // const publication = await conference.publish( - // localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); - const localStream = new Owt.Base.LocalStream( - bidirectionalStream, - new Owt.Base.StreamSourceInfo(undefined, undefined, true)); - 
const publication = await conference.publish( - localStream, {transport: {type: 'quic'}}); + // localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -146,13 +151,23 @@ async function writeUuid() { return; } -async function writeVideoData() { - mediaStream = await navigator.mediaDevices.getUserMedia({video: true}); - const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - mediaWorker.postMessage(['video-source', track.readable], [track.readable]); +async function writeMediaData() { + mediaStream = + await navigator.mediaDevices.getUserMedia({audio: true, video: true}); + const audioTrack = + new MediaStreamTrackProcessor(mediaStream.getAudioTracks()[0]); + mediaWorker.postMessage( + ['audio-source', audioTrack.readable], [audioTrack.readable]); mediaWorker.postMessage( - ['send-stream', bidirectionalStream.writable], - [bidirectionalStream.writable]); + ['send-stream-audio', bidiAudioStream.writable], + [bidiAudioStream.writable]); + // const videoTrack = + // new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); + // mediaWorker.postMessage( + // ['video-source', videoTrack.readable], [videoTrack.readable]); + // mediaWorker.postMessage( + // ['send-stream-video', bidirectionalStream.writable], + // [bidirectionalStream.writable]); } async function writeData() { @@ -173,12 +188,12 @@ window.addEventListener('load', () => { }); document.getElementById('start-sending').addEventListener('click', async () => { - if (!bidirectionalStream) { + if (!bidiAudioStream) { updateConferenceStatus('Stream is not created.'); return; } - //writeVideoData(); - writeTask = setInterval(writeData, 200); + writeMediaData(); + //writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); From f35acdca5b0a6fb515775293ededb1cb28606243 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Wed, 27 Oct 2021 14:37:35 +0800 Subject: [PATCH 04/11] Support sending 
MediaStream over WebTransport streams. --- docs/mdfiles/changelog.md | 2 + docs/mdfiles/index.md | 2 + scripts/Gruntfile.js | 21 ++- src/samples/conference/public/quic.html | 4 +- .../conference/public/scripts/media-worker.js | 158 ------------------ src/samples/conference/public/scripts/quic.js | 116 ++++++------- src/sdk/base/logger.js | 8 +- src/sdk/conference/client.js | 6 +- src/sdk/conference/webtransport/connection.js | 133 +++++++++------ .../conference/webtransport/media-worker.js | 140 ++++++++++++++++ .../webtransport/receive-stream-worker.js | 3 + 11 files changed, 304 insertions(+), 289 deletions(-) delete mode 100644 src/samples/conference/public/scripts/media-worker.js create mode 100644 src/sdk/conference/webtransport/media-worker.js diff --git a/docs/mdfiles/changelog.md b/docs/mdfiles/changelog.md index 7ddd9a61..2a80d1d6 100644 --- a/docs/mdfiles/changelog.md +++ b/docs/mdfiles/changelog.md @@ -6,6 +6,8 @@ Change Log * Add a new property `rtpTransceivers` to `TransportSettings` and `TransportConstraints`. * Add a new property `peerConnection` to `ConferenceClient`. * The second argument of `ConferenceClient.publish` could be a list of `RTCRtpTransceiver`s. +* Add support to publish a MediaStream over WebTransport. + # 5.0 * Add WebTransport support for conference mode, see [this design doc](../../design/webtransport.md) for detailed information. * All publications and subscriptions for the same conference use the same `PeerConnection`. diff --git a/docs/mdfiles/index.md b/docs/mdfiles/index.md index 1dedfeff..b88d7595 100644 --- a/docs/mdfiles/index.md +++ b/docs/mdfiles/index.md @@ -142,6 +142,8 @@ WebTransport is supported in conference mode as an experimental feature. 
QUIC ag - [JavaScript SDK design doc for WebTransport support](https://github.com/open-webrtc-toolkit/owt-client-javascript/blob/master/docs/design/webtransport.md) - [QUIC programming guide for OWT server](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/design/quic-programming-guide.md) +Publishing a MediaStream over WebTransport requires an additional worker for I/O. The worker is a standalone ES module, not included in owt.js. As we're moving the SDK from traditional JavaScript script to ES module, there is no plan to support this worker in old browsers. + # 7 Events The JavaScript objects fires events using `Owt.Base.EventDispatchers`. For more detailed events, please refer to the specific class description page. diff --git a/scripts/Gruntfile.js b/scripts/Gruntfile.js index 5672e0c1..c03daeaf 100644 --- a/scripts/Gruntfile.js +++ b/scripts/Gruntfile.js @@ -76,6 +76,10 @@ window.L = L;\n\ watch: true }, }, + worker:{ + src: ['dist/sdk/conference/webtransport/media-worker.js'], + dest: 'dist/sdk/media-worker.js', + }, sinon: { src: ['node_modules/sinon/lib/sinon.js'], dest: 'test/unit/resources/scripts/gen/sinon-browserified.js', @@ -102,7 +106,15 @@ window.L = L;\n\ options: { base: '.', port: 7080, - keepalive: true + keepalive: true, + middleware: function(connect, options, middlewares) { + middlewares.unshift((req, res, next) => { + res.setHeader('Cross-Origin-Embedder-Policy', 'require-corp'); + res.setHeader('Cross-Origin-Opener-Policy', 'same-origin'); + next(); + }); + return middlewares; + } }, }, }, @@ -189,7 +201,8 @@ window.L = L;\n\ {expand: true,cwd:'src/extension/',src:['**'],dest:'dist/',flatten:false}, {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/conference/public/scripts/',flatten:false}, {expand: true,cwd:'dist/samples/conference/public/scripts',src:['rest.js'],dest:'dist/samples/conference/',flatten:false}, - {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false} + {expand: 
true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false}, + {expand: true,cwd: 'dist/sdk/',src: ['media-worker.js'],dest: 'dist/samples/conference/public/scripts/',flatten: false}, ] } }, @@ -267,8 +280,8 @@ window.L = L;\n\ grunt.registerTask('check', ['eslint:src']); grunt.registerTask('prepare', ['browserify:sinon', 'browserify:chai_as_promised']); - grunt.registerTask('pack', ['browserify:dist', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); - grunt.registerTask('dev', ['browserify:dev', 'connect:server']); + grunt.registerTask('pack', ['browserify:dist', 'browserify:worker', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); + grunt.registerTask('dev', ['browserify:dev', 'browserify:worker', 'connect:server']); grunt.registerTask('debug', ['browserify:dev']); grunt.registerTask('default', ['check', 'pack']); }; diff --git a/src/samples/conference/public/quic.html b/src/samples/conference/public/quic.html index 300ce4d1..8fdb15e8 100644 --- a/src/samples/conference/public/quic.html +++ b/src/samples/conference/public/quic.html @@ -23,9 +23,9 @@

Open WebRTC Toolkit

-

Sample of QuicTransport

+

Sample of WebTransport

-

This sample works with the latest Chrome.

+

This sample works with the Chrome >= 97.

diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js deleted file mode 100644 index ab0c48b4..00000000 --- a/src/samples/conference/public/scripts/media-worker.js +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (C) <2021> Intel Corporation -// -// SPDX-License-Identifier: Apache-2.0 - -/* eslint-disable require-jsdoc */ -/* global AudioEncoder, EncodedAudioChunk, VideoEncoder, VideoDecoder, - * EncodedVideoChunk */ - -let videoBidiStreamWritable, audioEncoder, videoEncoder, frameBuffer, - audioSendStreamWriter, videoSendStreamWriter, mediaSession, - datagramReceiver, videoDecoder; -// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. -const sizePrefix = 4; - -onmessage = (e) => { - if (e.data[0] === 'audio-source') { - readMediaData(e.data[1], 'audio'); - } else if (e.data[0] === 'video-source') { - readMediaData(e.data[1], 'video'); - } else if (e.data[0] === 'send-stream-audio') { - const audioBidiStreamWritable = e.data[1]; - audioSendStreamWriter = audioBidiStreamWritable.getWriter(); - writeTrackId('audio', audioSendStreamWriter); - initAudioEncoder(); - } else if (e.data[0] === 'send-stream-video') { - videoBidiStreamWritable = e.data[1]; - videoSendStreamWriter = videoBidiStreamWritable.getWriter(); - writeTrackId('video', videoSendStreamWriter); - initVideoEncoder(); - } else if (e.data[0] === 'datagram-receiver') { - datagramReceiver = e.data[1]; - } else if (e.data[0] === 'encoded-video-frame') { - if (videoDecoder.state === 'closed') { - return; - } - videoDecoder.decode(new EncodedVideoChunk( - {timestamp: Date.now(), data: e.data[1], type: 'key'})); - } -}; - -async function videoOutput(chunk, metadata) { - return; - if (videoBidiStreamWritable) { - if (!frameBuffer || - frameBuffer.byteLength < chunk.byteLength + sizePrefix) { - frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); - } - const bufferView = new Uint8Array(frameBuffer, sizePrefix); - 
chunk.copyTo(bufferView); - const dataView = - new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); - dataView.setUint32(0, chunk.byteLength); - await videoSendStreamWriter.ready; - await videoSendStreamWriter.write(dataView); - } -} - -function videoError(error) { - console.log('Video encode error, ' + error.message); -} - -async function audioOutput(chunk, metadata) { - if (audioSendStreamWriter) { - if (!frameBuffer || - frameBuffer.byteLength < chunk.byteLength + sizePrefix) { - frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); - } - const bufferView = new Uint8Array(frameBuffer, sizePrefix); - chunk.copyTo(bufferView); - const dataView = - new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); - dataView.setUint32(0, chunk.byteLength); - await audioSendStreamWriter.ready; - await audioSendStreamWriter.write(dataView); - console.log('Wrote an audio frame. '+chunk.byteLength); - } -} - -function audioError(error) { - console.log(`Audio encode error: ${error.message}`); -} - -async function writeTrackId(kind, writer) { - const id = new Uint8Array(16); - id[15] = (kind === 'audio' ? 
1 : 2); - await writer.ready; - writer.write(id); - console.log('Wrote track ID for '+kind); -} - -function initAudioEncoder() { - audioEncoder = new AudioEncoder({output: audioOutput, error: audioError}); - audioEncoder.configure( - {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); -} - -function initVideoEncoder() { - videoEncoder = new VideoEncoder({output: videoOutput, error: videoError}); - videoEncoder.configure({ - codec: 'avc1.4d002a', - width: 640, - height: 480, - framerate: 30, - latencyMode: 'realtime', - avc: {format: 'annexb'}, - }); -} - -function initVideoDecoder() { - videoDecoder = new VideoDecoder({ - output: videoFrameOutputCallback, - error: webCodecsErrorCallback, - }); - videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); -} - -function videoFrameOutputCallback(frame) { - postMessage(['video-frame', frame], [frame]); -} - -function webCodecsErrorCallback(error) { - console.log('error: ' + error.message); -} - -// Read data from media track. 
-async function readMediaData(readable, kind) { - const reader = readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('MediaStream ends.'); - break; - } - if (kind === 'audio') { - audioEncoder.encode(value); - } else if (kind === 'video') { - videoEncoder.encode(value); - } - value.close(); - } -} - -async function fetchWasm() { - const Module={}; - Module['instantiateWasm'] = async (imports, successCallback) => { - const response = await fetch('./owt.wasm'); - const buffer = await response.arrayBuffer(); - const module=new WebAssembly.Module(buffer); - const instance = await WebAssembly.instantiate(module, imports); - successCallback(instance, module); - return {}; - }; - // Module['wasmModule']=new WebAssembly.Module(buffer); - importScripts('./owt.js'); - console.log('Got wasm binary.'); -} - -initVideoDecoder(); diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index cb4cf8ee..f4f3d9d2 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -9,27 +9,31 @@ let quicChannel = null; let bidirectionalStream = null; let bidiAudioStream = null; -let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; +let writeTask, dataWorker, conferenceId, myId, mixedStream, generatorWriter, + mediaPublication; +const isMedia = true; window.Module={}; -const conference = new Owt.Conference.ConferenceClient({ - webTransportConfiguration: { - serverCertificateFingerprints: [{ - value: - '59:74:C6:C5:2C:D8:E8:18:A9:D2:14:77:ED:94:89:87:DF:83:BA:B3:96:4C:4C:0B:B8:D3:22:58:11:55:67:1A', - algorithm: 'sha-256', - }], - serverCertificateHashes: [{ - value: new Uint8Array([ - 0x59, 0x74, 0xC6, 0xC5, 0x2C, 0xD8, 0xE8, 0x18, 0xA9, 0xD2, 0x14, - 0x77, 0xED, 0x94, 0x89, 0x87, 0xDF, 0x83, 0xBA, 0xB3, 0x96, 0x4C, - 0x4C, 0x0B, 0xB8, 0xD3, 0x22, 0x58, 0x11, 0x55, 0x67, 0x1A - ]), - 
algorithm: 'sha-256', - }], - } -}); +const conference = new Owt.Conference.ConferenceClient( + { + webTransportConfiguration: { + serverCertificateFingerprints: [{ + value: + 'FD:CD:87:EB:92:97:84:FD:D9:E9:C1:9F:AF:57:12:0E:32:AF:0D:C0:58:5F:33:BB:59:4A:2E:6E:C3:18:7A:93', + algorithm: 'sha-256', + }], + serverCertificateHashes: [{ + value: new Uint8Array([ + 0xFD, 0xCD, 0x87, 0xEB, 0x92, 0x97, 0x84, 0xFD, 0xD9, 0xE9, 0xC1, + 0x9F, 0xAF, 0x57, 0x12, 0x0E, 0x32, 0xAF, 0x0D, 0xC0, 0x58, 0x5F, + 0x33, 0xBB, 0x59, 0x4A, 0x2E, 0x6E, 0xC3, 0x18, 0x7A, 0x93 + ]), + algorithm: 'sha-256', + }], + } + }, + '../../../sdk/conference/webtransport'); conference.addEventListener('streamadded', async (event) => { console.log(event.stream); if (event.stream.origin == myId) { @@ -60,8 +64,8 @@ function updateConferenceStatus(message) { } function initWorker() { - mediaWorker = new Worker('./scripts/media-worker.js'); - mediaWorker.onmessage=((e) => { + dataWorker = new Worker('./scripts/data-worker.js'); + dataWorker.onmessage=((e) => { if (e.data[0] === 'video-frame') { generatorWriter.write(e.data[1]); //console.log(e.data[1]); @@ -115,29 +119,13 @@ async function attachReader(stream) { } async function createSendChannel() { - //bidirectionalStream = await conference.createSendStream(); - bidiAudioStream = await conference.createSendStream(); - const localStream = new Owt.Base.LocalStream( - bidiAudioStream, - new Owt.Base.StreamSourceInfo('mic', undefined, undefined)); - attachReader(bidiAudioStream); - const publication = await conference.publish(localStream, { - audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, - video: false, - transport: {type: 'quic'}, - }); - // const localStream = new Owt.Base.LocalStream( - // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); - // const publication = await conference.publish( - // localStream, {transport: {type: 'quic'}}); - console.log(publication); + bidirectionalStream = await 
conference.createSendStream(); updateConferenceStatus('Created send channel.'); } async function windowOnLoad() { await joinConference(); - await createSendChannel(); + //await createSendChannel(); } async function writeUuid() { @@ -151,31 +139,12 @@ async function writeUuid() { return; } -async function writeMediaData() { - mediaStream = - await navigator.mediaDevices.getUserMedia({audio: true, video: true}); - const audioTrack = - new MediaStreamTrackProcessor(mediaStream.getAudioTracks()[0]); - mediaWorker.postMessage( - ['audio-source', audioTrack.readable], [audioTrack.readable]); - mediaWorker.postMessage( - ['send-stream-audio', bidiAudioStream.writable], - [bidiAudioStream.writable]); - // const videoTrack = - // new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - // mediaWorker.postMessage( - // ['video-source', videoTrack.readable], [videoTrack.readable]); - // mediaWorker.postMessage( - // ['send-stream-video', bidirectionalStream.writable], - // [bidirectionalStream.writable]); -} - async function writeData() { const encoder = new TextEncoder(); const encoded = encoder.encode('message', {stream: true}); const writer = bidirectionalStream.writable.getWriter(); await writer.ready; - const ab=new Uint8Array(10000); + const ab = new Uint8Array(10000); ab.fill(1, 0); await writer.write(ab); writer.releaseLock(); @@ -188,17 +157,34 @@ window.addEventListener('load', () => { }); document.getElementById('start-sending').addEventListener('click', async () => { - if (!bidiAudioStream) { - updateConferenceStatus('Stream is not created.'); - return; + if (isMedia) { + const mediaStream = + await navigator.mediaDevices.getUserMedia({audio: true, video: true}); + const localStream = new Owt.Base.LocalStream( + mediaStream, new Owt.Base.StreamSourceInfo('mic', 'camera', undefined)); + mediaPublication = await conference.publish(localStream, { + audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, + video: {codec: 'h264'}, + transport: {type: 
'quic'}, + }); + } else { + if (!bidirectionalStream) { + updateConferenceStatus('Stream is not created.'); + return; + } + writeTask = setInterval(writeData, 200); } - writeMediaData(); - //writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); document.getElementById('stop-sending').addEventListener('click', () => { - clearInterval(writeTask); + if (isMedia) { + if (mediaPublication) { + mediaPublication.stop(); + } + } else { + clearInterval(writeTask); + } updateConferenceStatus('Stopped sending.'); }); @@ -213,7 +199,7 @@ document.getElementById('start-receiving') const receiver = ms.createRtpVideoReceiver(); receiver.setCompleteFrameCallback((frame) => { const copiedFrame = frame.slice(0); - mediaWorker.postMessage( + dataWorker.postMessage( ['encoded-video-frame', copiedFrame], [copiedFrame.buffer]); }); subscribeMixedStream(); diff --git a/src/sdk/base/logger.js b/src/sdk/base/logger.js index 2b2ba22e..504c65ed 100644 --- a/src/sdk/base/logger.js +++ b/src/sdk/base/logger.js @@ -26,7 +26,7 @@ // This file is borrowed from lynckia/licode with some modifications. 
-/* global window */ +/* global console */ 'use strict'; @@ -55,13 +55,13 @@ const Logger = (function() { }; that.log = (...args) => { - window.console.log((new Date()).toISOString(), ...args); + console.log((new Date()).toISOString(), ...args); }; const bindType = function(type) { - if (typeof window.console[type] === 'function') { + if (typeof console[type] === 'function') { return (...args) => { - window.console[type]((new Date()).toISOString(), ...args); + console[type]((new Date()).toISOString(), ...args); }; } else { return that.log; diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index a5f1729f..bb64d4be 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -117,9 +117,10 @@ class ConferenceClientConfiguration { // eslint-disable-line no-unused-vars * @extends Owt.Base.EventDispatcher * @constructor * @param {?Owt.Conference.ConferenceClientConfiguration } config Configuration for ConferenceClient. + * @param {string} workerDir Path of the directory for workers shipped with OWT SDK. It could be an relative path to your HTML file or an absolute path. * @param {?Owt.Conference.SioSignaling } signalingImpl Signaling channel implementation for ConferenceClient. SDK uses default signaling channel implementation if this parameter is undefined. Currently, a Socket.IO signaling channel implementation was provided as ics.conference.SioSignaling. However, it is not recommended to directly access signaling channel or customize signaling channel for ConferenceClient as this time. 
*/ -export const ConferenceClient = function(config, signalingImpl) { +export const ConferenceClient = function(config, workerDir, signalingImpl) { Object.setPrototypeOf(this, new EventModule.EventDispatcher()); config = config || {}; const self = this; @@ -439,7 +440,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (typeof WebTransport === 'function' && token.webTransportUrl) { quicTransportChannel = new QuicConnection( token.webTransportUrl, resp.webTransportToken, - createSignalingForChannel(), config.webTransportConfiguration); + createSignalingForChannel(), config.webTransportConfiguration, + workerDir); } const conferenceInfo = new ConferenceInfo( resp.room.id, Array.from(participants.values()), diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 51520f07..fa8f9a1e 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -3,15 +3,15 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder, - * ArrayBuffer */ +/* global Promise, Map, WebTransport, WebTransportBidirectionalStream, + Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */ 'use strict'; import Logger from '../../base/logger.js'; import {EventDispatcher} from '../../base/event.js'; import {Publication} from '../../base/publication.js'; -import {SubscribeOptions, Subscription} from '../subscription.js'; +import {Subscription} from '../subscription.js'; import {Base64} from '../../base/base64.js'; const uuidByteLength = 16; @@ -26,21 +26,24 @@ const uuidByteLength = 16; export class QuicConnection extends EventDispatcher { // `tokenString` is a base64 string of the token object. It's in the return // value of `ConferenceClient.join`. 
- constructor(url, tokenString, signaling, webTransportOptions) { + constructor(url, tokenString, signaling, webTransportOptions, workerDir) { super(); this._tokenString = tokenString; this._token = JSON.parse(Base64.decodeBase64(tokenString)); this._signaling = signaling; this._ended = false; - this._quicStreams = new Map(); // Key is publication or subscription ID. + // Key is publication or subscription ID, value is a list of streams. + this._quicDataStreams = new Map(); + // Key is MediaStreamTrack ID, value is a bidirectional stream. + this._quicMediaStreamTracks = new Map(); this._quicTransport = new WebTransport(url, webTransportOptions); this._subscribePromises = new Map(); // Key is subscription ID. this._subscribeOptions = new Map(); // Key is subscription ID. this._subscriptionInfoReady = - new Map(); // Key is subscription ID, value is a promise. + new Map(); // Key is subscription ID, value is a promise. this._transportId = this._token.transportId; this._initReceiveStreamReader(); - //this._initDatagrams(); + this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); } /** @@ -74,14 +77,6 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } - async _initDatagrams() { - const datagramReader = this._quicTransport.datagrams.readable.getReader(); - while (true) { - const value = await datagramReader.read(); - console.log(value); - } - } - async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); @@ -103,7 +98,7 @@ export class QuicConnection extends EventDispatcher { // https://github.com/w3c/webtransport/issues/131. Issue tracker: // https://crbug.com/1182905. 
const chunkReader = receiveStream.readable.getReader(); - let readingChunksDone = false; + const readingChunksDone = false; let readingHeaderDone = false; let mediaStream = false; let subscriptionId; @@ -169,7 +164,7 @@ export class QuicConnection extends EventDispatcher { } } chunkReader.releaseLock(); - this._quicStreams.set(subscriptionId, receiveStream); + this._quicDataStreams.set(subscriptionId, [receiveStream]); if (this._subscribePromises.has(subscriptionId)) { const subscription = this._createSubscription(subscriptionId, receiveStream); @@ -212,33 +207,71 @@ export class QuicConnection extends EventDispatcher { return quicStream; } - async createSendStream1(sessionId) { - Logger.info('Create stream.'); - await this._quicTransport.ready; - // TODO: Potential failure because of publication stream is created faster - // than signaling stream(created by the 1st call to initiatePublication). - const publicationId = await this._initiatePublication(); - const quicStream = await this._quicTransport.createSendStream(); - const writer = quicStream.writable.getWriter(); - await writer.ready; - writer.write(this._uuidToUint8Array(publicationId)); - writer.releaseLock(); - this._quicStreams.set(publicationId, quicStream); - return quicStream; - } - async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. // TODO: Potential failure because of publication stream is created faster // than signaling stream(created by the 1st call to initiatePublication). 
const publicationId = await this._initiatePublication(stream, options); - const quicStream = stream.stream; - const writer = quicStream.writable.getWriter(); - await writer.ready; - writer.write(this._uuidToUint8Array(publicationId)); - writer.releaseLock(); - this._quicStreams.set(publicationId, quicStream); + const quicStreams = []; + if (stream.stream instanceof WebTransportBidirectionalStream) { + quicStreams.push(stream.stream); + this._quicDataStreams.set(publicationId, stream.streams); + } else if (stream.stream instanceof MediaStream) { + if (typeof MediaStreamTrackProcessor === 'undefined') { + throw new TypeError( + 'MediaStreamTrackProcessor is not supported by your browser.'); + } + for (const track of stream.stream.getTracks()) { + const quicStream = + await this._quicTransport.createBidirectionalStream(); + this._quicMediaStreamTracks.set(track.id, quicStream); + quicStreams.push(quicStream); + } + } else { + throw new TypeError('Invalid stream.'); + } + for (const quicStream of quicStreams) { + const writer = quicStream.writable.getWriter(); + await writer.ready; + writer.write(this._uuidToUint8Array(publicationId)); + writer.releaseLock(); + } + if (stream.stream instanceof MediaStream) { + for (const track of stream.stream.getTracks()) { + let encoderConfig; + if (track.kind === 'audio') { + encoderConfig = { + codec: 'opus', + numberOfChannels: 1, + sampleRate: 48000, + }; + } else if (track.kind === 'video') { + encoderConfig = { + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }; + } + const quicStream = this._quicMediaStreamTracks.get(track.id); + const processor = new MediaStreamTrackProcessor(track); + this._worker.postMessage( + [ + 'media-sender', + [ + track.id, + track.kind, + processor.readable, + quicStream.writable, + encoderConfig, + ], + ], + [processor.readable, quicStream.writable]); + } + } const publication = new Publication(publicationId, () => { 
this._signaling.sendSignalingMessage('unpublish', {id: publication}) .catch((e) => { @@ -250,7 +283,7 @@ export class QuicConnection extends EventDispatcher { } hasContentSessionId(id) { - return this._quicStreams.has(id); + return this._quicDataStreams.has(id); } _uuidToUint8Array(uuidString) { @@ -343,13 +376,14 @@ export class QuicConnection extends EventDispatcher { .then((data) => { this._subscribeOptions.set(data.id, options); Logger.debug('Subscribe info is set.'); - if (this._quicStreams.has(data.id)) { + if (this._quicDataStreams.has(data.id)) { // QUIC stream created before signaling returns. + // TODO: Update subscription to accept list of QUIC streams. const subscription = this._createSubscription( - data.id, this._quicStreams.get(data.id)); + data.id, this._quicDataStreams.get(data.id)[0]); resolve(subscription); } else { - this._quicStreams.set(data.id, null); + this._quicDataStreams.set(data.id, null); // QUIC stream is not created yet, resolve promise after getting // QUIC stream. this._subscribePromises.set( @@ -363,15 +397,6 @@ export class QuicConnection extends EventDispatcher { return p; } - _readAndPrint() { - this._quicStreams[0].waitForReadable(5).then(() => { - const data = new Uint8Array(this._quicStreams[0].readBufferedAmount); - this._quicStreams[0].readInto(data); - Logger.info('Read data: ' + data); - this._readAndPrint(); - }); - } - async _initiatePublication(stream, options) { const media = {tracks: []}; if (stream.source.audio) { @@ -410,15 +435,15 @@ export class QuicConnection extends EventDispatcher { }; media.tracks.push(track); } - const data = await this._signaling.sendSignalingMessage('publish', { + const resp = await this._signaling.sendSignalingMessage('publish', { media: stream.source.data ? 
null : media, data: stream.source.data, transport: {type: 'quic', id: this._transportId}, }); - if (this._transportId !== data.transportId) { + if (this._transportId !== resp.transportId) { throw new Error('Transport ID not match.'); } - return data.id; + return resp.id; } _readyHandler() { diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js new file mode 100644 index 00000000..dd571817 --- /dev/null +++ b/src/sdk/conference/webtransport/media-worker.js @@ -0,0 +1,140 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer, + Uint8Array, DataView */ + +import Logger from '../../base/logger.js'; + +// Key is MediaStreamTrack ID, value is AudioEncoder or VideoEncoder. +const encoders = new Map(); +// Key is MediaStreamTrack ID, value is WritableStreamDefaultWriter. +const writers = new Map(); + +let frameBuffer; +let videoDecoder; +// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. +const sizePrefix = 4; + +/* Messages it accepts: + * media-sender: [MediaStreamTrack, WebTransportStream, + * AudioEncoderConfig/VideoEncoderConfig] + */ +// eslint-disable-next-line no-undef +onmessage = (e) => { + if (e.data[0] === 'media-sender') { + const [trackId, trackKind, trackReadable, sendStreamWritable, config] = + e.data[1]; + let encoder; + const writer = sendStreamWritable.getWriter(); + if (trackKind === 'audio') { + encoder = initAudioEncoder(config, writer); + } else { // Video. + encoder = initVideoEncoder(config, writer); + } + encoders.set(trackId, encoder); + writers.set(trackId, writer); + readMediaData(trackReadable, encoder); + writeTrackId(trackKind, writer); + } +}; + +async function videoOutput(writer, chunk, metadata) { + // TODO: Combine audio and video output callback. 
+ if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function videoError(error) { + Logger.error('Video encode error: ' + error.message); +} + +async function audioOutput(writer, chunk, metadata) { + if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function audioError(error) { + Logger.error(`Audio encode error: ${error.message}`); +} + +async function writeTrackId(kind, writer) { + const id = new Uint8Array(16); + id[15] = (kind === 'audio' ? 1 : 2); + await writer.ready; + writer.write(id); +} + +function initAudioEncoder(config, writer) { + const audioEncoder = new AudioEncoder( + {output: audioOutput.bind(null, writer), error: audioError}); + // TODO: Respect config. + audioEncoder.configure( + {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); + return audioEncoder; +} + +function initVideoEncoder(config, writer) { + const videoEncoder = new VideoEncoder( + {output: videoOutput.bind(null, writer), error: videoError}); + // TODO: Respect config. 
+ videoEncoder.configure({ + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }); + return videoEncoder; +} + +function initVideoDecoder() { + videoDecoder = new VideoDecoder({ + output: videoFrameOutputCallback, + error: webCodecsErrorCallback, + }); + videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); +} + +function videoFrameOutputCallback(frame) { + // eslint-disable-next-line no-undef + postMessage(['video-frame', frame], [frame]); +} + +function webCodecsErrorCallback(error) { + Logger.warn('error: ' + error.message); +} + +// Read data from media track. +async function readMediaData(trackReadable, encoder) { + const reader = trackReadable.getReader(); + // eslint-disable-next-line no-constant-condition + while (true) { + const {value, done} = await reader.read(); + if (done) { + Logger.debug('MediaStream ends.'); + break; + } + encoder.encode(value); + value.close(); + } +} + +initVideoDecoder(); diff --git a/src/sdk/conference/webtransport/receive-stream-worker.js b/src/sdk/conference/webtransport/receive-stream-worker.js index 256f1244..89e60a45 100644 --- a/src/sdk/conference/webtransport/receive-stream-worker.js +++ b/src/sdk/conference/webtransport/receive-stream-worker.js @@ -5,6 +5,9 @@ /* eslint-disable require-jsdoc */ /* global AudioDecoder, postMessage */ +// TODO: Enable ESLint for this file. +/* eslint-disable */ + 'use strict'; import Logger from '../../base/logger.js'; From 01b9d6320cebfcdeafcb6aa389795516854aa18b Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Fri, 26 Nov 2021 14:46:30 +0800 Subject: [PATCH 05/11] Support receive MediaStream over WebTransport datagrams. 
--- src/sdk/conference/webtransport/connection.js | 187 ++++++++++++++++-- .../conference/webtransport/media-worker.js | 113 ++++++++--- 2 files changed, 254 insertions(+), 46 deletions(-) diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index fa8f9a1e..4a1e9c95 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -4,7 +4,8 @@ /* eslint-disable require-jsdoc */ /* global Promise, Map, WebTransport, WebTransportBidirectionalStream, - Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */ + Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor, + MediaStreamTrackGenerator, proto */ 'use strict'; @@ -44,6 +45,10 @@ export class QuicConnection extends EventDispatcher { this._transportId = this._token.transportId; this._initReceiveStreamReader(); this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); + // Key is subscription ID, value is a MediaStreamTrackGenerator writer. 
+ this._mstVideoGeneratorWriters = new Map(); + this._initRtpModule(); + this._initDatagramReader(); } /** @@ -77,15 +82,18 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } + _initRtpModule() { + this._worker.postMessage(['init-rtp']); + } + async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); - Logger.info('Reader: ' + receiveStreamReader); let receivingDone = false; while (!receivingDone) { const {value: receiveStream, done: readingReceiveStreamsDone} = await receiveStreamReader.read(); - Logger.info('New stream received'); + Logger.debug('New stream received.'); const subscriptionIdBytes = new Uint8Array(uuidByteLength); let subscriptionIdBytesOffset = 0; const trackIdBytes = new Uint8Array(uuidByteLength); @@ -173,6 +181,19 @@ export class QuicConnection extends EventDispatcher { } } + async _initDatagramReader() { + const datagramReader = this._quicTransport.datagrams.readable.getReader(); + let receivingDone = false; + while (!receivingDone) { + const {value: datagram, done: readingDatagramsDone} = + await datagramReader.read(); + this._worker.postMessage(['rtp-packet', datagram]); + if (readingDatagramsDone) { + receivingDone = true; + } + } + } + _createSubscription(id, receiveStream) { // TODO: Incomplete subscription. const subscription = new Subscription(id, () => { @@ -207,6 +228,95 @@ export class QuicConnection extends EventDispatcher { return quicStream; } + async bindFeedbackReader(stream, publicationId) { + // The receiver side of a publication stream starts with a UUID of + // publication ID, then each feedback message has a 4 bytes header indicates + // its length, and followed by protobuf encoded body. 
+ const feedbackChunkReader = stream.readable.getReader(); + let feedbackChunksDone = false; + let publicationIdOffset = 0; + const headerSize=4; + const header = new Uint8Array(headerSize); + let headerOffset = 0; + let bodySize = 0; + let bodyOffset = 0; + let bodyBytes; + while (!feedbackChunksDone) { + let valueOffset=0; + const {value, done} = await feedbackChunkReader.read(); + Logger.debug(value); + while (valueOffset < value.byteLength) { + if (publicationIdOffset < uuidByteLength) { + // TODO: Check publication ID matches. For now, we just skip this ID. + const readLength = + Math.min(uuidByteLength - publicationIdOffset, value.byteLength); + valueOffset += readLength; + publicationIdOffset += readLength; + } + if (headerOffset < headerSize) { + // Read header. + const copyLength = Math.min( + headerSize - headerOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + header.set( + value.subarray(valueOffset, valueOffset + copyLength), + headerOffset); + headerOffset += copyLength; + valueOffset += copyLength; + if (headerOffset < headerSize) { + continue; + } + bodySize = 0; + bodyOffset = 0; + for (let i = 0; i < headerSize; i++) { + bodySize += (header[i] << ((headerSize - 1 - i) * 8)); + } + bodyBytes = new Uint8Array(bodySize); + Logger.debug('Body size ' + bodySize); + } + if (bodyOffset < bodySize) { + const copyLength = + Math.min(bodySize - bodyOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + Logger.debug('Bytes for body: '+copyLength); + bodyBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + bodyOffset); + bodyOffset += copyLength; + valueOffset += copyLength; + if (valueOffset < bodySize) { + continue; + } + // Decode body. 
+ const feedback = + proto.owt.protobuf.Feedback.deserializeBinary(bodyBytes); + this.handleFeedback(feedback, publicationId); + } + } + if (done) { + feedbackChunksDone = true; + break; + } + } + } + + async handleFeedback(feedback, publicationId) { + Logger.debug( + 'Key frame request type: ' + + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST); + if (feedback.getType() === + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST) { + this._worker.postMessage( + ['rtcp-feedback', ['key-frame-request', publicationId]]); + } else { + Logger.warning('Unrecognized feedback type ' + feedback.getType()); + } + } + async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. @@ -225,6 +335,7 @@ export class QuicConnection extends EventDispatcher { for (const track of stream.stream.getTracks()) { const quicStream = await this._quicTransport.createBidirectionalStream(); + this.bindFeedbackReader(quicStream, publicationId); this._quicMediaStreamTracks.set(track.id, quicStream); quicStreams.push(quicStream); } @@ -262,6 +373,7 @@ export class QuicConnection extends EventDispatcher { [ 'media-sender', [ + publicationId, track.id, track.kind, processor.readable, @@ -317,12 +429,12 @@ export class QuicConnection extends EventDispatcher { if (typeof options !== 'object') { return Promise.reject(new TypeError('Options should be an object.')); } - // if (options.audio === undefined) { - // options.audio = !!stream.settings.audio; - // } - // if (options.video === undefined) { - // options.video = !!stream.settings.video; - // } + if (options.audio === undefined) { + options.audio = !!stream.settings.audio; + } + if (options.video === undefined) { + options.video = !!stream.settings.video; + } let mediaOptions; let dataOptions; if (options.audio || options.video) { @@ -375,19 +487,38 @@ export class QuicConnection extends EventDispatcher { }) .then((data) => { this._subscribeOptions.set(data.id, 
options); - Logger.debug('Subscribe info is set.'); - if (this._quicDataStreams.has(data.id)) { - // QUIC stream created before signaling returns. - // TODO: Update subscription to accept list of QUIC streams. - const subscription = this._createSubscription( - data.id, this._quicDataStreams.get(data.id)[0]); - resolve(subscription); + if (dataOptions) { + // A WebTransport stream is associated with a subscription for + // data. + if (this._quicDataStreams.has(data.id)) { + // QUIC stream created before signaling returns. + // TODO: Update subscription to accept list of QUIC streams. + const subscription = this._createSubscription( + data.id, this._quicDataStreams.get(data.id)[0]); + resolve(subscription); + } else { + this._quicDataStreams.set(data.id, null); + // QUIC stream is not created yet, resolve promise after getting + // QUIC stream. + this._subscribePromises.set( + data.id, {resolve: resolve, reject: reject}); + } } else { - this._quicDataStreams.set(data.id, null); - // QUIC stream is not created yet, resolve promise after getting - // QUIC stream. - this._subscribePromises.set( - data.id, {resolve: resolve, reject: reject}); + // A MediaStream is associated with a subscription for media. + // Media packets are received over WebTransport datagram. + const generators = []; + for (const track of mediaOptions) { + const generator = + new MediaStreamTrackGenerator({kind: track.type}); + generators.push(generator); + // TODO: Update key with the correct SSRC. 
+              this._mstVideoGeneratorWriters.set(
+                  '0', generator.writable.getWriter());
+            }
+            const mediaStream = new MediaStream(generators);
+            const subscription =
+                this._createSubscription(data.id, mediaStream);
+            resolve(subscription);
          }
          if (this._subscriptionInfoReady.has(data.id)) {
            this._subscriptionInfoReady.get(data.id)();
@@ -454,4 +585,18 @@
   datagramReader() {
     return this._quicTransport.datagrams.readable.getReader();
   }
+
+  initHandlersForWorker() {
+    this._worker.onmessage = ((e) => {
+      const [command, args] = e.data;
+      switch (command) {
+        case 'video-frame':
+          // TODO: Use actual subscription ID.
+          this._mstVideoGeneratorWriters.get('0').write(args);
+          break;
+        default:
+          Logger.warn('Unrecognized command ' + command);
+      }
+    });
+  }
 }
diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js
index dd571817..00dbcade 100644
--- a/src/sdk/conference/webtransport/media-worker.js
+++ b/src/sdk/conference/webtransport/media-worker.js
@@ -4,43 +4,95 @@
 /* eslint-disable require-jsdoc */
 /* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer,
-    Uint8Array, DataView */
+    Uint8Array, DataView, console, EncodedVideoChunk */
-import Logger from '../../base/logger.js';
+// TODO: Use relative path instead.
+import initModule from '/src/samples/conference/public/scripts/owt.js';
 // Key is MediaStreamTrack ID, value is AudioEncoder or VideoEncoder.
 const encoders = new Map();
 // Key is MediaStreamTrack ID, value is WritableStreamDefaultWriter.
 const writers = new Map();
+// Key is publication ID, value is bool indicates whether key frame is requested
+// for its video track.
+const keyFrameRequested = new Map();
+let wasmModule;
+let mediaSession;
+let rtpReceiver;
 let frameBuffer;
 let videoDecoder;
 // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0.
const sizePrefix = 4; /* Messages it accepts: - * media-sender: [MediaStreamTrack, WebTransportStream, + * media-sender: [Publication ID, MediaStreamTrack ID, MediaStreamTrack kind, + * MediaStreamTrackProcessor readable, WebTransportStream writable, * AudioEncoderConfig/VideoEncoderConfig] */ // eslint-disable-next-line no-undef -onmessage = (e) => { - if (e.data[0] === 'media-sender') { - const [trackId, trackKind, trackReadable, sendStreamWritable, config] = - e.data[1]; - let encoder; - const writer = sendStreamWritable.getWriter(); - if (trackKind === 'audio') { - encoder = initAudioEncoder(config, writer); - } else { // Video. - encoder = initVideoEncoder(config, writer); - } - encoders.set(trackId, encoder); - writers.set(trackId, writer); - readMediaData(trackReadable, encoder); - writeTrackId(trackKind, writer); +onmessage = async (e) => { + const [command, args] = e.data; + switch (command) { + case 'media-sender': + initMediaSender(...args); + break; + case 'rtcp-feedback': + await handleFeedback(...args); + break; + case 'init-rtp': + await initRtpModule(); + break; + case 'rtp-packet': + await handleRtpPacket(args); + break; + default: + console.warn('Unrecognized command ' + command); } }; +async function initMediaSender( + publicationId, trackId, trackKind, trackReadable, sendStreamWritable, + config) { + let encoder; + const writer = sendStreamWritable.getWriter(); + if (trackKind === 'audio') { + encoder = initAudioEncoder(config, writer); + } else { // Video. 
+ encoder = initVideoEncoder(config, writer); + keyFrameRequested[publicationId] = false; + } + encoders.set(trackId, encoder); + writers.set(trackId, writer); + readMediaData(trackReadable, encoder, publicationId); + writeTrackId(trackKind, writer); +} + +async function initRtpModule() { + initVideoDecoder(); + wasmModule = await fetchWasm(); + mediaSession = new wasmModule.MediaSession(); + rtpReceiver = mediaSession.createRtpVideoReceiver(); + rtpReceiver.setCompleteFrameCallback((frame) => { + videoDecoder.decode(new EncodedVideoChunk( + {timestamp: Date.now(), data: frame, type: 'key'})); + }); +} + +async function fetchWasm() { + const owtWasmModule = {}; + initModule(owtWasmModule); + await owtWasmModule.ready; + return owtWasmModule; +} + +async function handleFeedback(feedback, publicationId) { + if (feedback === 'key-frame-request') { + console.log('Setting key frame request flag.'); + keyFrameRequested[publicationId] = true; + } +} + async function videoOutput(writer, chunk, metadata) { // TODO: Combine audio and video output callback. 
if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { @@ -55,7 +107,7 @@ async function videoOutput(writer, chunk, metadata) { } function videoError(error) { - Logger.error('Video encode error: ' + error.message); + console.error('Video encode error: ' + error.message); } async function audioOutput(writer, chunk, metadata) { @@ -71,7 +123,7 @@ async function audioOutput(writer, chunk, metadata) { } function audioError(error) { - Logger.error(`Audio encode error: ${error.message}`); + console.error(`Audio encode error: ${error.message}`); } async function writeTrackId(kind, writer) { @@ -116,25 +168,36 @@ function initVideoDecoder() { function videoFrameOutputCallback(frame) { // eslint-disable-next-line no-undef postMessage(['video-frame', frame], [frame]); + frame.close(); } function webCodecsErrorCallback(error) { - Logger.warn('error: ' + error.message); + console.warn('error: ' + error.message); } // Read data from media track. -async function readMediaData(trackReadable, encoder) { +async function readMediaData(trackReadable, encoder, publicationId) { const reader = trackReadable.getReader(); // eslint-disable-next-line no-constant-condition while (true) { const {value, done} = await reader.read(); if (done) { - Logger.debug('MediaStream ends.'); + console.debug('MediaStream ends.'); break; } - encoder.encode(value); + if (keyFrameRequested.get(publicationId)) { + console.debug(typeof encoder + ' encode a key frame.'); + encoder.encode(value, {keyFrame: true}); + keyFrameRequested[publicationId] = false; + } else { + encoder.encode(value); + } value.close(); } } -initVideoDecoder(); +async function handleRtpPacket(packet) { + const buffer = wasmModule._malloc(packet.byteLength); + wasmModule.writeArrayToMemory(packet, buffer); + rtpReceiver.onRtpPacket(buffer, packet.byteLength); +} From 12422ef66227082ea591d77af2840a211c41fe0a Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Mon, 20 Dec 2021 15:29:35 +0800 Subject: [PATCH 06/11] Move 
RTP processing from sample to SDK. --- src/samples/conference/public/scripts/quic.js | 60 +------------------ src/sdk/conference/client.js | 3 +- src/sdk/conference/webtransport/connection.js | 27 +++++++-- .../conference/webtransport/media-worker.js | 43 +++++++++---- 4 files changed, 55 insertions(+), 78 deletions(-) diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index f4f3d9d2..1397238b 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -63,16 +63,6 @@ function updateConferenceStatus(message) { ('

' + message + '

'); } -function initWorker() { - dataWorker = new Worker('./scripts/data-worker.js'); - dataWorker.onmessage=((e) => { - if (e.data[0] === 'video-frame') { - generatorWriter.write(e.data[1]); - //console.log(e.data[1]); - } - }); -} - function joinConference() { return new Promise((resolve, reject) => { createToken(undefined, 'user', 'presenter', token => { @@ -85,7 +75,6 @@ function joinConference() { } } updateConferenceStatus('Connected to conference server.'); - initWorker(); resolve(); }); }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); @@ -153,7 +142,6 @@ async function writeData() { window.addEventListener('load', () => { windowOnLoad(); - fetchWasm(); }); document.getElementById('start-sending').addEventListener('click', async () => { @@ -190,58 +178,12 @@ document.getElementById('stop-sending').addEventListener('click', () => { document.getElementById('start-receiving') .addEventListener('click', async () => { - const video=document.getElementById('remote-video'); - const generator = new MediaStreamTrackGenerator({kind: 'video'}); - generatorWriter=generator.writable.getWriter(); - video.srcObject = new MediaStream([generator]); - const reader = conference.datagramReader(); - const ms = new Module.MediaSession(); - const receiver = ms.createRtpVideoReceiver(); - receiver.setCompleteFrameCallback((frame) => { - const copiedFrame = frame.slice(0); - dataWorker.postMessage( - ['encoded-video-frame', copiedFrame], [copiedFrame.buffer]); - }); subscribeMixedStream(); - while (true) { - const received = await reader.read(); - const buffer = Module._malloc(received.value.byteLength); - Module.writeArrayToMemory(received.value, buffer); - receiver.onRtpPacket(buffer, received.value.byteLength); - } }); -async function fetchWasm() { - Module['instantiateWasm'] = async (imports, successCallback) => { - const response = await fetch('scripts/owt.wasm'); - const buffer = await response.arrayBuffer(); - const module=await WebAssembly.compile(buffer); - const 
instance = await WebAssembly.instantiate(module, imports); - successCallback(instance, module); - return {}; - }; - const scriptPromise = new Promise((resolve, reject) => { - const script = document.createElement('script'); - document.body.appendChild(script); - script.onload = resolve; - script.onerror = reject; - script.async = true; - script.src = 'scripts/owt.js'; - }); - await scriptPromise; -} - async function subscribeMixedStream() { const subscription = await conference.subscribe( mixedStream, {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); - const reader = subscription.stream.readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('Subscription ends.'); - break; - } - // console.log('Received data: ' + value); - } + document.getElementById('remote-video').srcObject = subscription.stream; } diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index bb64d4be..67af63ac 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -161,8 +161,7 @@ export const ConferenceClient = function(config, workerDir, signalingImpl) { if (notification === 'soac' || notification === 'progress') { if (channels.has(data.id)) { channels.get(data.id).onMessage(notification, data); - } else if (quicTransportChannel && quicTransportChannel - .hasContentSessionId(data.id)) { + } else if (quicTransportChannel) { quicTransportChannel.onMessage(notification, data); } else { Logger.warning('Cannot find a channel for incoming data.'); diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 4a1e9c95..68712418 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -42,9 +42,13 @@ export class QuicConnection extends EventDispatcher { this._subscribeOptions = new Map(); // Key is subscription ID. 
this._subscriptionInfoReady = new Map(); // Key is subscription ID, value is a promise. + // Key is subscription ID, value is an object with audio and video RTP + // configs. + this._rtpConfigs = new Map(); this._transportId = this._token.transportId; this._initReceiveStreamReader(); this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); + this._initHandlersForWorker(); // Key is subscription ID, value is a MediaStreamTrackGenerator writer. this._mstVideoGeneratorWriters = new Map(); this._initRtpModule(); @@ -68,6 +72,8 @@ export class QuicConnection extends EventDispatcher { this._readyHandler(); } else if (message.status === 'error') { this._errorHandler(message.data); + } else if (message.status === 'rtp'){ + this._rtpHandler(message); } break; case 'stream': @@ -507,17 +513,24 @@ export class QuicConnection extends EventDispatcher { // A MediaStream is associated with a subscription for media. // Media packets are received over WebTransport datagram. const generators = []; - for (const track of mediaOptions) { + for (const track of mediaOptions.tracks) { const generator = new MediaStreamTrackGenerator({kind: track.type}); generators.push(generator); // TODO: Update key with the correct SSRC. this._mstVideoGeneratorWriters.set( - '0', generator.writable.getWriter()); + data.id, generator.writable.getWriter()); } const mediaStream = new MediaStream(generators); const subscription = this._createSubscription(data.id, mediaStream); + this._worker.postMessage([ + 'add-subscription', + [ + subscription.id, options, + this._rtpConfigs.get(subscription.id) + ] + ]); resolve(subscription); } if (this._subscriptionInfoReady.has(data.id)) { @@ -582,17 +595,21 @@ export class QuicConnection extends EventDispatcher { // its own status. Do nothing here. 
} + _rtpHandler(message) { + Logger.debug(`RTP config: ${JSON.stringify(message.data)}.`); + this._rtpConfigs.set(message.id, message.data); + } + datagramReader() { return this._quicTransport.datagrams.readable.getReader(); } - initHandlersForWorker() { + _initHandlersForWorker() { this._worker.onmessage = ((e) => { const [command, args] = e.data; switch (command) { case 'video-frame': - // TODO: Use actual subscription ID. - this._mstVideoGeneratorWriters.get('0').getWriter.write(args); + this._mstVideoGeneratorWriters.get(args[0]).write(args[1]); break; default: Logger.warn('Unrecognized command ' + command); diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index 00dbcade..c131b649 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -19,7 +19,8 @@ const keyFrameRequested = new Map(); let wasmModule; let mediaSession; -let rtpReceiver; +// Key is SSRC, value is an RTP receiver. +let rtpReceivers = new Map(); let frameBuffer; let videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. 
@@ -46,6 +47,9 @@ onmessage = async (e) => { case 'rtp-packet': await handleRtpPacket(args); break; + case 'add-subscription': + addNewSubscription(...args); + break; default: console.warn('Unrecognized command ' + command); } @@ -69,14 +73,8 @@ async function initMediaSender( } async function initRtpModule() { - initVideoDecoder(); wasmModule = await fetchWasm(); mediaSession = new wasmModule.MediaSession(); - rtpReceiver = mediaSession.createRtpVideoReceiver(); - rtpReceiver.setCompleteFrameCallback((frame) => { - videoDecoder.decode(new EncodedVideoChunk( - {timestamp: Date.now(), data: frame, type: 'key'})); - }); } async function fetchWasm() { @@ -157,17 +155,17 @@ function initVideoEncoder(config, writer) { return videoEncoder; } -function initVideoDecoder() { +function initVideoDecoder(subscriptionId) { videoDecoder = new VideoDecoder({ - output: videoFrameOutputCallback, + output: videoFrameOutputCallback.bind(null, subscriptionId), error: webCodecsErrorCallback, }); videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); } -function videoFrameOutputCallback(frame) { +function videoFrameOutputCallback(subscriptionId, frame) { // eslint-disable-next-line no-undef - postMessage(['video-frame', frame], [frame]); + postMessage(['video-frame', [subscriptionId, frame]], [frame]); frame.close(); } @@ -196,8 +194,29 @@ async function readMediaData(trackReadable, encoder, publicationId) { } } +function getSsrc(packet) { + // SSRC starts from the 65th bit, in network order. 
+ return new DataView(packet.buffer).getUint32(8, false); +} + async function handleRtpPacket(packet) { + const ssrc = getSsrc(packet); const buffer = wasmModule._malloc(packet.byteLength); wasmModule.writeArrayToMemory(packet, buffer); - rtpReceiver.onRtpPacket(buffer, packet.byteLength); + rtpReceivers.get(ssrc).onRtpPacket(buffer, packet.byteLength); } + +function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { + // TODO: Audio is not supported yet, ignore the audio part. + initVideoDecoder(subscriptionId); + const videoSsrc = rtpConfig.video.ssrc; + if (rtpReceivers.has(videoSsrc)) { + console.error(`RTP receiver for SSRC ${videoSsrc} exits.`); + } + const rtpReceiver = mediaSession.createRtpVideoReceiver(videoSsrc); + rtpReceivers.set(videoSsrc, rtpReceiver); + rtpReceiver.setCompleteFrameCallback((frame) => { + videoDecoder.decode(new EncodedVideoChunk( + {timestamp: Date.now(), data: frame, type: 'key'})); + }); +} \ No newline at end of file From dda047c3cf648f5a6870ae746e7afa37e4178933 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Mon, 20 Dec 2021 15:44:07 +0800 Subject: [PATCH 07/11] Fix style issues. 
--- src/sdk/conference/webtransport/connection.js | 6 +++--- src/sdk/conference/webtransport/media-worker.js | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 68712418..7f3e154a 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -72,7 +72,7 @@ export class QuicConnection extends EventDispatcher { this._readyHandler(); } else if (message.status === 'error') { this._errorHandler(message.data); - } else if (message.status === 'rtp'){ + } else if (message.status === 'rtp') { this._rtpHandler(message); } break; @@ -528,8 +528,8 @@ export class QuicConnection extends EventDispatcher { 'add-subscription', [ subscription.id, options, - this._rtpConfigs.get(subscription.id) - ] + this._rtpConfigs.get(subscription.id), + ], ]); resolve(subscription); } diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index c131b649..2f855d7a 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -20,7 +20,7 @@ const keyFrameRequested = new Map(); let wasmModule; let mediaSession; // Key is SSRC, value is an RTP receiver. -let rtpReceivers = new Map(); +const rtpReceivers = new Map(); let frameBuffer; let videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. @@ -219,4 +219,4 @@ function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { videoDecoder.decode(new EncodedVideoChunk( {timestamp: Date.now(), data: frame, type: 'key'})); }); -} \ No newline at end of file +} From e085822c19842c8ce184005c5d81f330516acdf2 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Thu, 10 Feb 2022 14:20:01 +0800 Subject: [PATCH 08/11] Support sending RTCP packet. 
--- src/sdk/conference/webtransport/connection.js | 10 ++++++++++ src/sdk/conference/webtransport/media-worker.js | 13 ++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 7f3e154a..fdfcd668 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -604,6 +604,13 @@ export class QuicConnection extends EventDispatcher { return this._quicTransport.datagrams.readable.getReader(); } + async _sendRtcp(buffer) { + const writer = this._quicTransport.datagrams.writable.getWriter(); + await writer.ready; + writer.write(buffer); + writer.releaseLock(); + } + _initHandlersForWorker() { this._worker.onmessage = ((e) => { const [command, args] = e.data; @@ -611,6 +618,9 @@ export class QuicConnection extends EventDispatcher { case 'video-frame': this._mstVideoGeneratorWriters.get(args[0]).write(args[1]); break; + case 'rtcp-packet': + this._sendRtcp(...args); + break; default: Logger.warn('Unrecognized command ' + command); } diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index 2f855d7a..4303db4a 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -74,7 +74,6 @@ async function initMediaSender( async function initRtpModule() { wasmModule = await fetchWasm(); - mediaSession = new wasmModule.MediaSession(); } async function fetchWasm() { @@ -203,6 +202,10 @@ async function handleRtpPacket(packet) { const ssrc = getSsrc(packet); const buffer = wasmModule._malloc(packet.byteLength); wasmModule.writeArrayToMemory(packet, buffer); + if(!rtpReceivers.has(ssrc)){ + console.log('RTP receiver not found.'); + return; + } rtpReceivers.get(ssrc).onRtpPacket(buffer, packet.byteLength); } @@ -213,6 +216,14 @@ function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { if 
(rtpReceivers.has(videoSsrc)) { console.error(`RTP receiver for SSRC ${videoSsrc} exits.`); } + if (!mediaSession) { + mediaSession = new wasmModule.MediaSession(); + mediaSession.setRtcpCallback((packet) => { + console.log('RTCP callback.'); + const buffer = new Uint8Array(packet); + postMessage(['rtcp-packet', [buffer.buffer]], [buffer.buffer]); + }); + } const rtpReceiver = mediaSession.createRtpVideoReceiver(videoSsrc); rtpReceivers.set(videoSsrc, rtpReceiver); rtpReceiver.setCompleteFrameCallback((frame) => { From 33663c24177166d239556577ea9d9d39d8a84b8a Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Thu, 10 Feb 2022 14:20:42 +0800 Subject: [PATCH 09/11] Fix timestamp issue for video element. --- src/sdk/conference/webtransport/media-worker.js | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index 4303db4a..4cb42017 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -26,6 +26,9 @@ let videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. const sizePrefix = 4; +// Timestamp of the first frame. 
+let startTime = 0; + /* Messages it accepts: * media-sender: [Publication ID, MediaStreamTrack ID, MediaStreamTrack kind, * MediaStreamTrackProcessor readable, WebTransportStream writable, @@ -226,8 +229,14 @@ function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { } const rtpReceiver = mediaSession.createRtpVideoReceiver(videoSsrc); rtpReceivers.set(videoSsrc, rtpReceiver); - rtpReceiver.setCompleteFrameCallback((frame) => { - videoDecoder.decode(new EncodedVideoChunk( - {timestamp: Date.now(), data: frame, type: 'key'})); + rtpReceiver.setCompleteFrameCallback((frame, isKeyFrame) => { + if (startTime === 0) { + startTime = Date.now()*1000; + } + videoDecoder.decode(new EncodedVideoChunk({ + timestamp: Date.now()*1000 - startTime, + data: frame, + type: isKeyFrame ? 'key' : 'delta' + })); }); } From 176567f6abe876658d0d51fd5034c5fe8e7bc859 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Thu, 10 Feb 2022 14:39:31 +0800 Subject: [PATCH 10/11] Fix style issues. --- src/sdk/conference/channel.js | 1 + src/sdk/conference/webtransport/media-worker.js | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sdk/conference/channel.js b/src/sdk/conference/channel.js index 168df389..f3e75590 100644 --- a/src/sdk/conference/channel.js +++ b/src/sdk/conference/channel.js @@ -898,6 +898,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher { _createPeerConnection() { if (this.pc) { + Logger.warning('PeerConnection exists.'); return; } diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index 4cb42017..85cf358a 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -4,7 +4,7 @@ /* eslint-disable require-jsdoc */ /* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer, - Uint8Array, DataView, console, EncodedVideoChunk */ + Uint8Array, DataView, console, EncodedVideoChunk, postMessage 
*/ // TODO: Use relative path instead. import initModule from '/src/samples/conference/public/scripts/owt.js'; @@ -205,7 +205,7 @@ async function handleRtpPacket(packet) { const ssrc = getSsrc(packet); const buffer = wasmModule._malloc(packet.byteLength); wasmModule.writeArrayToMemory(packet, buffer); - if(!rtpReceivers.has(ssrc)){ + if (!rtpReceivers.has(ssrc)) { console.log('RTP receiver not found.'); return; } @@ -234,9 +234,9 @@ function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) { startTime = Date.now()*1000; } videoDecoder.decode(new EncodedVideoChunk({ - timestamp: Date.now()*1000 - startTime, + timestamp: Date.now() * 1000 - startTime, data: frame, - type: isKeyFrame ? 'key' : 'delta' + type: isKeyFrame ? 'key' : 'delta', })); }); } From ba08fcad2247055b5ebb0f4e8e763b31db8e3c2d Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Tue, 8 Mar 2022 16:39:12 +0800 Subject: [PATCH 11/11] Fix an issue for unit test. connection.js is not included in unit test. --- test/unit/config/karma.config.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/config/karma.config.js b/test/unit/config/karma.config.js index 70d79bf4..62fd871b 100644 --- a/test/unit/config/karma.config.js +++ b/test/unit/config/karma.config.js @@ -46,7 +46,7 @@ module.exports = function (config) { pattern: './test/unit/resources/scripts/gen/sinon-browserified.js' }, { - pattern: './src/sdk/!(rest)/*.js', + pattern: './src/sdk/!(rest)/**/*.js', type: 'module' }, {