diff --git a/bin/pulsar b/bin/pulsar
index a033de947d4b3..e3b22caced52e 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED"
+ # LinuxInfoUtils
+ OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi
OPTS="-cp $PULSAR_CLASSPATH $OPTS"
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 04b25cced42ac..a21618004e31a 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -58,6 +58,7 @@
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.platform=ALL-UNNAMED
diff --git a/pom.xml b/pom.xml
index aef380c5cd09c..c5d7092c7b4ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@ flexible messaging model and an intuitive client API.
--add-opens java.base/sun.net=ALL-UNNAMED
--add-opens java.management/sun.management=ALL-UNNAMED
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.platform=ALL-UNNAMED
true
4
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index 42ef264b6db04..1405979ae3b12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -45,6 +47,7 @@ public class LinuxInfoUtils {
private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
// proc states
private static final String PROC_STAT_PATH = "/proc/stat";
private static final String NIC_PATH = "/sys/class/net/";
@@ -52,6 +55,30 @@ public class LinuxInfoUtils {
private static final int ARPHRD_ETHER = 1;
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
+ private static Object /*jdk.internal.platform.Metrics*/ metrics;
+ private static Method getMetricsProviderMethod;
+ private static Method getCpuQuotaMethod;
+ private static Method getCpuPeriodMethod;
+ private static Method getCpuUsageMethod;
+
+ static {
+ try {
+ metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics")
+ .invoke(null);
+ if (metrics != null) {
+ getMetricsProviderMethod = metrics.getClass().getMethod("getProvider");
+ getMetricsProviderMethod.setAccessible(true);
+ getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota");
+ getCpuQuotaMethod.setAccessible(true);
+ getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod");
+ getCpuPeriodMethod.setAccessible(true);
+ getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage");
+ getCpuUsageMethod.setAccessible(true);
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to get runtime metrics", e);
+ }
+ }
/**
* Determine whether the OS is the linux kernel.
@@ -66,9 +93,14 @@ public static boolean isLinux() {
*/
public static boolean isCGroupEnabled() {
try {
- return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ if (metrics == null) {
+ return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ }
+ String provider = (String) getMetricsProviderMethod.invoke(metrics);
+ log.info("[LinuxInfo] The system metrics provider is: {}", provider);
+ return provider.contains("cgroup");
} catch (Exception e) {
- log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", e.getMessage());
+ log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage());
return false;
}
}
@@ -81,13 +113,21 @@ public static boolean isCGroupEnabled() {
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
if (isCGroupsEnabled) {
try {
- long quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
- long period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ long quota;
+ long period;
+ if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) {
+ quota = (long) getCpuQuotaMethod.invoke(metrics);
+ period = (long) getCpuPeriodMethod.invoke(metrics);
+ } else {
+ quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
+ period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ }
+
if (quota > 0) {
return 100.0 * quota / period;
}
- } catch (IOException e) {
- log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", e);
+ } catch (Exception e) {
+ log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e);
// Fallback to availableProcessors
}
}
@@ -99,11 +139,14 @@ public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
* Get CGroup cpu usage.
* @return Cpu usage
*/
- public static double getCpuUsageForCGroup() {
+ public static long getCpuUsageForCGroup() {
try {
+ if (metrics != null && getCpuUsageMethod != null) {
+ return (long) getCpuUsageMethod.invoke(metrics);
+ }
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e);
+ } catch (Exception e) {
+ log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
return -1;
}
}
@@ -291,6 +334,11 @@ enum Operstate {
UP
}
+ @VisibleForTesting
+ public static Object getMetrics() {
+ return metrics;
+ }
+
@AllArgsConstructor
public enum NICUsageType {
// transport
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 318f37f7f7a97..5d2b7bdd09adf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -140,7 +140,7 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) {
}
private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
- double usage = getCpuUsageForCGroup();
+ double usage = (double) getCpuUsageForCGroup();
double currentUsage = usage - lastCpuUsage;
lastCpuUsage = usage;
return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 6de2eb90f12ef..28dde8b7f559d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -20,6 +20,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
@@ -96,4 +99,27 @@ public void testNoNICSpeed() throws Exception {
}
+ @Test
+ public void testCGroupMetrics() {
+ if (!LinuxInfoUtils.isLinux()) {
+ return;
+ }
+
+ boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
+ boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
+ Assert.assertEquals(cGroupEnabled, existsCGroup);
+
+ double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
+ log.info("totalCpuLimit: {}", totalCpuLimit);
+ Assert.assertTrue(totalCpuLimit > 0.0);
+
+ if (cGroupEnabled) {
+ Assert.assertNotNull(LinuxInfoUtils.getMetrics());
+
+ long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
+ log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
+ Assert.assertTrue(cpuUsageForCGroup > 0);
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
index 39298dce0b16a..563f707c445b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
@@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import lombok.Cleanup;
-import org.testng.Assert;
-import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+@Slf4j
public class LinuxBrokerHostUsageImplTest {
@Test
@@ -42,4 +46,31 @@ public void checkOverrideBrokerNicSpeedGbps() {
double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}
+
+ @Test
+ public void testCpuUsage() throws InterruptedException {
+ if (!LinuxInfoUtils.isLinux()) {
+ return;
+ }
+
+ @Cleanup("shutdown")
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
+ new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService);
+
+ linuxBrokerHostUsage.calculateBrokerHostUsage();
+ TimeUnit.SECONDS.sleep(1);
+ linuxBrokerHostUsage.calculateBrokerHostUsage();
+
+ double usage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
+ double limit = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
+ float percentUsage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();
+
+ Assert.assertTrue(usage > 0);
+ Assert.assertTrue(limit > 0);
+ Assert.assertTrue(limit >= usage);
+ Assert.assertTrue(percentUsage > 0);
+
+ log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, percentUsage);
+ }
}