Skip to content

Commit

Permalink
feat: Add an option to use "CPU usage" for stress. (#2060)
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev authored Oct 5, 2023
1 parent 63849ed commit b09e7fc
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 49 deletions.
39 changes: 4 additions & 35 deletions jvb/src/main/java/org/jitsi/videobridge/Videobridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

Expand Down Expand Up @@ -119,13 +118,8 @@ public class Videobridge
/**
* The {@link JvbLoadManager} instance used for this bridge.
*/
private final JvbLoadManager<PacketRateMeasurement> jvbLoadManager;

/**
* The task which manages the recurring load sampling and updating of
* {@link Videobridge#jvbLoadManager}.
*/
private final ScheduledFuture<?> loadSamplerTask;
@NotNull
private final JvbLoadManager<?> jvbLoadManager;

@NotNull private final Version version;
@Nullable private final String releaseId;
Expand Down Expand Up @@ -158,29 +152,7 @@ public Videobridge(
{
this.clock = clock;
videobridgeExpireThread = new VideobridgeExpireThread(this);
jvbLoadManager = new JvbLoadManager<>(
PacketRateMeasurement.getLoadedThreshold(),
PacketRateMeasurement.getRecoveryThreshold(),
new LastNReducer(
this::getConferences,
JvbLastNKt.jvbLastNSingleton
)
);
loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(
new PacketRateLoadSampler(
this,
(loadMeasurement) -> {
// Update the load manager with the latest measurement
jvbLoadManager.loadUpdate(loadMeasurement);
// Update the stats with the latest stress level
getStatistics().stressLevel = jvbLoadManager.getCurrentStressLevel();
return Unit.INSTANCE;
}
),
0,
10,
TimeUnit.SECONDS
);
jvbLoadManager = JvbLoadManager.create(this);
if (xmppConnection != null)
{
xmppConnection.setEventHandler(new XmppConnectionEventHandler());
Expand Down Expand Up @@ -610,10 +582,7 @@ public void start()
public void stop()
{
videobridgeExpireThread.stop();
if (loadSamplerTask != null)
{
loadSamplerTask.cancel(true);
}
jvbLoadManager.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright @ 2023 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.load_management

import com.sun.management.OperatingSystemMXBean
import java.lang.management.ManagementFactory

class CpuLoadSampler(private val newMeasurementHandler: (CpuMeasurement) -> Unit) : Runnable {
override fun run() {
val osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean::class.java)
newMeasurementHandler(CpuMeasurement(osBean.systemCpuLoad))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright @ 2023 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.load_management

import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config

class CpuMeasurement(private val value: Double) : JvbLoadMeasurement {
override fun getLoad(): Double = value

override fun div(other: JvbLoadMeasurement): Double {
if (other !is CpuMeasurement) {
throw UnsupportedOperationException("Can only divide load measurements of same type")
}
return value / other.value
}

override fun toString(): String = "CPU usage ${String.format("%.2f", value * 100)}%"

companion object {
val loadThreshold: CpuMeasurement by config {
"${JvbLoadMeasurement.CONFIG_BASE}.cpu-usage.load-threshold"
.from(JitsiConfig.newConfig)
.convertFrom<Double>(::CpuMeasurement)
}
val recoverThreshold: CpuMeasurement by config {
"${JvbLoadMeasurement.CONFIG_BASE}.cpu-usage.recovery-threshold"
.from(JitsiConfig.newConfig)
.convertFrom<Double>(::CpuMeasurement)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,52 +23,66 @@ import org.jitsi.nlj.util.NEVER
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.cdebug
import org.jitsi.utils.logging2.createLogger
import org.jitsi.videobridge.Videobridge
import org.jitsi.videobridge.jvbLastNSingleton
import org.jitsi.videobridge.util.TaskPools
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.logging.Level

class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
open class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
private val jvbLoadThreshold: T,
private val jvbRecoveryThreshold: T,
private val loadReducer: JvbLoadReducer,
private val loadReducer: JvbLoadReducer?,
private val clock: Clock = Clock.systemUTC()
) {
private val logger = createLogger(minLogLevel = Level.ALL)

val reducerEnabled: Boolean by config("videobridge.load-management.reducer-enabled".from(JitsiConfig.newConfig))
protected val logger = createLogger(minLogLevel = Level.ALL)

private var lastReducerTime: Instant = NEVER

private var state: State = State.NOT_OVERLOADED

private var mostRecentLoadMeasurement: T? = null

private var loadSamplerTask: ScheduledFuture<*>? = null

protected fun startSampler(sampler: Runnable) {
loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(sampler, 0, 10, TimeUnit.SECONDS)
}

fun stop() {
loadSamplerTask?.cancel(true)
loadSamplerTask = null
}

fun loadUpdate(loadMeasurement: T) {
logger.cdebug { "Got a load measurement of $loadMeasurement" }
mostRecentLoadMeasurement = loadMeasurement
val now = clock.instant()
if (loadMeasurement.getLoad() >= jvbLoadThreshold.getLoad()) {
state = State.OVERLOADED
if (reducerEnabled) {
loadReducer?.let {
logger.info("Load measurement $loadMeasurement is above threshold of $jvbLoadThreshold")
if (canRunReducer(now)) {
logger.info("Running load reducer")
loadReducer.reduceLoad()
it.reduceLoad()
lastReducerTime = now
} else {
logger.info(
"Load reducer ran at $lastReducerTime, which is within " +
"${loadReducer.impactTime()} of now, not running reduce"
"${it.impactTime()} of now, not running reduce"
)
}
}
} else {
state = State.NOT_OVERLOADED
if (reducerEnabled) {
loadReducer?.let {
if (loadMeasurement.getLoad() < jvbRecoveryThreshold.getLoad()) {
if (canRunReducer(now)) {
if (loadReducer.recover()) {
if (it.recover()) {
logger.info(
"Recovery ran after a load measurement of $loadMeasurement (which was " +
"below threshold of $jvbRecoveryThreshold) was received"
Expand All @@ -80,7 +94,7 @@ class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
} else {
logger.cdebug {
"Load measurement $loadMeasurement is below recovery threshold, but load reducer " +
"ran at $lastReducerTime, which is within ${loadReducer.impactTime()} of now, " +
"ran at $lastReducerTime, which is within ${it.impactTime()} of now, " +
"not running recover"
}
}
Expand All @@ -95,11 +109,17 @@ class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
put("state", state.toString())
put("stress", getCurrentStressLevel().toString())
put("reducer_enabled", reducerEnabled.toString())
put("reducer", loadReducer.getStats())
loadReducer?.let {
put("reducer", it.getStats())
}
}

private fun canRunReducer(now: Instant): Boolean =
Duration.between(lastReducerTime, now) >= loadReducer.impactTime()
private fun canRunReducer(now: Instant): Boolean {
loadReducer?.let {
return Duration.between(lastReducerTime, now) >= it.impactTime()
}
return false
}

enum class State {
OVERLOADED,
Expand All @@ -110,5 +130,70 @@ class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
val averageParticipantStress: Double by config {
"videobridge.load-management.average-participant-stress".from(JitsiConfig.newConfig)
}

val loadMeasurement: String by config {
"videobridge.load-management.load-measurements.load-measurement".from(JitsiConfig.newConfig)
}

val reducerEnabled: Boolean by config("videobridge.load-management.reducer-enabled".from(JitsiConfig.newConfig))

const val PACKET_RATE_MEASUREMENT = "packet-rate"
const val CPU_USAGE_MEASUREMENT = "cpu-usage"

@JvmStatic
fun create(videobridge: Videobridge): JvbLoadManager<*> {
val reducer = if (reducerEnabled) LastNReducer({ videobridge.conferences }, jvbLastNSingleton) else null

return when (loadMeasurement) {
PACKET_RATE_MEASUREMENT -> PacketRateLoadManager(
PacketRateMeasurement.loadedThreshold,
PacketRateMeasurement.recoveryThreshold,
reducer,
videobridge
)
CPU_USAGE_MEASUREMENT -> CpuUsageLoadManager(
CpuMeasurement.loadThreshold,
CpuMeasurement.recoverThreshold,
reducer,
videobridge
)
else -> throw IllegalArgumentException(
"Invalid configuration for load measurement type: $loadMeasurement"
)
}
}
}
}

class PacketRateLoadManager(
loadThreshold: PacketRateMeasurement,
recoveryThreshold: PacketRateMeasurement,
loadReducer: JvbLoadReducer?,
videobridge: Videobridge
) : JvbLoadManager<PacketRateMeasurement>(loadThreshold, recoveryThreshold, loadReducer) {

init {
val sampler = PacketRateLoadSampler(videobridge) { loadMeasurement ->
loadUpdate(loadMeasurement)
videobridge.statistics.stressLevel = loadMeasurement.getLoad()
}

startSampler(sampler)
}
}

class CpuUsageLoadManager(
loadThreshold: CpuMeasurement,
recoveryThreshold: CpuMeasurement,
loadReducer: JvbLoadReducer?,
videobridge: Videobridge
) : JvbLoadManager<CpuMeasurement>(loadThreshold, recoveryThreshold, loadReducer) {
init {
val sampler = CpuLoadSampler { loadMeasurement ->
loadUpdate(loadMeasurement)
videobridge.statistics.stressLevel = loadMeasurement.getLoad()
}

startSampler(sampler)
}
}
7 changes: 7 additions & 0 deletions jvb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ videobridge {
# to start recovery
recovery-threshold = 40000
}
cpu-usage {
load-threshold = 0.9
recovery-threshold = 0.72
}

# Which of the available measurements to use, either "packet-rate" or "cpu-usage".
load-measurement = "packet-rate"
}
load-reducers {
last-n {
Expand Down

0 comments on commit b09e7fc

Please sign in to comment.