diff --git a/jvb/src/main/java/org/jitsi/videobridge/Videobridge.java b/jvb/src/main/java/org/jitsi/videobridge/Videobridge.java index 4160002ad9..8b4ee46be0 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/Videobridge.java +++ b/jvb/src/main/java/org/jitsi/videobridge/Videobridge.java @@ -49,7 +49,6 @@ import java.time.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.stream.*; @@ -119,13 +118,8 @@ public class Videobridge /** * The {@link JvbLoadManager} instance used for this bridge. */ - private final JvbLoadManager jvbLoadManager; - - /** - * The task which manages the recurring load sampling and updating of - * {@link Videobridge#jvbLoadManager}. - */ - private final ScheduledFuture loadSamplerTask; + @NotNull + private final JvbLoadManager jvbLoadManager; @NotNull private final Version version; @Nullable private final String releaseId; @@ -158,29 +152,7 @@ public Videobridge( { this.clock = clock; videobridgeExpireThread = new VideobridgeExpireThread(this); - jvbLoadManager = new JvbLoadManager<>( - PacketRateMeasurement.getLoadedThreshold(), - PacketRateMeasurement.getRecoveryThreshold(), - new LastNReducer( - this::getConferences, - JvbLastNKt.jvbLastNSingleton - ) - ); - loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate( - new PacketRateLoadSampler( - this, - (loadMeasurement) -> { - // Update the load manager with the latest measurement - jvbLoadManager.loadUpdate(loadMeasurement); - // Update the stats with the latest stress level - getStatistics().stressLevel = jvbLoadManager.getCurrentStressLevel(); - return Unit.INSTANCE; - } - ), - 0, - 10, - TimeUnit.SECONDS - ); + jvbLoadManager = JvbLoadManager.create(this); if (xmppConnection != null) { xmppConnection.setEventHandler(new XmppConnectionEventHandler()); @@ -610,10 +582,7 @@ public void start() public void stop() { videobridgeExpireThread.stop(); - if (loadSamplerTask != null) - { - loadSamplerTask.cancel(true); - } + jvbLoadManager.stop(); } /** diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuLoadSampler.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuLoadSampler.kt new file mode 100644 index 0000000000..11b98aa453 --- /dev/null +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuLoadSampler.kt @@ -0,0 +1,26 @@ +/* + * Copyright @ 2023 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.videobridge.load_management + +import com.sun.management.OperatingSystemMXBean +import java.lang.management.ManagementFactory + +class CpuLoadSampler(private val newMeasurementHandler: (CpuMeasurement) -> Unit) : Runnable { + override fun run() { + val osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean::class.java) + newMeasurementHandler(CpuMeasurement(osBean.systemCpuLoad)) + } +} diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuMeasurement.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuMeasurement.kt new file mode 100644 index 0000000000..6b021c2af0 --- /dev/null +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/CpuMeasurement.kt @@ -0,0 +1,45 @@ +/* + * Copyright @ 2023 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.videobridge.load_management + +import org.jitsi.config.JitsiConfig +import org.jitsi.metaconfig.config + +class CpuMeasurement(private val value: Double) : JvbLoadMeasurement { + override fun getLoad(): Double = value + + override fun div(other: JvbLoadMeasurement): Double { + if (other !is CpuMeasurement) { + throw UnsupportedOperationException("Can only divide load measurements of same type") + } + return value / other.value + } + + override fun toString(): String = "CPU usage ${String.format("%.2f", value * 100)}%" + + companion object { + val loadThreshold: CpuMeasurement by config { + "${JvbLoadMeasurement.CONFIG_BASE}.cpu-usage.load-threshold" + .from(JitsiConfig.newConfig) + .convertFrom(::CpuMeasurement) + } + val recoverThreshold: CpuMeasurement by config { + "${JvbLoadMeasurement.CONFIG_BASE}.cpu-usage.recovery-threshold" + .from(JitsiConfig.newConfig) + .convertFrom(::CpuMeasurement) + } + } +} diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadManager.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadManager.kt index 991b0ca5b6..1961faf03a 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadManager.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadManager.kt @@ -23,20 +23,23 @@ import org.jitsi.nlj.util.NEVER import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.cdebug import org.jitsi.utils.logging2.createLogger +import org.jitsi.videobridge.Videobridge +import org.jitsi.videobridge.jvbLastNSingleton +import org.jitsi.videobridge.util.TaskPools import java.time.Clock import java.time.Duration import java.time.Instant +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import java.util.logging.Level -class JvbLoadManager @JvmOverloads constructor( +open class JvbLoadManager @JvmOverloads constructor( private val jvbLoadThreshold: T, private val jvbRecoveryThreshold: T, - private val loadReducer: JvbLoadReducer, + private val loadReducer: JvbLoadReducer?, private val clock: Clock = Clock.systemUTC() ) { - private val logger = createLogger(minLogLevel = Level.ALL) - - val reducerEnabled: Boolean by config("videobridge.load-management.reducer-enabled".from(JitsiConfig.newConfig)) + protected val logger = createLogger(minLogLevel = Level.ALL) private var lastReducerTime: Instant = NEVER @@ -44,31 +47,42 @@ class JvbLoadManager @JvmOverloads constructor( private var mostRecentLoadMeasurement: T? = null + private var loadSamplerTask: ScheduledFuture<*>? = null + + protected fun startSampler(sampler: Runnable) { + loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(sampler, 0, 10, TimeUnit.SECONDS) + } + + fun stop() { + loadSamplerTask?.cancel(true) + loadSamplerTask = null + } + fun loadUpdate(loadMeasurement: T) { logger.cdebug { "Got a load measurement of $loadMeasurement" } mostRecentLoadMeasurement = loadMeasurement val now = clock.instant() if (loadMeasurement.getLoad() >= jvbLoadThreshold.getLoad()) { state = State.OVERLOADED - if (reducerEnabled) { + loadReducer?.let { logger.info("Load measurement $loadMeasurement is above threshold of $jvbLoadThreshold") if (canRunReducer(now)) { logger.info("Running load reducer") - loadReducer.reduceLoad() + it.reduceLoad() lastReducerTime = now } else { logger.info( "Load reducer ran at $lastReducerTime, which is within " + - "${loadReducer.impactTime()} of now, not running reduce" + "${it.impactTime()} of now, not running reduce" ) } } } else { state = State.NOT_OVERLOADED - if (reducerEnabled) { + loadReducer?.let { if (loadMeasurement.getLoad() < jvbRecoveryThreshold.getLoad()) { if (canRunReducer(now)) { - if (loadReducer.recover()) { + if (it.recover()) { logger.info( "Recovery ran after a load measurement of $loadMeasurement (which was " + "below threshold of $jvbRecoveryThreshold) was received" @@ -80,7 +94,7 @@ class JvbLoadManager @JvmOverloads constructor( } else { logger.cdebug { "Load measurement $loadMeasurement is below recovery threshold, but load reducer " + - "ran at $lastReducerTime, which is within ${loadReducer.impactTime()} of now, " + + "ran at $lastReducerTime, which is within ${it.impactTime()} of now, " + "not running recover" } } @@ -95,11 +109,17 @@ class JvbLoadManager @JvmOverloads constructor( put("state", state.toString()) put("stress", getCurrentStressLevel().toString()) put("reducer_enabled", reducerEnabled.toString()) - put("reducer", loadReducer.getStats()) + loadReducer?.let { + put("reducer", it.getStats()) + } } - private fun canRunReducer(now: Instant): Boolean = - Duration.between(lastReducerTime, now) >= loadReducer.impactTime() + private fun canRunReducer(now: Instant): Boolean { + loadReducer?.let { + return Duration.between(lastReducerTime, now) >= it.impactTime() + } + return false + } enum class State { OVERLOADED, @@ -110,5 +130,70 @@ class JvbLoadManager @JvmOverloads constructor( val averageParticipantStress: Double by config { "videobridge.load-management.average-participant-stress".from(JitsiConfig.newConfig) } + + val loadMeasurement: String by config { + "videobridge.load-management.load-measurements.load-measurement".from(JitsiConfig.newConfig) + } + + val reducerEnabled: Boolean by config("videobridge.load-management.reducer-enabled".from(JitsiConfig.newConfig)) + + const val PACKET_RATE_MEASUREMENT = "packet-rate" + const val CPU_USAGE_MEASUREMENT = "cpu-usage" + + @JvmStatic + fun create(videobridge: Videobridge): JvbLoadManager<*> { + val reducer = if (reducerEnabled) LastNReducer({ videobridge.conferences }, jvbLastNSingleton) else null + + return when (loadMeasurement) { + PACKET_RATE_MEASUREMENT -> PacketRateLoadManager( + PacketRateMeasurement.loadedThreshold, + PacketRateMeasurement.recoveryThreshold, + reducer, + videobridge + ) + CPU_USAGE_MEASUREMENT -> CpuUsageLoadManager( + CpuMeasurement.loadThreshold, + CpuMeasurement.recoverThreshold, + reducer, + videobridge + ) + else -> throw IllegalArgumentException( + "Invalid configuration for load measurement type: $loadMeasurement" + ) + } + } + } +} + +class PacketRateLoadManager( + loadThreshold: PacketRateMeasurement, + recoveryThreshold: PacketRateMeasurement, + loadReducer: JvbLoadReducer?, + videobridge: Videobridge +) : JvbLoadManager(loadThreshold, recoveryThreshold, loadReducer) { + + init { + val sampler = PacketRateLoadSampler(videobridge) { loadMeasurement -> + loadUpdate(loadMeasurement) + videobridge.statistics.stressLevel = loadMeasurement.getLoad() + } + + startSampler(sampler) + } +} + +class CpuUsageLoadManager( + loadThreshold: CpuMeasurement, + recoveryThreshold: CpuMeasurement, + loadReducer: JvbLoadReducer?, + videobridge: Videobridge +) : JvbLoadManager(loadThreshold, recoveryThreshold, loadReducer) { + init { + val sampler = CpuLoadSampler { loadMeasurement -> + loadUpdate(loadMeasurement) + videobridge.statistics.stressLevel = loadMeasurement.getLoad() + } + + startSampler(sampler) } } diff --git a/jvb/src/main/resources/reference.conf b/jvb/src/main/resources/reference.conf index 3e93c45ff7..16dab152ca 100644 --- a/jvb/src/main/resources/reference.conf +++ b/jvb/src/main/resources/reference.conf @@ -165,6 +165,13 @@ videobridge { # to start recovery recovery-threshold = 40000 } + cpu-usage { + load-threshold = 0.9 + recovery-threshold = 0.72 + } + + # Which of the available measurements to use, either "packet-rate" or "cpu-usage". + load-measurement = "packet-rate" } load-reducers { last-n {