diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/AbstractSortingMuxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/AbstractSortingMuxer.kt new file mode 100644 index 000000000..9d0129e64 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/AbstractSortingMuxer.kt @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2023 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.internal.muxers + +import io.github.thibaultbee.streampack.internal.data.Packet +import io.github.thibaultbee.streampack.internal.utils.SyncQueue +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +/** + * An abstract class that implements [IMuxer] and output frames in their natural order. + * + * Frames are not in order because audio and video frames are encoded in parallel and video encoding takes more time. + * So some new audio frame could arrive sooner than older video frame. + * + * Some protocols (like RTMP) require frames to be in order. Use this class for this kind of protocols. + * If the protocol doesn't need ordered frames, use [IMuxer] directly. + * + * Don't call [IMuxerListener.onOutputFrame] directly, use [queue] instead. + * Don't forget to call [stopStream] at the end of [IMuxer.stopStream] implementation. + * + * This implementation is based on [SyncQueue]. It waits for a video frame to send audio frames. + * Unfortunately, sometimes video frames come faster than audio frames. So we schedule a task to + * send the frames after a delay of [scheduleTime] [scheduleTimeUnit]. + */ +abstract class AbstractSortingMuxer( + private val scheduleTime: Long = 100, + private val scheduleTimeUnit: TimeUnit = TimeUnit.MILLISECONDS +) : IMuxer { + private val syncQueue = + SyncQueue( + { packet1, packet2 -> packet1.ts.compareTo(packet2.ts) }, + object : SyncQueue.Listener { + override fun onElement(element: Packet) { + listener?.onOutputFrame(element) + } + }) + protected abstract val hasVideo: Boolean + protected abstract val hasAudio: Boolean + + private val scheduler = Executors.newSingleThreadScheduledExecutor() + private var exception: Exception? = null + + private fun asyncSyncTo(packet: Packet) { + scheduler.schedule({ + try { + syncQueue.syncTo(packet) + } catch (e: Exception) { + exception = e + } + }, scheduleTime, scheduleTimeUnit) + } + + /** + * Queues multiple packets. + * + * If the muxer if for video only or audio only, the packet is directly sent to + * [IMuxerListener.onOutputFrame]. + * + * @param packets the list of packets to queue + */ + fun queue(packets: List) { + exception?.let { throw it } + + if (hasVideo && hasAudio) { + syncQueue.add(packets) + if (packets.any { it.isVideo }) { + asyncSyncTo(packets.last()) + } + } else { + // Audio only or video only. Don't need to sort. + packets.forEach { + listener?.onOutputFrame(it) + } + } + } + + /** + * Queues a packet. + * + * If the muxer if for video only or audio only, the packet is directly sent to + * [IMuxerListener.onOutputFrame]. + * + * @param packet the packet to queue + */ + fun queue(packet: Packet) { + exception?.let { throw it } + + if (hasVideo && hasAudio) { + syncQueue.add(packet, false) + asyncSyncTo(packet) + } else { + // Audio only or video only. Don't need to sort. + listener?.onOutputFrame(packet) + } + } + + /** + * To be called at the end of [stopStream] implementation. + */ + override fun stopStream() { + exception = null + syncQueue.clear() + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt index 8367b7f81..5ee20ce4f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/muxers/flv/FlvMuxer.kt @@ -20,7 +20,7 @@ import io.github.thibaultbee.streampack.internal.data.Frame import io.github.thibaultbee.streampack.internal.data.Packet import io.github.thibaultbee.streampack.internal.data.PacketType import io.github.thibaultbee.streampack.internal.interfaces.ISourceOrientationProvider -import io.github.thibaultbee.streampack.internal.muxers.IMuxer +import io.github.thibaultbee.streampack.internal.muxers.AbstractSortingMuxer import io.github.thibaultbee.streampack.internal.muxers.IMuxerListener import io.github.thibaultbee.streampack.internal.muxers.flv.tags.AVTagsFactory import io.github.thibaultbee.streampack.internal.muxers.flv.tags.FlvHeader @@ -33,12 +33,12 @@ class FlvMuxer( override var listener: IMuxerListener? = null, initialStreams: List? = null, private val writeToFile: Boolean, -) : IMuxer { +) : AbstractSortingMuxer() { override val helper = FlvMuxerHelper() private val streams = mutableListOf() - private val hasAudio: Boolean + override val hasAudio: Boolean get() = streams.any { it.mimeType.isAudio } - private val hasVideo: Boolean + override val hasVideo: Boolean get() = streams.any { it.mimeType.isVideo } private var startUpTime: Long? = null private var hasFirstFrame = false @@ -73,17 +73,16 @@ class FlvMuxer( frame.pts -= startUpTime!! val flvTags = AVTagsFactory(frame, streams[streamPid]).build() - flvTags.forEach { - listener?.onOutputFrame( - Packet( - it.write(), frame.pts, if (frame.isVideo) { - PacketType.VIDEO - } else { - PacketType.AUDIO - } - ) + val flvPacket = flvTags.map { + Packet( + it.write(), frame.pts, if (frame.isVideo) { + PacketType.VIDEO + } else { + PacketType.AUDIO + } ) } + queue(flvPacket) } override fun addStreams(streamsConfig: List): Map { @@ -122,6 +121,7 @@ class FlvMuxer( startUpTime = null hasFirstFrame = false streams.clear() + super.stopStream() } override fun release() { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/internal/utils/SyncQueue.kt b/core/src/main/java/io/github/thibaultbee/streampack/internal/utils/SyncQueue.kt new file mode 100644 index 000000000..5061f7a05 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/internal/utils/SyncQueue.kt @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2023 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.internal.utils + +import java.util.PriorityQueue + +/** + * A synchronized queue that allows to add elements in order. Elements are stored till a sync + * element is added. When a sync element is added, all elements that are comparatively lower are + * sent to the listener. + * + * The purpose of this class is to allow to put elements in order. + * + * @param E the type of elements held in this collection + * @param comparator the comparator that will be used to order the elements + * @param listener the listener that will be called when a sync element is added + */ +class SyncQueue( + private val comparator: Comparator, + private val listener: Listener +) { + private val priorityQueue: PriorityQueue = PriorityQueue(8, comparator) + + val size: Int + get() = priorityQueue.size + + /** + * Sends all elements that are comparatively lower than [element] to the listener. + * [element] is not outputted. + * + * @param element the element to compare + */ + fun syncTo(element: E) { + var polledElement: E? = pollIf(comparator, element) + while (polledElement != null) { + listener.onElement(polledElement) + polledElement = pollIf(comparator, element) + } + } + + /** + * Adds an element in order. + * If [isSync] is true, all elements that are comparatively lower than [element] are sent to the + * listener. + * + * @param element the element to add + * @param isSync true if [element] is a sync element + */ + fun add(element: E, isSync: Boolean = false) { + if (isSync) { + syncTo(element) + // Send sync element + listener.onElement(element) + } else { + synchronized(this) { + priorityQueue.add(element) + } + } + } + + /** + * Adds all elements in order. + * + * @param elements the elements to add + */ + fun add(elements: List) { + synchronized(this) { + priorityQueue.addAll(elements) + } + } + + private fun pollIf(comparator: Comparator, comparableElement: E): E? { + synchronized(this) { + val element = priorityQueue.peek() + if ((element != null) && comparator.compare(element, comparableElement) <= 0) { + return priorityQueue.poll() + } + return null + } + } + + fun clear() { + synchronized(this) { + priorityQueue.clear() + } + } + + interface Listener { + /** + * Called when element is polled. + */ + fun onElement(element: E) + } +} \ No newline at end of file diff --git a/core/src/test/java/io/github/thibaultbee/streampack/internal/utils/SyncQueueTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/internal/utils/SyncQueueTest.kt new file mode 100644 index 000000000..eac1dba9f --- /dev/null +++ b/core/src/test/java/io/github/thibaultbee/streampack/internal/utils/SyncQueueTest.kt @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2023 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.internal.utils + +import org.junit.Assert.assertEquals +import org.junit.Test + +class SyncQueueTest { + @Test + fun `test sync 1 element`() { + val syncQueue = + SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener { + override fun onElement(element: Int) { + assertEquals(1, element) + } + }) + syncQueue.add(1, true) + assertEquals(0, syncQueue.size) + } + + @Test + fun `test already sorted elements`() { + var i = 1 + val syncQueue = + SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener { + override fun onElement(element: Int) { + assertEquals(i++, element) + } + }) + syncQueue.add(1) + syncQueue.add(2) + syncQueue.add(3) + syncQueue.add(4, isSync = true) + + assertEquals(5, i) + assertEquals(0, syncQueue.size) + } + + @Test + fun `test equals elements`() { + val syncQueue = + SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener { + override fun onElement(element: Int) { + assertEquals(1, element) + } + }) + syncQueue.add(1) + syncQueue.add(1, isSync = true) + + assertEquals(0, syncQueue.size) + } + + @Test + fun `test not sorted elements`() { + var i = 1 + val syncQueue = + SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener { + override fun onElement(element: Int) { + assertEquals(i++, element) + } + }) + syncQueue.add(1) + syncQueue.add(2) + syncQueue.add(3) + syncQueue.add(5) + syncQueue.add(6) + syncQueue.add(4, isSync = true) + + assertEquals(5, i) + assertEquals(2, syncQueue.size) + } +} \ No newline at end of file diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt index 4a488cdc2..9f15f11ca 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/internal/endpoints/RtmpProducer.kt @@ -26,9 +26,7 @@ import video.api.rtmpdroid.Rtmp import java.security.InvalidParameterException class RtmpProducer( - private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.IO, - private val hasAudio: Boolean = true, - private val hasVideo: Boolean = true, + private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.IO ) : ILiveEndpoint { override var onConnectionListener: OnConnectionListener? = null @@ -40,8 +38,6 @@ class RtmpProducer( override val isConnected: Boolean get() = _isConnected - private val audioPacketQueue = mutableListOf() - /** * Sets/gets supported video codecs. */ @@ -68,7 +64,6 @@ class RtmpProducer( withContext(coroutineDispatcher) { try { isOnError = false - audioPacketQueue.clear() socket.connect("$url live=1 flashver=FMLE/3.0\\20(compatible;\\20FMSc/1.0)") _isConnected = true onConnectionListener?.onSuccess() @@ -101,31 +96,7 @@ class RtmpProducer( } try { - - if (hasAudio && hasVideo) { - /** - * Audio and video packets are received out of timestamp order. We need to reorder them. - * We suppose that video packets arrive after audio packets. - * We store audio packets in a queue and send them before video packets. - */ - if (packet.isAudio) { - // Store audio packet to send them later - audioPacketQueue.add(packet) - } else { - // Send audio packets - val audioPackets = audioPacketQueue.filter { - it.ts <= packet.ts - } - - audioPackets.forEach { socket.write(it.buffer) } - audioPacketQueue.removeAll(audioPackets) - - // Send video packet - socket.write(packet.buffer) - } - } else { - socket.write(packet.buffer) - } + socket.write(packet.buffer) } catch (e: Exception) { disconnect() isOnError = true diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/AudioOnlyRtmpLiveStreamer.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/AudioOnlyRtmpLiveStreamer.kt index 8bf7e4651..985e7d6bb 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/AudioOnlyRtmpLiveStreamer.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/AudioOnlyRtmpLiveStreamer.kt @@ -36,7 +36,7 @@ class AudioOnlyRtmpLiveStreamer( ) : BaseAudioOnlyLiveStreamer( context = context, muxer = FlvMuxer(writeToFile = false), - endpoint = RtmpProducer(hasAudio = true, hasVideo = false), + endpoint = RtmpProducer(), initialOnErrorListener = initialOnErrorListener, initialOnConnectionListener = initialOnConnectionListener ) diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/CameraRtmpLiveStreamer.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/CameraRtmpLiveStreamer.kt index 8b50fd725..3bd836e36 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/CameraRtmpLiveStreamer.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/CameraRtmpLiveStreamer.kt @@ -40,7 +40,7 @@ class CameraRtmpLiveStreamer( context = context, enableAudio = enableAudio, muxer = FlvMuxer(writeToFile = false), - endpoint = RtmpProducer(hasAudio = enableAudio, hasVideo = true), + endpoint = RtmpProducer(), initialOnErrorListener = initialOnErrorListener, initialOnConnectionListener = initialOnConnectionListener ) { diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/ScreenRecorderRtmpLiveStreamer.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/ScreenRecorderRtmpLiveStreamer.kt index 8dc4fd193..91d54e3e0 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/ScreenRecorderRtmpLiveStreamer.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/streamers/ScreenRecorderRtmpLiveStreamer.kt @@ -44,7 +44,7 @@ class ScreenRecorderRtmpLiveStreamer( context = context, enableAudio = enableAudio, muxer = FlvMuxer(writeToFile = false), - endpoint = RtmpProducer(hasAudio = enableAudio, hasVideo = true), + endpoint = RtmpProducer(), initialOnErrorListener = initialOnErrorListener, initialOnConnectionListener = initialOnConnectionListener ) {