Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Revert "Revert "[improve][broker][branch-2.10] Backport Linux metrics…
Browse files Browse the repository at this point in the history
… changes from master branch (apache#20659)""

This reverts commit a47c6ac.
  • Loading branch information
mattisonchao committed Dec 28, 2023
1 parent 1762050 commit 16b07d4
Show file tree
Hide file tree
Showing 49 changed files with 978 additions and 169 deletions.
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED"
# LinuxInfoUtils
OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi

ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true"
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,7 @@ flexible messaging model and an intuitive client API.</description>
--add-opens java.base/java.io=ALL-UNNAMED <!--Bookkeeper NativeIO -->
--add-opens java.management/sun.management=ALL-UNNAMED <!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED <!--MBeanStatsGenerator-->
--add-opens java.base/jdk.internal.platform=ALL-UNNAMED <!--LinuxInfoUtils-->
</test.additional.args>
</properties>
<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

public enum BitRateUnit {

Bit {
public double toBit(double bitRate) {
return bitRate;
}

public double toKilobit(double bitRate) {
return bitRate / C0;
}

public double toMegabit(double bitRate) {
return bitRate / Math.pow(C0, 2);
}

public double toGigabit(double bitRate) {
return bitRate / Math.pow(C0, 3);
}

public double toByte(double bitRate) {
return bitRate / C1;
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
return bitRateUnit.toBit(bitRate);
}
},
Kilobit {
public double toBit(double bitRate) {
return bitRate * C0;
}

public double toKilobit(double bitRate) {
return bitRate;
}

public double toMegabit(double bitRate) {
return bitRate / C0;
}

public double toGigabit(double bitRate) {
return bitRate / Math.pow(C0, 2);
}

public double toByte(double bitRate) {
return bitRate * C0 / C1;
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
return bitRateUnit.toKilobit(bitRate);
}
},
Megabit {
public double toBit(double bitRate) {
return bitRate * Math.pow(C0, 2);
}

public double toKilobit(double bitRate) {
return bitRate * C0;
}

public double toMegabit(double bitRate) {
return bitRate;
}

public double toGigabit(double bitRate) {
return bitRate / C0;
}

public double toByte(double bitRate) {
return bitRate * Math.pow(C0, 2) / C1;
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
return bitRateUnit.toMegabit(bitRate);
}
},
Gigabit {
public double toBit(double bitRate) {
return bitRate * Math.pow(C0, 3);
}

public double toKilobit(double bitRate) {
return bitRate * Math.pow(C0, 2);
}

public double toMegabit(double bitRate) {
return bitRate * C0;
}

public double toGigabit(double bitRate) {
return bitRate;
}

public double toByte(double bitRate) {
return bitRate * Math.pow(C0, 3) / C1;
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
return bitRateUnit.toGigabit(bitRate);
}
},
Byte {
public double toBit(double bitRate) {
return bitRate * C1;
}

public double toKilobit(double bitRate) {
return bitRate * C1 / C0;
}

public double toMegabit(double bitRate) {
return bitRate * C1 / Math.pow(C0, 2);
}

public double toGigabit(double bitRate) {
return bitRate * C1 / Math.pow(C0, 3);
}

public double toByte(double bitRate) {
return bitRate;
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
return bitRateUnit.toByte(bitRate);
}
};

static final int C0 = 1000;
static final int C1 = 8;

public double toBit(double bitRate) {
throw new AbstractMethodError();
}

public double toKilobit(double bitRate) {
throw new AbstractMethodError();
}

public double toMegabit(double bitRate) {
throw new AbstractMethodError();
}

public double toGigabit(double bitRate) {
throw new AbstractMethodError();
}

public double toByte(double bitRate) {
throw new AbstractMethodError();
}

public double convert(double bitRate, BitRateUnit bitRateUnit) {
throw new AbstractMethodError();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;

public class BitRateUnitTest {

@Test
public void testBps() {
double bps = 1231434.12;
assertEquals(BitRateUnit.Bit.toBit(bps), bps);
assertEquals(BitRateUnit.Bit.toByte(bps), bps / 8);
assertEquals(BitRateUnit.Bit.toKilobit(bps), bps / 1000);
assertEquals(BitRateUnit.Bit.toMegabit(bps), bps / 1000 / 1000);
assertEquals(BitRateUnit.Bit.toGigabit(bps), bps / 1000 / 1000 / 1000);
}

@Test
public void testKbps() {
double kbps = 1231434.12;
assertEquals(BitRateUnit.Kilobit.toBit(kbps), kbps * 1000);
assertEquals(BitRateUnit.Kilobit.toByte(kbps), kbps * 1000 / 8);
assertEquals(BitRateUnit.Kilobit.toKilobit(kbps), kbps);
assertEquals(BitRateUnit.Kilobit.toMegabit(kbps), kbps / 1000);
assertEquals(BitRateUnit.Kilobit.toGigabit(kbps), kbps / 1000 / 1000);
}

@Test
public void testMbps() {
double mbps = 1231434.12;
assertEquals(BitRateUnit.Megabit.toBit(mbps), mbps * 1000 * 1000);
assertEquals(BitRateUnit.Megabit.toByte(mbps), mbps * 1000 * 1000 / 8);
assertEquals(BitRateUnit.Megabit.toKilobit(mbps), mbps * 1000);
assertEquals(BitRateUnit.Megabit.toMegabit(mbps), mbps);
assertEquals(BitRateUnit.Megabit.toGigabit(mbps), mbps / 1000);
}

@Test
public void testGbps() {
double gbps = 1231434.12;
assertEquals(BitRateUnit.Gigabit.toBit(gbps),gbps * 1000 * 1000 * 1000 );
assertEquals(BitRateUnit.Gigabit.toByte(gbps), gbps * 1000 * 1000 * 1000 / 8);
assertEquals(BitRateUnit.Gigabit.toKilobit(gbps), gbps * 1000 * 1000);
assertEquals(BitRateUnit.Gigabit.toMegabit(gbps), gbps * 1000);
assertEquals(BitRateUnit.Gigabit.toGigabit(gbps), gbps);
}

@Test
public void testByte() {
double bytes = 1231434.12;
assertEquals(BitRateUnit.Byte.toBit(bytes), bytes * 8);
assertEquals(BitRateUnit.Byte.toByte(bytes), bytes);
assertEquals(BitRateUnit.Byte.toKilobit(bytes), bytes / 1000 * 8);
assertEquals(BitRateUnit.Byte.toMegabit(bytes), bytes / 1000 / 1000 * 8);
assertEquals(BitRateUnit.Byte.toGigabit(bytes), bytes / 1000 / 1000 / 1000 * 8);
}


@Test
public void testConvert() {
double unit = 12334125.1234;
assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toBit(unit));
assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toBit(unit));
assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toBit(unit));
assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toBit(unit));
assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toBit(unit));

assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toKilobit(unit));
assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toKilobit(unit));
assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toKilobit(unit));
assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toKilobit(unit));
assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toKilobit(unit));

assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toMegabit(unit));
assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toMegabit(unit));
assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toMegabit(unit));
assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toMegabit(unit));
assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toMegabit(unit));

assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toGigabit(unit));
assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toGigabit(unit));
assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toGigabit(unit));
assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toGigabit(unit));
assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toGigabit(unit));

assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Bit), BitRateUnit.Bit.toByte(unit));
assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Kilobit), BitRateUnit.Kilobit.toByte(unit));
assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Megabit), BitRateUnit.Megabit.toByte(unit));
assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Gigabit), BitRateUnit.Gigabit.toByte(unit));
assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Byte), BitRateUnit.Byte.toByte(unit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
Expand Down Expand Up @@ -661,6 +662,14 @@ public void start() throws PulsarServerException {
+ "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
}

if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
&& config.isLoadBalancerEnabled()
&& LinuxInfoUtils.isLinux()
&& !LinuxInfoUtils.checkHasNicSpeeds()) {
throw new IllegalStateException("Unable to read VM NIC speed. You must set "
+ "[loadBalancerOverrideBrokerNicSpeedGbps] to override it when load balancer is enabled.");
}

localMetadataStore = createLocalMetadataStore();
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

Expand Down
Loading

0 comments on commit 16b07d4

Please sign in to comment.