Skip to content

Commit

Permalink
Queue outgoing audio packets
Browse files Browse the repository at this point in the history
With RTP, sending too fast or too slow can cause playback issues for the
receiver. So we queue up a bit (start at a 100ms buffer and growing to a
max of 5000ms depending on if we ever run out) and send it out at a
constant rate with a timer, to smooth over any timing bumps in the
production of any single packet.

If packets come too fast we stop asking for them until the queue shrinks
again.

If some packets come too slow we keep sending out of the queue so long
as we have any. If every single packet takes more than its length to
produce you'll be forever behind and get weird lag issues still.
  • Loading branch information
singpolyma committed Sep 24, 2024
1 parent 0f601cd commit 6bd4e8b
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions snikket/jingle/PeerConnection.cpp.hx
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,11 @@ class MediaStreamTrack {
private var opusEncoder: cpp.Struct<OpusEncoder>;
private var rtpPacketizationConfig: SharedPtr<RtpPacketizationConfig>;
private final eventLoop: sys.thread.EventLoop;
private var timer: haxe.Timer;
private var audioQ: Array<{stamp: Float, channels: Int, payloadType: cpp.UInt8, clockRate: Int, payload: Array<cpp.UInt8>}> = [];
private var alive = true;
private var waitForQ = false;
private var bufferSizeInSeconds = 0.0;

@:allow(snikket)
private var media(get, default): StdOptional<DescriptionMedia>;
Expand All @@ -361,7 +365,21 @@ class MediaStreamTrack {

@:allow(snikket)
private function new() {
eventLoop = sys.thread.Thread.createWithEventLoop(() -> while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }).events;
eventLoop = sys.thread.Thread.createWithEventLoop(() -> {
timer = new haxe.Timer(10); // This timer will stop when the audioloop for this track stops
timer.run = () -> {
if (audioQ.length > 0 && audioQ[audioQ.length - 1].stamp <= haxe.Timer.stamp()) {
final packet = audioQ.pop();
write(packet.payload, packet.payloadType, packet.clockRate);
advanceTimestamp(Std.int(packet.payload.length / packet.channels));
}
if (waitForQ && audioQ.length < (50+50*bufferSizeInSeconds)) {
waitForQ = false;
notifyReadyForData(false);
}
};
while(alive) { sys.thread.Thread.processEvents(); sys.thread.Thread.current().events.wait(); }
}).events;
}

private function get_media() {
Expand Down Expand Up @@ -497,7 +515,13 @@ class MediaStreamTrack {
private function notifyReadyForData(fromCPP: Bool) {
untyped __cpp__("if (fromCPP) { int base = 0; hx::SetTopOfStack(&base, true); }"); // allow running haxe code on foreign thread
if (readyForPCMCallback != null) {
eventLoop.run(() -> readyForPCMCallback());
eventLoop.run(() -> {
if (audioQ.length > (50+50*bufferSizeInSeconds)) {
waitForQ = true;
} else {
readyForPCMCallback();
}
});
}
untyped __cpp__("if (fromCPP) { hx::SetTopOfStack((int*)0, true); }"); // unregister with GC
}
Expand All @@ -514,8 +538,15 @@ class MediaStreamTrack {
if (format == null) throw "Unsupported audo format: " + clockRate + "/" + channels;
eventLoop.run(() -> {
if (track.ref.isClosed()) return;
final stamp = if (audioQ.length < 1) {
bufferSizeInSeconds = Math.max(bufferSizeInSeconds, bufferSizeInSeconds + 0.1);
haxe.Timer.stamp() + bufferSizeInSeconds;
} else {
audioQ[0].stamp + (pcm.length / (clockRate / 1000)) / 1000.0;
}
if (format.format == "PCMU") {
write(pcm.map(pcmToUlaw), format.payloadType, clockRate);
final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: pcm.map(pcmToUlaw), stamp: stamp };
audioQ.unshift(packet);
} else if (format.format == "opus") {
if (untyped __cpp__("!{0}", opusEncoder)) {
opusEncoder = OpusEncoder.create(clockRate, channels, untyped __cpp__("OPUS_APPLICATION_VOIP"), null); // assume only one opus clockRate+channels for this track
Expand All @@ -526,11 +557,11 @@ class MediaStreamTrack {
final rawOpus = new haxe.ds.Vector(pcm.length * 2).toData(); // Shoudn't be bigger than the input
final encoded = OpusEncoder.encode(opusEncoder, cpp.Pointer.ofArray(pcm), Std.int(pcm.length / channels), cpp.Pointer.ofArray(rawOpus), rawOpus.length);
rawOpus.resize(encoded);
write(rawOpus, format.payloadType, clockRate);
final packet = { channels: channels, payloadType: format.payloadType, clockRate: clockRate, payload: rawOpus, stamp: stamp };
audioQ.unshift(packet);
} else {
trace("Ignoring audio meant to go out as", format.format, format.clockRate, format.channels);
}
advanceTimestamp(Std.int(pcm.length / channels));
notifyReadyForData(false);
});
}
Expand Down

0 comments on commit 6bd4e8b

Please sign in to comment.