Skip to content

Commit

Permalink
Maintain a VideoCodecParser per source, not per endpoint. (jitsi#2133)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanLennox authored and jerry2013 committed Dec 9, 2024
1 parent 9f1da82 commit 9fa0290
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.jitsi.nlj.rtp.VideoRtpPacket
import org.jitsi.nlj.util.Bandwidth
import org.jitsi.nlj.util.bps
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.utils.ArrayUtils
import java.util.Collections
import java.util.NavigableMap
Expand Down Expand Up @@ -190,6 +191,12 @@ fun Array<MediaSourceDesc>.findRtpLayerDescs(packet: VideoRtpPacket): Collection
return this.flatMap { it.findRtpLayerDescs(packet) }
}

/**
 * Returns the first [MediaSourceDesc] in this array whose `matches` check accepts [ssrc],
 * or `null` when no element does.
 */
fun Array<MediaSourceDesc>.findRtpSource(ssrc: Long): MediaSourceDesc? =
    firstOrNull { candidate -> candidate.matches(ssrc) }

/** Convenience overload: looks up the source for [packet] using the packet's SSRC. */
fun Array<MediaSourceDesc>.findRtpSource(packet: RtpPacket): MediaSourceDesc? {
    return findRtpSource(packet.ssrc)
}

fun Array<MediaSourceDesc>.findRtpEncodingId(packet: VideoRtpPacket): Int? {
for (source in this) {
source.findRtpEncodingDesc(packet.ssrc)?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,16 @@ import org.jitsi.nlj.rtp.VideoRtpPacket
* and verify stream consistency.
*/
abstract class VideoCodecParser(
var sources: Array<MediaSourceDesc>
var source: MediaSourceDesc
) {
/** Codec-specific handling of the packet carried by [packetInfo]; implemented by subclasses. */
abstract fun parse(packetInfo: PacketInfo)

/**
 * Looks up the encoding description for [packet]'s SSRC through `findRtpEncodingDesc` on the
 * media source; yields `null` when that lookup finds nothing.
 */
protected fun findRtpEncodingDesc(packet: VideoRtpPacket): RtpEncodingDesc? {
for (source in sources) {
source.findRtpEncodingDesc(packet.ssrc)?.let {
return it
}
source.findRtpEncodingDesc(packet.ssrc)?.let {
return it
}
return null
}

/**
 * Same lookup as [findRtpEncodingDesc], but pairs the encoding description with the source
 * it was found on; `null` when no source has an encoding for [packet]'s SSRC.
 */
protected fun findSourceDescAndRtpEncodingDesc(packet: VideoRtpPacket): Pair<MediaSourceDesc, RtpEncodingDesc>? {
for (source in sources) {
source.findRtpEncodingDesc(packet.ssrc)?.let {
return Pair(source, it)
}
}
return null
}

/** Delegates to the `findRtpLayerDescs` lookup for [packet]. */
protected fun findRtpLayerDescs(packet: VideoRtpPacket) = sources.findRtpLayerDescs(packet)
protected fun findRtpLayerDescs(packet: VideoRtpPacket) = source.findRtpLayerDescs(packet)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import org.jitsi.utils.logging2.createChildLogger
* won't be able to route.
*/
class Av1DDParser(
sources: Array<MediaSourceDesc>,
source: MediaSourceDesc,
parentLogger: Logger,
private val diagnosticContext: DiagnosticContext
) : VideoCodecParser(sources) {
) : VideoCodecParser(source) {
private val logger = createChildLogger(parentLogger)

/** History of AV1 templates. */
Expand Down Expand Up @@ -121,13 +121,13 @@ class Av1DDParser(
"now 0x${Integer.toHexString(activeDecodeTargets)}. Updating layering."
}

findSourceDescAndRtpEncodingDesc(av1Packet)?.let { (src, enc) ->
findRtpEncodingDesc(av1Packet)?.let { enc ->
av1Packet.getScalabilityStructure(eid = enc.eid)?.let {
src.setEncodingLayers(it.layers, av1Packet.ssrc)
source.setEncodingLayers(it.layers, av1Packet.ssrc)
}
for (otherEnc in src.rtpEncodings) {
for (otherEnc in source.rtpEncodings) {
if (!ddStateHistory.keys.contains(otherEnc.primarySSRC)) {
src.setEncodingLayers(emptyArray(), otherEnc.primarySSRC)
source.setEncodingLayers(emptyArray(), otherEnc.primarySSRC)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import org.jitsi.utils.logging2.createChildLogger
* from frames, and also diagnoses packet format variants that the Jitsi videobridge won't be able to route.
*/
class Vp8Parser(
sources: Array<MediaSourceDesc>,
source: MediaSourceDesc,
parentLogger: Logger
) : VideoCodecParser(sources) {
) : VideoCodecParser(source) {
private val logger = createChildLogger(parentLogger)

// Consistency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import org.jitsi.utils.logging2.createChildLogger
* from frames, and also diagnoses packet format variants that the Jitsi videobridge won't be able to route.
*/
class Vp9Parser(
sources: Array<MediaSourceDesc>,
source: MediaSourceDesc,
parentLogger: Logger
) : VideoCodecParser(sources) {
) : VideoCodecParser(source) {
private val logger = createChildLogger(parentLogger)

private val pictureIdState = StateChangeLogger("missing picture id", logger)
Expand All @@ -58,13 +58,13 @@ class Vp9Parser(
}
numSpatialLayers = packetSpatialLayers
}
findSourceDescAndRtpEncodingDesc(vp9Packet)?.let { (src, enc) ->
findRtpEncodingDesc(vp9Packet)?.let { enc ->
vp9Packet.getScalabilityStructure(eid = enc.eid)?.let {
src.setEncodingLayers(it.layers, vp9Packet.ssrc)
source.setEncodingLayers(it.layers, vp9Packet.ssrc)
}
for (otherEnc in src.rtpEncodings) {
for (otherEnc in source.rtpEncodings) {
if (!ssrcsSeen.contains(otherEnc.primarySSRC)) {
src.setEncodingLayers(emptyArray(), otherEnc.primarySSRC)
source.setEncodingLayers(emptyArray(), otherEnc.primarySSRC)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.jitsi.nlj.Event
import org.jitsi.nlj.MediaSourceDesc
import org.jitsi.nlj.PacketInfo
import org.jitsi.nlj.SetMediaSourcesEvent
import org.jitsi.nlj.findRtpSource
import org.jitsi.nlj.format.Vp8PayloadType
import org.jitsi.nlj.format.Vp9PayloadType
import org.jitsi.nlj.rtp.RtpExtensionType
Expand Down Expand Up @@ -55,7 +56,7 @@ class VideoParser(

private var av1DDExtId: Int? = null

private var videoCodecParser: VideoCodecParser? = null
private val videoCodecParsers = mutableMapOf<Long, VideoCodecParser>()

init {
streamInformationStore.onRtpExtensionMapping(RtpExtensionType.AV1_DEPENDENCY_DESCRIPTOR) {
Expand All @@ -71,61 +72,56 @@ class VideoParser(
stats.numPacketsDroppedUnknownPt++
return null
}
val videoCodecParser: VideoCodecParser?
val parsedPacket = try {
when {
payloadType is Vp8PayloadType -> {
val vp8Packet = packetInfo.packet.toOtherType(::Vp8Packet)
packetInfo.packet = vp8Packet
packetInfo.resetPayloadVerification()

if (videoCodecParser !is Vp8Parser) {
logger.cdebug {
"Creating new VP8Parser, current videoCodecParser is ${videoCodecParser?.javaClass}"
}
resetSources()
packetInfo.layeringChanged = true
videoCodecParser = Vp8Parser(sources, logger)
videoCodecParser = checkParserType<Vp8Parser>(packetInfo) { source ->
Vp8Parser(source, logger)
}

vp8Packet
}
payloadType is Vp9PayloadType -> {
val vp9Packet = packetInfo.packet.toOtherType(::Vp9Packet)
packetInfo.packet = vp9Packet
packetInfo.resetPayloadVerification()

if (videoCodecParser !is Vp9Parser) {
logger.cdebug {
"Creating new VP9Parser, current videoCodecParser is ${videoCodecParser?.javaClass}"
}
resetSources()
packetInfo.layeringChanged = true
videoCodecParser = Vp9Parser(sources, logger)
videoCodecParser = checkParserType<Vp9Parser>(packetInfo) { source ->
Vp9Parser(source, logger)
}

vp9Packet
}
av1DDExtId != null && packet.getHeaderExtension(av1DDExtId) != null -> {
if (videoCodecParser !is Av1DDParser) {
logger.cdebug {
"Creating new Av1DDParser, current videoCodecParser is ${videoCodecParser?.javaClass}"
}
resetSources()
packetInfo.layeringChanged = true
videoCodecParser = Av1DDParser(sources, logger, diagnosticContext)
videoCodecParser = checkParserType<Av1DDParser>(packetInfo) { source ->
Av1DDParser(source, logger, diagnosticContext)
}

val av1DDPacket = (videoCodecParser as Av1DDParser).createFrom(packet, av1DDExtId)
packetInfo.packet = av1DDPacket
packetInfo.resetPayloadVerification()
val av1DDPacket = videoCodecParser?.createFrom(packet, av1DDExtId)?.also {
packetInfo.packet = it
packetInfo.resetPayloadVerification()
}

av1DDPacket
}
else -> {
if (videoCodecParser != null) {
val curParser = videoCodecParsers[packet.ssrc]
if (curParser != null) {
logger.cdebug {
"Removing videoCodecParser on ${payloadType.javaClass} packet, " +
"current videoCodecParser is ${videoCodecParser?.javaClass}"
"current videoCodecParser is ${curParser.javaClass}"
}
sources.findRtpSource(packet)?.let { source ->
resetSource(source)
source.rtpEncodings.forEach {
videoCodecParsers.remove(it.primarySSRC)
}
}
resetSources()
packetInfo.layeringChanged = true
videoCodecParser = null
}
Expand All @@ -146,7 +142,7 @@ class VideoParser(
/* Some codecs mark keyframes in every packet of the keyframe - only count the start of the frame,
* so the count is correct. */
/* Alternately we could keep track of keyframes we've already seen, by timestamp, but that seems unnecessary. */
if (parsedPacket.isKeyframe && parsedPacket.isStartOfFrame) {
if (parsedPacket != null && parsedPacket.isKeyframe && parsedPacket.isStartOfFrame) {
logger.cdebug { "Received a keyframe for ssrc ${packet.ssrc} ${packet.sequenceNumber}" }
stats.numKeyframes++
}
Expand All @@ -158,29 +154,60 @@ class VideoParser(
return packetInfo
}

/**
 * Returns the parser registered for the SSRC of the packet in [packetInfo] when it is
 * already a [T]; otherwise builds a replacement with [constructor].
 *
 * When a replacement is built, the matching source is reset from its signaled counterpart,
 * `layeringChanged` is set on [packetInfo], and the new parser is registered under the
 * primary SSRC of every encoding of that source.
 *
 * @return the existing or newly created parser, or `null` when no known source matches the packet.
 */
private inline fun <reified T : VideoCodecParser> checkParserType(
    packetInfo: PacketInfo,
    constructor: (MediaSourceDesc) -> T
): T? {
    val rtpPacket = packetInfo.packetAs<RtpPacket>()
    val existing = videoCodecParsers[rtpPacket.ssrc]
    (existing as? T)?.let { return it }

    val source = sources.findRtpSource(rtpPacket)
    if (source == null) {
        // VideoQualityLayerLookup will drop this packet later, so no need to warn about it now
        return null
    }
    logger.cdebug {
        "Creating new ${T::class.java} for source ${source.sourceName}, " +
            "current videoCodecParser is ${existing?.javaClass}"
    }
    resetSource(source)
    packetInfo.layeringChanged = true
    return constructor(source).also { created ->
        for (encoding in source.rtpEncodings) {
            videoCodecParsers[encoding.primarySSRC] = created
        }
    }
}

/**
 * Handles [SetMediaSourcesEvent] by refreshing the cached source lists and the per-SSRC parser map,
 * then forwards every event to the superclass handler.
 */
override fun handleEvent(event: Event) {
when (event) {
is SetMediaSourcesEvent -> {
sources = event.mediaSourceDescs
signaledSources = event.signaledMediaSourceDescs
videoCodecParser?.sources = sources
// Primary SSRCs present in the new source list; used below to prune stale parsers.
val ssrcsSeen = mutableSetOf<Long>()
sources.forEach { source ->
source.rtpEncodings.forEach {
// Re-point any existing parser for this SSRC at the new source descriptor.
videoCodecParsers[it.primarySSRC]?.source = source
ssrcsSeen.add(it.primarySSRC)
}
}
// Drop parsers whose SSRC is not a primary SSRC of any encoding in the new sources.
videoCodecParsers.keys.removeIf { !ssrcsSeen.contains(it) }
}
}
super.handleEvent(event)
}

private fun resetSources() {
logger.cdebug { "Resetting sources to signaled sources: ${signaledSources.joinToString(separator = "\n")}" }
for (signaledSource in signaledSources) {
for (source in sources) {
if (source.primarySSRC != signaledSource.primarySSRC) {
continue
}
for (signaledEncoding in signaledSource.rtpEncodings) {
source.setEncodingLayers(signaledEncoding.layers, signaledEncoding.primarySSRC)
}
break
}
/**
 * Restores the encoding layers of [source] from the signaled source that matches its primary SSRC.
 *
 * Logs a warning and leaves [source] untouched when no such signaled source is found.
 */
private fun resetSource(source: MediaSourceDesc) {
    val signaled = signaledSources.findRtpSource(source.primarySSRC) ?: run {
        logger.warn("Unable to find signaled source corresponding to ${source.primarySSRC}")
        return
    }
    logger.cdebug { "Resetting source ${source.sourceName} to signaled source: $signaled" }
    signaled.rtpEncodings.forEach { encoding ->
        source.setEncodingLayers(encoding.layers, encoding.primarySSRC)
    }
}

Expand Down

0 comments on commit 9fa0290

Please sign in to comment.