Skip to content

Commit

Permalink
[improve][broker] Support cgroup v2 by using `jdk.internal.platform.M…
Browse files Browse the repository at this point in the history
…etrics` in Pulsar Loadbalancer (#16832)

(cherry picked from commit b8543ad)
  • Loading branch information
coderzc authored and lhotari committed Jun 27, 2023
1 parent c3cae36 commit fcf5d29
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 13 deletions.
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED"
# LinuxInfoUtils
OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi

OPTS="-cp $PULSAR_CLASSPATH $OPTS"
Expand Down
1 change: 1 addition & 0 deletions buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<test.additional.args>
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
</test.additional.args>
</properties>

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ flexible messaging model and an intuitive client API.</description>
--add-opens java.base/sun.net=ALL-UNNAMED <!--netty.DnsResolverUtil-->
--add-opens java.management/sun.management=ALL-UNNAMED <!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED <!--MBeanStatsGenerator-->
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
</test.additional.args>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -45,13 +47,38 @@ public class LinuxInfoUtils {
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";

// proc states
private static final String PROC_STAT_PATH = "/proc/stat";
private static final String NIC_PATH = "/sys/class/net/";
// NIC type
private static final int ARPHRD_ETHER = 1;
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";

// Reflection handles for the JDK-internal container metrics. They are assigned only by the
// static initializer of this class; each one stays null unless that initializer sets it.
// The container metrics instance itself (typed Object because the JDK class is internal).
private static Object /*jdk.internal.platform.Metrics*/ metrics;
// Handle for the metrics object's "getProvider" method.
private static Method getMetricsProviderMethod;
// Handle for the metrics object's "getCpuQuota" method.
private static Method getCpuQuotaMethod;
// Handle for the metrics object's "getCpuPeriod" method.
private static Method getCpuPeriodMethod;
// Handle for the metrics object's "getCpuUsage" method.
private static Method getCpuUsageMethod;

static {
try {
metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics")
.invoke(null);
if (metrics != null) {
getMetricsProviderMethod = metrics.getClass().getMethod("getProvider");
getMetricsProviderMethod.setAccessible(true);
getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota");
getCpuQuotaMethod.setAccessible(true);
getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod");
getCpuPeriodMethod.setAccessible(true);
getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage");
getCpuUsageMethod.setAccessible(true);
}
} catch (Throwable e) {
log.warn("Failed to get runtime metrics", e);
}
}

/**
* Determine whether the OS is the linux kernel.
Expand All @@ -66,9 +93,14 @@ public static boolean isLinux() {
*/
public static boolean isCGroupEnabled() {
try {
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
if (metrics == null) {
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
}
String provider = (String) getMetricsProviderMethod.invoke(metrics);
log.info("[LinuxInfo] The system metrics provider is: {}", provider);
return provider.contains("cgroup");
} catch (Exception e) {
log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", e.getMessage());
log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage());
return false;
}
}
Expand All @@ -81,13 +113,21 @@ public static boolean isCGroupEnabled() {
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
if (isCGroupsEnabled) {
try {
long quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
long period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
long quota;
long period;
if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) {
quota = (long) getCpuQuotaMethod.invoke(metrics);
period = (long) getCpuPeriodMethod.invoke(metrics);
} else {
quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
}

if (quota > 0) {
return 100.0 * quota / period;
}
} catch (IOException e) {
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", e);
} catch (Exception e) {
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e);
// Fallback to availableProcessors
}
}
Expand All @@ -99,11 +139,14 @@ public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
* Get CGroup cpu usage.
* @return Cpu usage
*/
public static double getCpuUsageForCGroup() {
public static long getCpuUsageForCGroup() {
try {
if (metrics != null && getCpuUsageMethod != null) {
return (long) getCpuUsageMethod.invoke(metrics);
}
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
} catch (IOException e) {
log.error("[LinuxInfo] Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
} catch (Exception e) {
log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
return -1;
}
}
Expand Down Expand Up @@ -291,6 +334,11 @@ enum Operstate {
UP
}

/**
 * Exposes the reflectively obtained container metrics object so that tests can check
 * whether it was resolved.
 *
 * @return the current value of the {@code metrics} field; {@code null} when the static
 *         initializer did not obtain a metrics instance
 */
@VisibleForTesting
public static Object getMetrics() {
return metrics;
}

@AllArgsConstructor
public enum NICUsageType {
// transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) {
}

private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
double usage = getCpuUsageForCGroup();
double usage = (double) getCpuUsageForCGroup();
double currentUsage = usage - lastCpuUsage;
lastCpuUsage = usage;
return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +30,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -96,4 +99,27 @@ public void testNoNICSpeed() throws Exception {
}


/**
 * Checks, on Linux only, that cgroup detection agrees with the presence of
 * {@code /sys/fs/cgroup}, that a positive CPU limit is reported, and, when cgroups are
 * enabled, that the metrics object is available and reports a positive CPU usage.
 */
@Test
public void testCGroupMetrics() {
    // Nothing to verify on non-Linux hosts.
    if (!LinuxInfoUtils.isLinux()) {
        return;
    }

    final boolean cgroupDirPresent = Files.exists(Paths.get("/sys/fs/cgroup"));
    final boolean enabled = LinuxInfoUtils.isCGroupEnabled();
    Assert.assertEquals(enabled, cgroupDirPresent);

    final double cpuLimit = LinuxInfoUtils.getTotalCpuLimit(enabled);
    log.info("totalCpuLimit: {}", cpuLimit);
    Assert.assertTrue(cpuLimit > 0.0);

    // The remaining checks only apply when cgroups were detected.
    if (!enabled) {
        return;
    }

    Assert.assertNotNull(LinuxInfoUtils.getMetrics());

    final long cgroupCpuUsage = LinuxInfoUtils.getCpuUsageForCGroup();
    log.info("cpuUsageForCGroup: {}", cgroupCpuUsage);
    Assert.assertTrue(cgroupCpuUsage > 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import lombok.Cleanup;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
public class LinuxBrokerHostUsageImplTest {

@Test
Expand All @@ -42,4 +46,31 @@ public void checkOverrideBrokerNicSpeedGbps() {
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}

/**
 * Takes two host-usage samples one second apart on Linux and checks that the reported
 * CPU usage, limit and percentage are positive and that usage does not exceed the limit.
 *
 * @throws InterruptedException if the pause between the two samples is interrupted
 */
@Test
public void testCpuUsage() throws InterruptedException {
    // Nothing to verify on non-Linux hosts.
    if (!LinuxInfoUtils.isLinux()) {
        return;
    }

    @Cleanup("shutdown")
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    LinuxBrokerHostUsageImpl hostUsage =
            new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), scheduler);

    // First sample, a one second pause, then the second sample.
    hostUsage.calculateBrokerHostUsage();
    TimeUnit.SECONDS.sleep(1);
    hostUsage.calculateBrokerHostUsage();

    double cpuUsage = hostUsage.getBrokerHostUsage().getCpu().usage;
    double cpuLimit = hostUsage.getBrokerHostUsage().getCpu().limit;
    float cpuPercent = hostUsage.getBrokerHostUsage().getCpu().percentUsage();

    Assert.assertTrue(cpuUsage > 0);
    Assert.assertTrue(cpuLimit > 0);
    Assert.assertTrue(cpuLimit >= cpuUsage);
    Assert.assertTrue(cpuPercent > 0);

    log.info("usage: {}, limit: {}, percentUsage: {}", cpuUsage, cpuLimit, cpuPercent);
}
}

0 comments on commit fcf5d29

Please sign in to comment.