Skip to content

Commit

Permalink
fix(core): improve algorithm performance that sort audio and video fr…
Browse files Browse the repository at this point in the history
…ames by timestamp for FLV/RTMP
  • Loading branch information
ThibaultBee committed Nov 23, 2023
1 parent 6d446ca commit aa7a5d0
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.muxers

import io.github.thibaultbee.streampack.internal.data.Packet
import io.github.thibaultbee.streampack.internal.utils.SyncQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

/**
* An abstract class that implements [IMuxer] and output frames in their natural order.
*
* Frames are not in order because audio and video frames are encoded in parallel and video encoding takes more time.
* So some new audio frame could arrive sooner than older video frame.
*
* Some protocols (like RTMP) require frames to be in order. Use this class for this kind of protocols.
* If the protocol doesn't need ordered frames, use [IMuxer] directly.
*
* Don't call [IMuxerListener.onOutputFrame] directly, use [queue] instead.
* Don't forget to call [stopStream] at the end of [IMuxer.stopStream] implementation.
*
* This implementation is based on [SyncQueue]. It waits for a video frame to send audio frames.
* Unfortunately, sometimes video frames come faster than audio frames. So we schedule a task to
* send the frames after a delay of [scheduleTime] [scheduleTimeUnit].
*/
abstract class AbstractSortingMuxer(
private val scheduleTime: Long = 100,
private val scheduleTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
) : IMuxer {
private val syncQueue =
SyncQueue(
{ packet1, packet2 -> packet1.ts.compareTo(packet2.ts) },
object : SyncQueue.Listener<Packet> {
override fun onElement(element: Packet) {
listener?.onOutputFrame(element)
}
})
protected abstract val hasVideo: Boolean
protected abstract val hasAudio: Boolean

private val scheduler = Executors.newSingleThreadScheduledExecutor()
private var exception: Exception? = null

private fun asyncSyncTo(packet: Packet) {
scheduler.schedule({
try {
syncQueue.syncTo(packet)
} catch (e: Exception) {
exception = e
}
}, scheduleTime, scheduleTimeUnit)
}

/**
* Queues multiple packets.
*
* If the muxer if for video only or audio only, the packet is directly sent to
* [IMuxerListener.onOutputFrame].
*
* @param packets the list of packets to queue
*/
fun queue(packets: List<Packet>) {
exception?.let { throw it }

if (hasVideo && hasAudio) {
syncQueue.add(packets)
if (packets.any { it.isVideo }) {
asyncSyncTo(packets.last())
}
} else {
// Audio only or video only. Don't need to sort.
packets.forEach {
listener?.onOutputFrame(it)
}
}
}

/**
* Queues a packet.
*
* If the muxer if for video only or audio only, the packet is directly sent to
* [IMuxerListener.onOutputFrame].
*
* @param packet the packet to queue
*/
fun queue(packet: Packet) {
exception?.let { throw it }

if (hasVideo && hasAudio) {
syncQueue.add(packet, false)
asyncSyncTo(packet)
} else {
// Audio only or video only. Don't need to sort.
listener?.onOutputFrame(packet)
}
}

/**
* To be called at the end of [stopStream] implementation.
*/
override fun stopStream() {
exception = null
syncQueue.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.github.thibaultbee.streampack.internal.data.Frame
import io.github.thibaultbee.streampack.internal.data.Packet
import io.github.thibaultbee.streampack.internal.data.PacketType
import io.github.thibaultbee.streampack.internal.interfaces.ISourceOrientationProvider
import io.github.thibaultbee.streampack.internal.muxers.IMuxer
import io.github.thibaultbee.streampack.internal.muxers.AbstractSortingMuxer
import io.github.thibaultbee.streampack.internal.muxers.IMuxerListener
import io.github.thibaultbee.streampack.internal.muxers.flv.tags.AVTagsFactory
import io.github.thibaultbee.streampack.internal.muxers.flv.tags.FlvHeader
Expand All @@ -33,12 +33,12 @@ class FlvMuxer(
override var listener: IMuxerListener? = null,
initialStreams: List<Config>? = null,
private val writeToFile: Boolean,
) : IMuxer {
) : AbstractSortingMuxer() {
override val helper = FlvMuxerHelper()
private val streams = mutableListOf<Config>()
private val hasAudio: Boolean
override val hasAudio: Boolean
get() = streams.any { it.mimeType.isAudio }
private val hasVideo: Boolean
override val hasVideo: Boolean
get() = streams.any { it.mimeType.isVideo }
private var startUpTime: Long? = null
private var hasFirstFrame = false
Expand Down Expand Up @@ -73,17 +73,16 @@ class FlvMuxer(

frame.pts -= startUpTime!!
val flvTags = AVTagsFactory(frame, streams[streamPid]).build()
flvTags.forEach {
listener?.onOutputFrame(
Packet(
it.write(), frame.pts, if (frame.isVideo) {
PacketType.VIDEO
} else {
PacketType.AUDIO
}
)
val flvPacket = flvTags.map {
Packet(
it.write(), frame.pts, if (frame.isVideo) {
PacketType.VIDEO
} else {
PacketType.AUDIO
}
)
}
queue(flvPacket)
}

override fun addStreams(streamsConfig: List<Config>): Map<Config, Int> {
Expand Down Expand Up @@ -122,6 +121,7 @@ class FlvMuxer(
startUpTime = null
hasFirstFrame = false
streams.clear()
super.stopStream()
}

override fun release() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.utils

import java.util.PriorityQueue

/**
* A synchronized queue that allows to add elements in order. Elements are stored till a sync
* element is added. When a sync element is added, all elements that are comparatively lower are
* sent to the listener.
*
* The purpose of this class is to allow to put elements in order.
*
* @param E the type of elements held in this collection
* @param comparator the comparator that will be used to order the elements
* @param listener the listener that will be called when a sync element is added
*/
class SyncQueue<E>(
private val comparator: Comparator<in E>,
private val listener: Listener<E>
) {
private val priorityQueue: PriorityQueue<E> = PriorityQueue(8, comparator)

val size: Int
get() = priorityQueue.size

/**
* Sends all elements that are comparatively lower than [element] to the listener.
* [element] is not outputted.
*
* @param element the element to compare
*/
fun syncTo(element: E) {
var polledElement: E? = pollIf(comparator, element)
while (polledElement != null) {
listener.onElement(polledElement)
polledElement = pollIf(comparator, element)
}
}

/**
* Adds an element in order.
* If [isSync] is true, all elements that are comparatively lower than [element] are sent to the
* listener.
*
* @param element the element to add
* @param isSync true if [element] is a sync element
*/
fun add(element: E, isSync: Boolean = false) {
if (isSync) {
syncTo(element)
// Send sync element
listener.onElement(element)
} else {
synchronized(this) {
priorityQueue.add(element)
}
}
}

/**
* Adds all elements in order.
*
* @param elements the elements to add
*/
fun add(elements: List<E>) {
synchronized(this) {
priorityQueue.addAll(elements)
}
}

private fun pollIf(comparator: Comparator<in E>, comparableElement: E): E? {
synchronized(this) {
val element = priorityQueue.peek()
if ((element != null) && comparator.compare(element, comparableElement) <= 0) {
return priorityQueue.poll()
}
return null
}
}

fun clear() {
synchronized(this) {
priorityQueue.clear()
}
}

interface Listener<E> {
/**
* Called when element is polled.
*/
fun onElement(element: E)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.utils

import org.junit.Assert.assertEquals
import org.junit.Test

class SyncQueueTest {
@Test
fun `test sync 1 element`() {
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(1, element)
}
})
syncQueue.add(1, true)
assertEquals(0, syncQueue.size)
}

@Test
fun `test already sorted elements`() {
var i = 1
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(i++, element)
}
})
syncQueue.add(1)
syncQueue.add(2)
syncQueue.add(3)
syncQueue.add(4, isSync = true)

assertEquals(5, i)
assertEquals(0, syncQueue.size)
}

@Test
fun `test equals elements`() {
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(1, element)
}
})
syncQueue.add(1)
syncQueue.add(1, isSync = true)

assertEquals(0, syncQueue.size)
}

@Test
fun `test not sorted elements`() {
var i = 1
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(i++, element)
}
})
syncQueue.add(1)
syncQueue.add(2)
syncQueue.add(3)
syncQueue.add(5)
syncQueue.add(6)
syncQueue.add(4, isSync = true)

assertEquals(5, i)
assertEquals(2, syncQueue.size)
}
}
Loading

0 comments on commit aa7a5d0

Please sign in to comment.