Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Support cgroup v2 by using jdk.internal.platform.Metrics in Pulsar Loadbalancer #16832

Merged
merged 9 commits into from
Apr 28, 2023
Merged
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED"
# LinuxInfoUtils
OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi

OPTS="-cp $PULSAR_CLASSPATH $OPTS"
Expand Down
1 change: 1 addition & 0 deletions buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<test.additional.args>
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
</test.additional.args>
</properties>

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ flexible messaging model and an intuitive client API.</description>
--add-opens java.base/sun.net=ALL-UNNAMED <!--netty.DnsResolverUtil-->
--add-opens java.management/sun.management=ALL-UNNAMED <!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED <!--MBeanStatsGenerator-->
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
</test.additional.args>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -45,13 +47,38 @@ public class LinuxInfoUtils {
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";

// proc states
private static final String PROC_STAT_PATH = "/proc/stat";
private static final String NIC_PATH = "/sys/class/net/";
// NIC type
private static final int ARPHRD_ETHER = 1;
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";

private static Object /*jdk.internal.platform.Metrics*/ metrics;
private static Method getMetricsProviderMethod;
private static Method getCpuQuotaMethod;
private static Method getCpuPeriodMethod;
private static Method getCpuUsageMethod;

static {
try {
metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics")
.invoke(null);
if (metrics != null) {
getMetricsProviderMethod = metrics.getClass().getMethod("getProvider");
getMetricsProviderMethod.setAccessible(true);
getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota");
getCpuQuotaMethod.setAccessible(true);
getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod");
getCpuPeriodMethod.setAccessible(true);
getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage");
getCpuUsageMethod.setAccessible(true);
}
} catch (Throwable e) {
log.warn("Failed to get runtime metrics", e);
}
}

/**
* Determine whether the OS is the linux kernel.
Expand All @@ -66,9 +93,14 @@ public static boolean isLinux() {
*/
public static boolean isCGroupEnabled() {
try {
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
if (metrics == null) {
return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
}
String provider = (String) getMetricsProviderMethod.invoke(metrics);
log.info("[LinuxInfo] The system metrics provider is: {}", provider);
return provider.contains("cgroup");
} catch (Exception e) {
log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", e.getMessage());
log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage());
return false;
}
}
Expand All @@ -81,13 +113,21 @@ public static boolean isCGroupEnabled() {
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
if (isCGroupsEnabled) {
try {
long quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
long period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
long quota;
long period;
if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) {
quota = (long) getCpuQuotaMethod.invoke(metrics);
period = (long) getCpuPeriodMethod.invoke(metrics);
} else {
quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
}

if (quota > 0) {
return 100.0 * quota / period;
}
} catch (IOException e) {
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", e);
} catch (Exception e) {
log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e);
// Fallback to availableProcessors
}
}
Expand All @@ -99,11 +139,14 @@ public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
* Get CGroup cpu usage.
* @return Cpu usage
*/
public static double getCpuUsageForCGroup() {
public static long getCpuUsageForCGroup() {
try {
if (metrics != null && getCpuUsageMethod != null) {
return (long) getCpuUsageMethod.invoke(metrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For backward compatibility, don't we need to multiply the limit in calculateBrokerHostUsage?

public void calculateBrokerHostUsage() {
... 
       double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled);
        if (isCGroupsEnabled && metrics != null && getCpuUsageMethod != null) {
            // cgroup cpuUsage is already scaled, [0.0, 1.0]
            usage.setCpu(new ResourceUsage(cpuUsage * totalCpuLimit, totalCpuLimit));
        } else {
            usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
        }
        

Copy link
Member Author

@coderzc coderzc Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Metrics.getCpuUsage will return the aggregate time, so I think we don't need to multiply the limit.

    /**
     * Returns the aggregate time, in nanoseconds, consumed by all
     * tasks in the Isolation Group.
     *
     * @return Time in nanoseconds, -1 if unknown or
     *         -2 if the metric is not supported.
     *
     */
    public long getCpuUsage();

Copy link
Contributor

@heesung-sn heesung-sn Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
} catch (IOException e) {
log.error("[LinuxInfo] Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
} catch (Exception e) {
log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
return -1;
}
}
Expand Down Expand Up @@ -291,6 +334,11 @@ enum Operstate {
UP
}

@VisibleForTesting
public static Object getMetrics() {
return metrics;
}

@AllArgsConstructor
public enum NICUsageType {
// transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) {
}

private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
double usage = getCpuUsageForCGroup();
double usage = (double) getCpuUsageForCGroup();
double currentUsage = usage - lastCpuUsage;
lastCpuUsage = usage;
return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +30,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -96,4 +99,27 @@ public void testNoNICSpeed() throws Exception {
}


@Test
public void testCGroupMetrics() {
if (!LinuxInfoUtils.isLinux()) {
return;
}

boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
Assert.assertEquals(cGroupEnabled, existsCGroup);

double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
log.info("totalCpuLimit: {}", totalCpuLimit);
Assert.assertTrue(totalCpuLimit > 0.0);

if (cGroupEnabled) {
Assert.assertNotNull(LinuxInfoUtils.getMetrics());

long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
Assert.assertTrue(cpuUsageForCGroup > 0);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import lombok.Cleanup;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
public class LinuxBrokerHostUsageImplTest {

@Test
Expand All @@ -42,4 +46,31 @@ public void checkOverrideBrokerNicSpeedGbps() {
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}

@Test
public void testCpuUsage() throws InterruptedException {
if (!LinuxInfoUtils.isLinux()) {
return;
}

@Cleanup("shutdown")
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService);

linuxBrokerHostUsage.calculateBrokerHostUsage();
TimeUnit.SECONDS.sleep(1);
linuxBrokerHostUsage.calculateBrokerHostUsage();

double usage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
double limit = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
float percentUsage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();

Assert.assertTrue(usage > 0);
Assert.assertTrue(limit > 0);
Assert.assertTrue(limit >= usage);
Assert.assertTrue(percentUsage > 0);

log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, percentUsage);
}
}