diff --git a/bin/pulsar b/bin/pulsar index 8f8d77ed1f74d..7c2a75aad665f 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -322,6 +322,8 @@ if [[ -z "$IS_JAVA_8" ]]; then OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED" # MBeanStatsGenerator OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED" + # LinuxInfoUtils + OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED" fi ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true" diff --git a/pom.xml b/pom.xml index f4d9f62cbc1a9..9d46bef2eee74 100644 --- a/pom.xml +++ b/pom.xml @@ -1919,6 +1919,7 @@ flexible messaging model and an intuitive client API. --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED + --add-opens java.base/jdk.internal.platform=ALL-UNNAMED diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java new file mode 100644 index 0000000000000..7e0d0dcfda1cc --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +public enum BitRateUnit { + + Bit { + public double toBit(double bitRate) { + return bitRate; + } + + public double toKilobit(double bitRate) { + return bitRate / C0; + } + + public double toMegabit(double bitRate) { + return bitRate / Math.pow(C0, 2); + } + + public double toGigabit(double bitRate) { + return bitRate / Math.pow(C0, 3); + } + + public double toByte(double bitRate) { + return bitRate / C1; + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + return bitRateUnit.toBit(bitRate); + } + }, + Kilobit { + public double toBit(double bitRate) { + return bitRate * C0; + } + + public double toKilobit(double bitRate) { + return bitRate; + } + + public double toMegabit(double bitRate) { + return bitRate / C0; + } + + public double toGigabit(double bitRate) { + return bitRate / Math.pow(C0, 2); + } + + public double toByte(double bitRate) { + return bitRate * C0 / C1; + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + return bitRateUnit.toKilobit(bitRate); + } + }, + Megabit { + public double toBit(double bitRate) { + return bitRate * Math.pow(C0, 2); + } + + public double toKilobit(double bitRate) { + return bitRate * C0; + } + + public double toMegabit(double bitRate) { + return bitRate; + } + + public double toGigabit(double bitRate) { + return bitRate / C0; + } + + public double toByte(double bitRate) { + return bitRate * Math.pow(C0, 2) / C1; + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + return bitRateUnit.toMegabit(bitRate); + } + }, + Gigabit { + public double toBit(double bitRate) { + return bitRate * Math.pow(C0, 3); + } + + public double toKilobit(double bitRate) { + return bitRate * Math.pow(C0, 2); + } + + public double toMegabit(double bitRate) { + return bitRate * C0; + } + + public double toGigabit(double bitRate) { + return bitRate; + } + + public double toByte(double bitRate) { + return bitRate * Math.pow(C0, 3) / C1; + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + return bitRateUnit.toGigabit(bitRate); + } + }, + Byte { + public double toBit(double bitRate) { + return bitRate * C1; + } + + public double toKilobit(double bitRate) { + return bitRate * C1 / C0; + } + + public double toMegabit(double bitRate) { + return bitRate * C1 / Math.pow(C0, 2); + } + + public double toGigabit(double bitRate) { + return bitRate * C1 / Math.pow(C0, 3); + } + + public double toByte(double bitRate) { + return bitRate; + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + return bitRateUnit.toByte(bitRate); + } + }; + + static final int C0 = 1000; + static final int C1 = 8; + + public double toBit(double bitRate) { + throw new AbstractMethodError(); + } + + public double toKilobit(double bitRate) { + throw new AbstractMethodError(); + } + + public double toMegabit(double bitRate) { + throw new AbstractMethodError(); + } + + public double toGigabit(double bitRate) { + throw new AbstractMethodError(); + } + + public double toByte(double bitRate) { + throw new AbstractMethodError(); + } + + public double convert(double bitRate, BitRateUnit bitRateUnit) { + throw new AbstractMethodError(); + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java new file mode 100644 index 0000000000000..14adcc7027334 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; + +public class BitRateUnitTest { + + @Test + public void testBps() { + double bps = 1231434.12; + assertEquals(BitRateUnit.Bit.toBit(bps), bps); + assertEquals(BitRateUnit.Bit.toByte(bps), bps / 8); + assertEquals(BitRateUnit.Bit.toKilobit(bps), bps / 1000); + assertEquals(BitRateUnit.Bit.toMegabit(bps), bps / 1000 / 1000); + assertEquals(BitRateUnit.Bit.toGigabit(bps), bps / 1000 / 1000 / 1000); + } + + @Test + public void testKbps() { + double kbps = 1231434.12; + assertEquals(BitRateUnit.Kilobit.toBit(kbps), kbps * 1000); + assertEquals(BitRateUnit.Kilobit.toByte(kbps), kbps * 1000 / 8); + assertEquals(BitRateUnit.Kilobit.toKilobit(kbps), kbps); + assertEquals(BitRateUnit.Kilobit.toMegabit(kbps), kbps / 1000); + assertEquals(BitRateUnit.Kilobit.toGigabit(kbps), kbps / 1000 / 1000); + } + + @Test + public void testMbps() { + double mbps = 1231434.12; + assertEquals(BitRateUnit.Megabit.toBit(mbps), mbps * 1000 * 1000); + assertEquals(BitRateUnit.Megabit.toByte(mbps), mbps * 1000 * 1000 / 8); + assertEquals(BitRateUnit.Megabit.toKilobit(mbps), mbps * 1000); + assertEquals(BitRateUnit.Megabit.toMegabit(mbps), mbps); + assertEquals(BitRateUnit.Megabit.toGigabit(mbps), mbps / 1000); + } + + @Test + public void testGbps() { + double gbps = 1231434.12; + assertEquals(BitRateUnit.Gigabit.toBit(gbps),gbps * 1000 * 1000 * 1000 ); + assertEquals(BitRateUnit.Gigabit.toByte(gbps), gbps * 1000 * 1000 * 1000 / 8); + assertEquals(BitRateUnit.Gigabit.toKilobit(gbps), gbps * 1000 * 1000); + assertEquals(BitRateUnit.Gigabit.toMegabit(gbps), gbps * 1000); + assertEquals(BitRateUnit.Gigabit.toGigabit(gbps), gbps); + } + + @Test + public void testByte() { + double bytes = 1231434.12; + assertEquals(BitRateUnit.Byte.toBit(bytes), bytes * 8); + assertEquals(BitRateUnit.Byte.toByte(bytes), bytes); + assertEquals(BitRateUnit.Byte.toKilobit(bytes), bytes / 1000 * 8); + assertEquals(BitRateUnit.Byte.toMegabit(bytes), bytes / 1000 / 1000 * 8); + assertEquals(BitRateUnit.Byte.toGigabit(bytes), bytes / 1000 / 1000 / 1000 * 8); + } + + + @Test + public void testConvert() { + double unit = 12334125.1234; + assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toBit(unit)); + assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toBit(unit)); + assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toBit(unit)); + assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toBit(unit)); + assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toBit(unit)); + + assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toKilobit(unit)); + assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toKilobit(unit)); + assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toKilobit(unit)); + assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toKilobit(unit)); + assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toKilobit(unit)); + + assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toMegabit(unit)); + assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toMegabit(unit)); + assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toMegabit(unit)); + assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toMegabit(unit)); + assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toMegabit(unit)); + + assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toGigabit(unit)); + assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toGigabit(unit)); + assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toGigabit(unit)); + assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toGigabit(unit)); + assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toGigabit(unit)); + + assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toByte(unit)); + assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toByte(unit)); + assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toByte(unit)); + assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toByte(unit)); + assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toByte(unit)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 4d689e124c93d..3bead53f10695 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -85,6 +85,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.BrokerInterceptors; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; +import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; @@ -662,6 +663,14 @@ public void start() throws PulsarServerException { + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true."); } + if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent() + && config.isLoadBalancerEnabled() + && LinuxInfoUtils.isLinux() + && !LinuxInfoUtils.checkHasNicSpeeds()) { + throw new IllegalStateException("Unable to read VM NIC speed. You must set " + + "[loadBalancerOverrideBrokerNicSpeedGbps] to override it when load balancer is enabled."); + } + localMetadataStore = createLocalMetadataStore(); localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java new file mode 100644 index 0000000000000..2a423045b1bc4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.SystemUtils; +import org.apache.pulsar.broker.BitRateUnit; + +@Slf4j +public class LinuxInfoUtils { + + // CGROUP + private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage"; + private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; + private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; + + // proc states + private static final String PROC_STAT_PATH = "/proc/stat"; + private static final String NIC_PATH = "/sys/class/net/"; + // NIC type + private static final int ARPHRD_ETHER = 1; + private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed"; + + private static Object /*jdk.internal.platform.Metrics*/ metrics; + private static Method getMetricsProviderMethod; + private static Method getCpuQuotaMethod; + private static Method getCpuPeriodMethod; + private static Method getCpuUsageMethod; + + static { + try { + metrics = Class.forName("jdk.internal.platform.Container").getMethod("metrics") + .invoke(null); + if (metrics != null) { + getMetricsProviderMethod = metrics.getClass().getMethod("getProvider"); + getMetricsProviderMethod.setAccessible(true); + getCpuQuotaMethod = metrics.getClass().getMethod("getCpuQuota"); + getCpuQuotaMethod.setAccessible(true); + getCpuPeriodMethod = metrics.getClass().getMethod("getCpuPeriod"); + getCpuPeriodMethod.setAccessible(true); + getCpuUsageMethod = metrics.getClass().getMethod("getCpuUsage"); + getCpuUsageMethod.setAccessible(true); + } + } catch (Throwable e) { + log.warn("Failed to get runtime metrics", e); + } + } + + /** + * Determine whether the OS is the linux kernel. + * @return Whether the OS is the linux kernel + */ + public static boolean isLinux() { + return SystemUtils.IS_OS_LINUX; + } + + /** + * Determine whether the OS enable CG Group. + */ + public static boolean isCGroupEnabled() { + try { + if (metrics == null) { + return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH)); + } + String provider = (String) getMetricsProviderMethod.invoke(metrics); + log.info("[LinuxInfo] The system metrics provider is: {}", provider); + return provider.contains("cgroup"); + } catch (Exception e) { + log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", e.getMessage()); + return false; + } + } + + /** + * Get total cpu limit. + * @param isCGroupsEnabled Whether CGroup is enabled + * @return Total cpu limit + */ + public static double getTotalCpuLimit(boolean isCGroupsEnabled) { + if (isCGroupsEnabled) { + try { + long quota; + long period; + if (metrics != null && getCpuQuotaMethod != null && getCpuPeriodMethod != null) { + quota = (long) getCpuQuotaMethod.invoke(metrics); + period = (long) getCpuPeriodMethod.invoke(metrics); + } else { + quota = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH)); + period = readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH)); + } + + if (quota > 0) { + return 100.0 * quota / period; + } + } catch (Exception e) { + log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", e); + // Fallback to availableProcessors + } + } + // Fallback to JVM reported CPU quota + return 100 * Runtime.getRuntime().availableProcessors(); + } + + /** + * Get CGroup cpu usage. + * @return Cpu usage + */ + public static long getCpuUsageForCGroup() { + try { + if (metrics != null && getCpuUsageMethod != null) { + return (long) getCpuUsageMethod.invoke(metrics); + } + return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH)); + } catch (Exception e) { + log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e); + return -1; + } + } + + + /** + * Reads first line of /proc/stat to get total cpu usage. + * + *
+     *     cpu  user   nice system idle    iowait irq softirq steal guest guest_nice
+     *     cpu  317808 128  58637  2503692 7634   0   13472   0     0     0
+     * 
+ *

+ * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this + * far. Real CPU usage should equal the sum subtracting the idle cycles, this would include iowait, irq and steal. + */ + public static ResourceUsage getCpuUsageForEntireHost() { + try (Stream stream = Files.lines(Paths.get(PROC_STAT_PATH))) { + Optional first = stream.findFirst(); + if (!first.isPresent()) { + log.error("[LinuxInfo] Failed to read CPU usage from /proc/stat, because of empty values."); + return ResourceUsage.empty(); + } + String[] words = first.get().split("\\s+"); + long total = Arrays.stream(words) + .filter(s -> !s.contains("cpu")) + .mapToLong(Long::parseLong) + .sum(); + long idle = Long.parseLong(words[4]); + return ResourceUsage.builder() + .usage(total - idle) + .idle(idle) + .total(total).build(); + } catch (IOException e) { + log.error("[LinuxInfo] Failed to read CPU usage from /proc/stat", e); + return ResourceUsage.empty(); + } + } + + /** + * Determine whether the VM has physical nic. + * @param nicPath Nic path + * @return whether The VM has physical nic. + */ + private static boolean isPhysicalNic(Path nicPath) { + try { + if (nicPath.toString().contains("/virtual/")) { + return false; + } + // Check the type to make sure it's ethernet (type "1") + String type = readTrimStringFromFile(nicPath.resolve("type")); + // wireless NICs don't report speed, ignore them. + return Integer.parseInt(type) == ARPHRD_ETHER; + } catch (Exception e) { + log.warn("[LinuxInfo] Failed to read {} NIC type, the detail is: {}", nicPath, e.getMessage()); + // Read type got error. + return false; + } + } + + /** + * Determine whether nic is usable. + * @param nicPath Nic path + * @return whether nic is usable. + */ + private static boolean isUsable(Path nicPath) { + try { + String operstate = readTrimStringFromFile(nicPath.resolve("operstate")); + Operstate operState = Operstate.valueOf(operstate.toUpperCase(Locale.ROOT)); + switch (operState) { + case UP: + case UNKNOWN: + case DORMANT: + return true; + default: + return false; + } + } catch (Exception e) { + log.warn("[LinuxInfo] Failed to read {} NIC operstate, the detail is: {}", nicPath, e.getMessage()); + // Read operstate got error. + return false; + } + } + + /** + * Get all physical nic limit. + * @param nics All nic path + * @param bitRateUnit Bit rate unit + * @return Total nic limit + */ + public static double getTotalNicLimit(List nics, BitRateUnit bitRateUnit) { + return bitRateUnit.convert(nics.stream().mapToDouble(nicPath -> { + try { + return readDoubleFromFile(getReplacedNICPath(NIC_SPEED_TEMPLATE, nicPath)); + } catch (IOException e) { + log.error("[LinuxInfo] Failed to get total nic limit.", e); + return 0d; + } + }).sum(), BitRateUnit.Megabit); + } + + /** + * Get all physical nic usage. + * @param nics All nic path + * @param type Nic's usage type: transport, receive + * @param bitRateUnit Bit rate unit + * @return Total nic usage + */ + public static double getTotalNicUsage(List nics, NICUsageType type, BitRateUnit bitRateUnit) { + return bitRateUnit.convert(nics.stream().mapToDouble(nic -> { + try { + return readDoubleFromFile(getReplacedNICPath(type.template, nic)); + } catch (IOException e) { + log.error("[LinuxInfo] Failed to read {} bytes for NIC {} ", type, nic, e); + return 0d; + } + }).sum(), BitRateUnit.Byte); + } + + /** + * Get paths of all usable physical nic. + * @return All usable physical nic paths. + */ + public static List getUsablePhysicalNICs() { + try (Stream stream = Files.list(Paths.get(NIC_PATH))) { + return stream.filter(LinuxInfoUtils::isPhysicalNic) + .filter(LinuxInfoUtils::isUsable) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toList()); + } catch (IOException e) { + log.error("[LinuxInfo] Failed to find NICs", e); + return Collections.emptyList(); + } + } + + /** + * Check this VM has nic speed. + * @return Whether the VM has nic speed + */ + public static boolean checkHasNicSpeeds() { + List physicalNICs = getUsablePhysicalNICs(); + if (CollectionUtils.isEmpty(physicalNICs)) { + return false; + } + double totalNicLimit = getTotalNicLimit(physicalNICs, BitRateUnit.Kilobit); + return totalNicLimit > 0; + } + + private static Path getReplacedNICPath(String template, String nic) { + return Paths.get(String.format(template, nic)); + } + + private static String readTrimStringFromFile(Path path) throws IOException { + return new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim(); + } + + private static long readLongFromFile(Path path) throws IOException { + return Long.parseLong(readTrimStringFromFile(path)); + } + + private static double readDoubleFromFile(Path path) throws IOException { + return Double.parseDouble(readTrimStringFromFile(path)); + } + + /** + * TLV IFLA_OPERSTATE + * contains RFC2863 state of the interface in numeric representation: + * See ... + */ + enum Operstate { + // Interface is in unknown state, neither driver nor userspace has set + // operational state. Interface must be considered for user data as + // setting operational state has not been implemented in every driver. + UNKNOWN, + // Interface is unable to transfer data on L1, f.e. ethernet is not + // plugged or interface is ADMIN down. + DOWN, + // Interfaces stacked on an interface that is IF_OPER_DOWN show this + // state (f.e. VLAN). + LOWERLAYERDOWN, + // Interface is L1 up, but waiting for an external event, f.e. for a + // protocol to establish. (802.1X) + DORMANT, + // Interface is operational up and can be used. + UP + } + + @VisibleForTesting + public static Object getMetrics() { + return metrics; + } + + @AllArgsConstructor + public enum NICUsageType { + // transport + TX("/sys/class/net/%s/statistics/tx_bytes"), + // receive + RX("/sys/class/net/%s/statistics/rx_bytes"); + private final String template; + } + + @Data + @Builder + public static class ResourceUsage { + private final long total; + private final long idle; + private final long usage; + + public static ResourceUsage empty() { + return ResourceUsage.builder() + .total(-1) + .idle(-1) + .usage(-1).build(); + } + + public boolean isEmpty() { + return this.total == -1 && idle == -1 && usage == -1; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java index fc6c4116e0cbe..9c9a00d7f5dcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -18,26 +18,27 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalCpuLimit; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicLimit; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicUsage; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getUsablePhysicalNICs; +import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.isCGroupEnabled; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; -import com.google.common.base.Charsets; +import com.google.common.annotations.VisibleForTesting; import com.sun.management.OperatingSystemMXBean; -import java.io.IOException; import java.lang.management.ManagementFactory; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BitRateUnit; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; +import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -54,14 +55,9 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { private double lastCpuTotalTime; private OperatingSystemMXBean systemBean; private SystemResourceUsage usage; - private final Optional overrideBrokerNicSpeedGbps; private final boolean isCGroupsEnabled; - private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage"; - private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; - private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; - public LinuxBrokerHostUsageImpl(PulsarService pulsar) { this( pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(), @@ -77,15 +73,7 @@ public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin, this.lastCollection = 0L; this.usage = new SystemResourceUsage(); this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps; - - boolean isCGroupsEnabled = false; - try { - isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH)); - } catch (Exception e) { - log.warn("Failed to check cgroup CPU usage file: {}", e.getMessage()); - } - this.isCGroupsEnabled = isCGroupsEnabled; - + this.isCGroupsEnabled = isCGroupEnabled(); // Call now to initialize values before the constructor returns calculateBrokerHostUsage(); executorService.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::calculateBrokerHostUsage), @@ -100,19 +88,17 @@ public SystemResourceUsage getBrokerHostUsage() { @Override public void calculateBrokerHostUsage() { - List nics = getNics(); - double totalNicLimit = getTotalNicLimitKbps(nics); - double totalNicUsageTx = getTotalNicUsageTxKb(nics); - double totalNicUsageRx = getTotalNicUsageRxKb(nics); - double totalCpuLimit = getTotalCpuLimit(); - + List nics = getUsablePhysicalNICs(); + double totalNicLimit = getTotalNicLimitWithConfiguration(nics); + double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, BitRateUnit.Kilobit); + double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, BitRateUnit.Kilobit); + double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled); long now = System.currentTimeMillis(); double elapsedSeconds = (now - lastCollection) / 1000d; if (elapsedSeconds <= 0) { log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds); return; } - SystemResourceUsage usage = new SystemResourceUsage(); double cpuUsage = getTotalCpuUsage(elapsedSeconds); @@ -128,30 +114,21 @@ public void calculateBrokerHostUsage() { usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit)); usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit)); } + usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); lastTotalNicUsageTx = totalNicUsageTx; lastTotalNicUsageRx = totalNicUsageRx; lastCollection = System.currentTimeMillis(); this.usage = usage; - usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); } - private double getTotalCpuLimit() { - if (isCGroupsEnabled) { - try { - long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH); - long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH); - if (quota > 0) { - return 100.0 * quota / period; - } - } catch (IOException e) { - log.warn("Failed to read CPU quotas from cgroups", e); - // Fallback to availableProcessors - } - } - - // Fallback to JVM reported CPU quota - return 100 * Runtime.getRuntime().availableProcessors(); + @VisibleForTesting + double getTotalNicLimitWithConfiguration(List nics) { + // Use the override value as configured. Return the total max speed across all available NICs, converted + // from Gbps into Kbps + return overrideBrokerNicSpeedGbps.map(BitRateUnit.Gigabit::toKilobit) + .map(speed -> speed * nics.size()) + .orElseGet(() -> getTotalNicLimit(nics, BitRateUnit.Kilobit)); } private double getTotalCpuUsage(double elapsedTimeSeconds) { @@ -162,6 +139,13 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) { } } + private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { + double usage = (double) getCpuUsageForCGroup(); + double currentUsage = usage - lastCpuUsage; + lastCpuUsage = usage; + return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1); + } + /** * Reads first line of /proc/stat to get total cpu usage. * @@ -171,39 +155,18 @@ private double getTotalCpuUsage(double elapsedTimeSeconds) { * * * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this - * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal. + * far. Real CPU usage should equal the sum subtracting the idle cycles, this would include iowait, irq and steal. */ private double getTotalCpuUsageForEntireHost() { - try (Stream stream = Files.lines(Paths.get("/proc/stat"))) { - String[] words = stream.findFirst().get().split("\\s+"); - - long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum(); - long idle = Long.parseLong(words[4]); - long usage = total - idle; - - double currentUsage = (usage - lastCpuUsage) / (total - lastCpuTotalTime) * getTotalCpuLimit(); - - lastCpuUsage = usage; - lastCpuTotalTime = total; - - return currentUsage; - } catch (IOException e) { - log.error("Failed to read CPU usage from /proc/stat", e); - return -1; - } - } - - private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { - try { - long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH); - double currentUsage = usage - lastCpuUsage; - lastCpuUsage = usage; - - return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1); - } catch (IOException e) { - log.error("Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e); + LinuxInfoUtils.ResourceUsage cpuUsageForEntireHost = getCpuUsageForEntireHost(); + if (cpuUsageForEntireHost.isEmpty()) { return -1; } + double currentUsage = (cpuUsageForEntireHost.getUsage() - lastCpuUsage) + / (cpuUsageForEntireHost.getTotal() - lastCpuTotalTime) * getTotalCpuLimit(isCGroupsEnabled); + lastCpuUsage = cpuUsageForEntireHost.getUsage(); + lastCpuTotalTime = cpuUsageForEntireHost.getTotal(); + return currentUsage; } private ResourceUsage getMemUsage() { @@ -212,86 +175,4 @@ private ResourceUsage getMemUsage() { return new ResourceUsage(total - free, total); } - private List getNics() { - try (Stream stream = Files.list(Paths.get("/sys/class/net/"))) { - return stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString()) - .collect(Collectors.toList()); - } catch (IOException e) { - log.error("Failed to find NICs", e); - return Collections.emptyList(); - } - } - - public int getNicCount() { - return getNics().size(); - } - - private boolean isPhysicalNic(Path path) { - try { - if (path.toString().contains("/virtual/")) { - return false; - } - // Check the type to make sure it's ethernet (type "1") - String type = new String(Files.readAllBytes(path.resolve("type")), StandardCharsets.UTF_8).trim(); - // wireless NICs don't report speed, ignore them. - return Integer.parseInt(type) == 1; - } catch (Exception e) { - // Read type got error. - return false; - } - } - - private Path getNicSpeedPath(String nic) { - return Paths.get(String.format("/sys/class/net/%s/speed", nic)); - } - - private double getTotalNicLimitKbps(List nics) { - // Use the override value as configured. Return the total max speed across all available NICs, converted - // from Gbps into Kbps - return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size() * 1000 * 1000) - .orElseGet(() -> nics.stream().mapToDouble(nicPath -> { - // Nic speed is in Mbits/s, return kbits/s - try { - return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(nicPath)))); - } catch (IOException e) { - log.error(String.format("Failed to read speed for nic %s, maybe you can set broker" - + " config [loadBalancerOverrideBrokerNicSpeedGbps] to override it.", nicPath), e); - return 0d; - } - }).sum() * 1000); - } - - private Path getNicTxPath(String nic) { - return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic)); - } - - private Path getNicRxPath(String nic) { - return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic)); - } - - private double getTotalNicUsageRxKb(List nics) { - return nics.stream().mapToDouble(s -> { - try { - return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s)))); - } catch (IOException e) { - log.error("Failed to read rx_bytes for NIC " + s, e); - return 0d; - } - }).sum() * 8d / 1000; - } - - private double getTotalNicUsageTxKb(List nics) { - return nics.stream().mapToDouble(s -> { - try { - return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s)))); - } catch (IOException e) { - log.error("Failed to read tx_bytes for NIC " + s, e); - return 0d; - } - }).sum() * 8d / 1000; - } - - private static long readLongFromFile(String path) throws IOException { - return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), Charsets.UTF_8).trim()); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java index b9a017504e004..512dc594f29fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java @@ -101,6 +101,7 @@ private ServiceConfiguration getConf() { conf.setDefaultNumberOfNamespaceBundles(1); conf.setMetadataStoreUrl(metadataStoreUrl); conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); conf.setNumExecutorThreadPoolSize(1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index ddb1ed4d46960..c905f87bce181 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -68,6 +68,7 @@ public void testGetWorkerService() throws Exception { configuration.setClusterName("clusterName"); configuration.setFunctionsWorkerEnabled(true); configuration.setBrokerShutdownTimeoutMs(0L); + configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); WorkerService expectedWorkerService = mock(WorkerService.class); @Cleanup PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 36784372f6b46..0156b845ad4cf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -78,6 +78,7 @@ void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerShutdownTimeoutMs(0L); config.setClusterName("my-cluster"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index ce2acf22aecb3..19c174d0edf4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -114,6 +114,7 @@ public void testPersistentList() throws Exception { /***** Start Broker 2 ******/ ServiceConfiguration conf = new ServiceConfiguration(); conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 84964680d3472..67b86f71c223b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -171,6 +171,7 @@ protected final void internalSetupForStatsTest() throws Exception { protected void doInitConf() throws Exception { this.conf.setBrokerShutdownTimeoutMs(0L); + this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); this.conf.setBrokerServicePort(Optional.of(0)); this.conf.setBrokerServicePortTls(Optional.of(0)); this.conf.setAdvertisedAddress("localhost"); @@ -468,6 +469,7 @@ protected static ServiceConfiguration getDefaultConf() { configuration.setConfigurationStoreServers("localhost:3181"); configuration.setAllowAutoTopicCreationType("non-partitioned"); configuration.setBrokerShutdownTimeoutMs(0L); + configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); configuration.setBrokerServicePort(Optional.of(0)); configuration.setBrokerServicePortTls(Optional.of(0)); configuration.setWebServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 9e81a3e1db968..eb23d6197fc5a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -111,6 +111,7 @@ void setup() throws Exception { config1.setWebServicePort(Optional.of(0)); config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); + config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setFailureDomainsEnabled(true); config1.setLoadBalancerEnabled(true); @@ -132,6 +133,7 @@ void setup() throws Exception { config2.setWebServicePort(Optional.of(0)); config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); + config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setFailureDomainsEnabled(true); config2.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 1348b1e7279ae..ec0c9d574e62c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -69,6 +69,7 @@ public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerExcepti final String clusterName = "elect-test"; ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); config.setClusterName(clusterName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index a64f28384d3fb..b7d6a391fb02a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -125,6 +125,7 @@ void setup() throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress(localhost+i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java index 50bb6b37760d5..a1c52e13c69cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java @@ -22,7 +22,6 @@ import java.util.Optional; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -30,17 +29,21 @@ @Test(groups = "broker") public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest { - int nicCount; + int usableNicCount; - @BeforeClass @Override - public void setup() throws Exception { + protected void doInitConf() throws Exception { + super.doInitConf(); conf.setLoadBalancerEnabled(true); conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(5.4)); - super.internalSetup(); + } + @BeforeClass + @Override + public void setup() throws Exception { + super.internalSetup(); if (SystemUtils.IS_OS_LINUX) { - nicCount = new LinuxBrokerHostUsageImpl(pulsar).getNicCount(); + usableNicCount = LinuxInfoUtils.getUsablePhysicalNICs().size(); } } @@ -57,12 +60,12 @@ public void checkLoadReportNicSpeed() throws Exception { LoadManagerReport report = admin.brokerStats().getLoadReport(); if (SystemUtils.IS_OS_LINUX) { - assertEquals(report.getBandwidthIn().limit, nicCount * 5.4 * 1000 * 1000); - assertEquals(report.getBandwidthOut().limit, nicCount * 5.4 * 1000 * 1000); + assertEquals(report.getBandwidthIn().limit, usableNicCount * 5.4 * 1000 * 1000, 0.0001); + assertEquals(report.getBandwidthOut().limit, usableNicCount * 5.4 * 1000 * 1000, 0.0001); } else { // On non-Linux system we don't report the network usage - assertEquals(report.getBandwidthIn().limit, -1.0); - assertEquals(report.getBandwidthOut().limit, -1.0); + assertEquals(report.getBandwidthIn().limit, -1.0, 0.0001); + assertEquals(report.getBandwidthOut().limit, -1.0, 0.0001); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 5fb3dca26d6f0..6b5898f4174e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -148,6 +148,7 @@ void setup() throws Exception { config1.setAdvertisedAddress("localhost"); config1.setBrokerShutdownTimeoutMs(0L); + config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setBrokerServicePortTls(Optional.of(0)); config1.setWebServicePortTls(Optional.of(0)); @@ -167,6 +168,7 @@ void setup() throws Exception { config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setAdvertisedAddress("localhost"); config2.setBrokerShutdownTimeoutMs(0L); + config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setBrokerServicePortTls(Optional.of(0)); config2.setWebServicePortTls(Optional.of(0)); @@ -588,6 +590,7 @@ public void testOwnBrokerZnodeByMultipleBroker() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); PulsarService pulsar = new PulsarService(config); // create znode using different zk-session diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java new file mode 100644 index 0000000000000..80ab649000075 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Optional; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class SimpleBrokerStartTest { + + public void testHasNICSpeed() throws Exception { + if (!LinuxInfoUtils.isLinux()) { + return; + } + // Start local bookkeeper ensemble + @Cleanup("stop") + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + // Start broker + ServiceConfiguration config = new ServiceConfiguration(); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + config.setBrokerServicePortTls(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + config.setAdvertisedAddress("localhost"); + boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds(); + if (hasNicSpeeds) { + @Cleanup + PulsarService pulsarService = new PulsarService(config); + pulsarService.start(); + } + } + + public void testNoNICSpeed() throws Exception { + if (!LinuxInfoUtils.isLinux()) { + return; + } + // Start local bookkeeper ensemble + @Cleanup("stop") + LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + // Start broker + ServiceConfiguration config = new ServiceConfiguration(); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + config.setBrokerServicePortTls(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + config.setAdvertisedAddress("localhost"); + boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds(); + if (!hasNicSpeeds) { + @Cleanup + PulsarService pulsarService = new PulsarService(config); + try { + pulsarService.start(); + fail("unexpected behaviour"); + } catch (PulsarServerException ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + } + } + } + + + @Test + public void testCGroupMetrics() { + if (!LinuxInfoUtils.isLinux()) { + return; + } + + boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup")); + boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled(); + Assert.assertEquals(cGroupEnabled, existsCGroup); + + double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled); + log.info("totalCpuLimit: {}", totalCpuLimit); + Assert.assertTrue(totalCpuLimit > 0.0); + + if (cGroupEnabled) { + Assert.assertNotNull(LinuxInfoUtils.getMetrics()); + + long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup(); + log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup); + Assert.assertTrue(cpuUsageForCGroup > 0); + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index ceff0d8aaed47..f71aa9a522520 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -111,6 +111,7 @@ void setup() throws Exception { config1.setWebServicePort(Optional.of(0)); config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); + config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config1.setBrokerServicePortTls(Optional.of(0)); @@ -130,6 +131,7 @@ void setup() throws Exception { config2.setWebServicePort(Optional.of(0)); config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); + config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config2.setBrokerServicePortTls(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 9ff266ba96ce0..3b9c839775aa5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -64,6 +64,7 @@ void setup() throws Exception { config.setAdvertisedAddress("localhost"); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java new file mode 100644 index 0000000000000..b6a625a4fe19e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class LinuxBrokerHostUsageImplTest { + + @Test + public void checkOverrideBrokerNicSpeedGbps() { + @Cleanup("shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + LinuxBrokerHostUsageImpl linuxBrokerHostUsage = + new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), executorService); + List nics = new ArrayList<>(); + nics.add("1"); + nics.add("2"); + nics.add("3"); + double totalLimit = linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics); + Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3); + } + + @Test + public void testCpuUsage() throws InterruptedException { + if (!LinuxInfoUtils.isLinux()) { + return; + } + + @Cleanup("shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + LinuxBrokerHostUsageImpl linuxBrokerHostUsage = + new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, Optional.empty(), executorService); + + linuxBrokerHostUsage.calculateBrokerHostUsage(); + TimeUnit.SECONDS.sleep(1); + linuxBrokerHostUsage.calculateBrokerHostUsage(); + + double usage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage; + double limit = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit; + float percentUsage = linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage(); + + Assert.assertTrue(usage > 0); + Assert.assertTrue(limit > 0); + Assert.assertTrue(limit >= usage); + Assert.assertTrue(percentUsage > 0); + + log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, percentUsage); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java index c25c6c23fb011..c24164acbbf99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java @@ -109,6 +109,7 @@ protected void startBroker() throws Exception { conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 1d2878ca64471..abe58585d4473 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -101,6 +101,7 @@ void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setClusterName("usc"); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index b5aaafa6e21bf..de7d2bba2d43e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -81,6 +81,7 @@ protected void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setClusterName("usc"); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 6a9ec94de8424..a5d8e62c3bb82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -145,8 +145,11 @@ public void testBookieIsolation() throws Exception { config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config.setClusterName(cluster); config.setWebServicePort(Optional.of(0)); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); @@ -308,6 +311,7 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); @@ -449,6 +453,7 @@ public void testDeleteIsolationGroup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index f6a90778c6ffe..480894b51f88b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -64,6 +64,7 @@ void setup() { configuration.setWebServicePort(Optional.of(0)); configuration.setClusterName("max_message_test"); configuration.setBrokerShutdownTimeoutMs(0L); + configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); configuration.setBrokerServicePort(Optional.of(0)); configuration.setAuthorizationEnabled(false); configuration.setAuthenticationEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 31e6f5579b08b..7e35a1d0fc883 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -119,6 +120,7 @@ public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build(); ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 1ced87a5a2a6f..d81b3041003d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -79,6 +80,7 @@ public void setup(Method m) throws Exception { super.setUp(m); ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @Cleanup PulsarService pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 24b056d3a5667..67ef1bbfef7e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -178,6 +178,7 @@ public void setup() throws Exception { ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setAdvertisedAddress("localhost"); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setMaxUnackedMessagesPerConsumer(50000); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 4270ec448b6d9..852ea66597796 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -270,6 +270,7 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setTlsCertificateFilePath(brokerCertFilePath); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5e4cd2fd0d17b..e2af5944ef0f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -178,6 +178,7 @@ public void setup() throws Exception { executor = OrderedExecutor.newBuilder().numThreads(1).build(); svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index 8c68c2de91a4f..ccdfb7aa77cb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -92,6 +92,7 @@ void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 0e2d300f7a802..2d1e72d45c80e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -112,6 +112,7 @@ public void setup() throws Exception { ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); svcConfig.setTransactionCoordinatorEnabled(true); svcConfig.setClusterName("pulsar-cluster"); pulsarMock = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 25c555f09b949..e8da743a29b78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -170,6 +170,7 @@ protected void startBroker() throws Exception { conf.setBookkeeperClientExposeStatsToPrometheus(true); conf.setForceDeleteNamespaceAllowed(true); conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setBrokerServicePortTls(Optional.of(0)); conf.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index aba5b044583df..e017c28693a67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -66,6 +66,7 @@ protected final void setup() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setClusterName("my-cluster"); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 0cd72f19a4db9..4a2fe5b69f4a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -397,6 +397,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU ServiceConfiguration config = new ServiceConfiguration(); config.setAdvertisedAddress("localhost"); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); if (enableTls) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 396fda82b59fb..8dc1c0c00209b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -142,6 +142,7 @@ public void testMultipleBrokerLookup() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -258,6 +259,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -351,6 +353,7 @@ public void testPartitionTopicLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -427,6 +430,7 @@ public void testWebserviceServiceTls() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); conf2.setBrokerServicePort(Optional.of(0)); @@ -543,6 +547,7 @@ public void testSplitUnloadLookupTest() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setBrokerServicePort(Optional.of(0)); conf2.setWebServicePort(Optional.of(0)); conf2.setAdvertisedAddress("localhost"); @@ -647,6 +652,7 @@ public void testModularLoadManagerSplitBundle() throws Exception { // (1) Start broker-1 ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerShutdownTimeoutMs(0L); + conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerShutdownTimeoutMs(0L); conf2.setBrokerServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index c8325908c6c0b..faf747f36edc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -86,6 +86,7 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setTlsAllowInsecureConnection(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 6548dbe26ef3a..46ea5bf04529d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -944,6 +944,7 @@ void setupReplicationCluster() throws Exception { config1.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config1.setBrokerShutdownTimeoutMs(0L); + config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config1.setAllowAutoTopicCreationType("non-partitioned"); @@ -970,6 +971,7 @@ void setupReplicationCluster() throws Exception { config2.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config2.setBrokerShutdownTimeoutMs(0L); + config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config2.setAllowAutoTopicCreationType("non-partitioned"); @@ -996,6 +998,7 @@ void setupReplicationCluster() throws Exception { config3.setBrokerDeleteInactiveTopicsFrequencySeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config3.setBrokerShutdownTimeoutMs(0L); + config3.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config3.setBrokerServicePort(Optional.of(0)); config3.setAllowAutoTopicCreationType("non-partitioned"); pulsar3 = new PulsarService(config3); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java index e6d050f798573..80408910141a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -107,6 +107,7 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception PulsarService pulsarService1 = pulsar; conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); conf.setBrokerServicePort(Optional.of(0)); conf.setWebServicePort(Optional.of(0)); restartBroker(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 173794a2cfb58..455021651803f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -130,6 +130,7 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 7029dc222dbd5..3f019a39c08b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -197,6 +197,7 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index d985241e2903d..5d830a67778a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -127,6 +127,7 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index f4a27506c2e10..043c2ecccd649 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -90,6 +90,7 @@ void setup() throws Exception { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setWebServicePort(Optional.empty()); config.setWebServicePortTls(Optional.of(webPort)); config.setBrokerServicePort(Optional.empty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index a9fd06dc05809..9ba2ccde471df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -94,6 +94,7 @@ void setup(Method method) throws Exception { config.setWebServicePort(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 9e1edea2f8029..7c0f2f0bbe143 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -118,6 +118,7 @@ public void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index b3126defa11d2..afe16c63ec283 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -101,6 +101,7 @@ void setup(Method method) throws Exception { config.setWebServicePortTls(Optional.of(0)); config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); config.setBrokerServicePortTls(Optional.of(0)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 9f6d9f65dbbd8..dd097ce034c5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -36,6 +36,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -107,6 +108,7 @@ void setup(Method method) throws Exception { config = spy(ServiceConfiguration.class); config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setClusterName("use"); Set superUsers = Sets.newHashSet("superUser", "admin"); config.setSuperUserRoles(superUsers); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index dc2ecd42a64ed..92b10cf0c30a8 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -174,6 +174,7 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) .withEnv("clusterName", clusterName) .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") + .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1") // used in s3 tests .withEnv("AWS_ACCESS_KEY_ID", "accesskey") .withEnv("AWS_SECRET_KEY", "secretkey")