Skip to content

Commit

Permalink
Weighted work balancer: dynamic consumer weights adjustment (#1559)
Browse files Browse the repository at this point in the history
* Weighted work balancer: scoring consumer nodes

* don't use null as a special value of timestamp
  • Loading branch information
piotrrzysko authored Sep 21, 2022
1 parent 621f8b2 commit 7cd3b37
Show file tree
Hide file tree
Showing 34 changed files with 1,115 additions and 410 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public void registerGauge(String name, Gauge<?> gauge) {
}
}

public void unregisterGauge(String name) {
String path = pathCompiler.compile(name);
metricRegistry.remove(path);
}

private String metricRegistryName(String metricDisplayName, TopicName topicName, String subscription) {
PathContext pathContext = PathContext.pathContext()
.withGroup(escapeDots(topicName.getGroupName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.TargetWeightCalculationStrategy.UnknownTargetWeightCalculationStrategyException;
import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.WeightedWorkBalancingProperties;
import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.WorkBalancingStrategy.UnknownWorkBalancingStrategyException;
import pl.allegro.tech.hermes.consumers.consumer.ConsumerAuthorizationHandler;
Expand Down Expand Up @@ -39,16 +40,20 @@
import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.NoOpBalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.SelectiveWorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.AvgTargetWeightCalculator;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.CurrentLoadProvider;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.NoOpConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.NoOpSubscriptionProfileRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileProvider;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ScoringTargetWeightCalculator;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfilesCalculator;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.TargetWeightCalculator;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkBalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkloadMetrics;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ZookeeperConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ZookeeperSubscriptionProfileRegistry;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
Expand Down Expand Up @@ -119,7 +124,8 @@ public WorkloadSupervisor workloadSupervisor(InternalNotificationsBus notificati
@Bean
public WorkBalancer workBalancer(WorkloadProperties workloadProperties,
Clock clock,
SubscriptionProfileProvider subscriptionProfileProvider) {
CurrentLoadProvider currentLoadProvider,
TargetWeightCalculator targetWeightCalculator) {
switch (workloadProperties.getWorkBalancingStrategy()) {
case SELECTIVE:
return new SelectiveWorkBalancer();
Expand All @@ -129,7 +135,8 @@ public WorkBalancer workBalancer(WorkloadProperties workloadProperties,
clock,
weightedWorkBalancingProperties.getStabilizationWindowSize(),
weightedWorkBalancingProperties.getMinSignificantChangePercent(),
subscriptionProfileProvider
currentLoadProvider,
targetWeightCalculator
);
}
throw new UnknownWorkBalancingStrategyException();
Expand Down Expand Up @@ -168,42 +175,75 @@ public ConsumerNodeLoadRegistry consumerNodeLoadRegistry(CuratorFramework curato
}

@Bean
public SubscriptionProfilesCalculator subscriptionProfilesCalculator(ConsumerNodeLoadRegistry consumerNodeLoadRegistry,
SubscriptionProfileRegistry subscriptionProfileRegistry,
WorkloadProperties workloadProperties,
Clock clock) {
return new SubscriptionProfilesCalculator(
consumerNodeLoadRegistry,
subscriptionProfileRegistry,
clock,
workloadProperties.getWeightedWorkBalancing().getWeightWindowSize()
);
public TargetWeightCalculator targetWeightCalculator(WorkloadProperties workloadProperties,
WeightedWorkloadMetrics weightedWorkloadMetrics,
Clock clock) {
WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing();
switch (weightedWorkBalancing.getTargetWeightCalculationStrategy()) {
case AVG:
return new AvgTargetWeightCalculator(weightedWorkloadMetrics);
case SCORING:
return new ScoringTargetWeightCalculator(
weightedWorkloadMetrics,
clock,
weightedWorkBalancing.getWeightWindowSize(),
weightedWorkBalancing.getScoringGain()
);
}
throw new UnknownTargetWeightCalculationStrategyException();
}

@Bean
public SubscriptionProfileRegistry subscriptionProfileRegistry(CuratorFramework curator,
SubscriptionIds subscriptionIds,
ZookeeperPaths zookeeperPaths,
WorkloadProperties workloadProperties,
KafkaClustersProperties kafkaClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
public BalancingListener balancingListener(ConsumerNodeLoadRegistry consumerNodeLoadRegistry,
SubscriptionProfileRegistry subscriptionProfileRegistry,
WorkloadProperties workloadProperties,
CurrentLoadProvider currentLoadProvider,
WeightedWorkloadMetrics weightedWorkloadMetrics,
Clock clock) {
switch (workloadProperties.getWorkBalancingStrategy()) {
case SELECTIVE:
return new NoOpSubscriptionProfileRegistry();
return new NoOpBalancingListener();
case WEIGHTED:
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing();
return new ZookeeperSubscriptionProfileRegistry(
curator,
subscriptionIds,
zookeeperPaths,
kafkaProperties.getClusterName(),
weightedWorkBalancing.getSubscriptionProfilesEncoderBufferSizeBytes()
return new WeightedWorkBalancingListener(
consumerNodeLoadRegistry,
subscriptionProfileRegistry,
currentLoadProvider,
weightedWorkloadMetrics,
clock,
workloadProperties.getWeightedWorkBalancing().getWeightWindowSize()
);
}
throw new UnknownWorkBalancingStrategyException();
}

@Bean
public CurrentLoadProvider currentLoadProvider() {
return new CurrentLoadProvider();
}

@Bean
public WeightedWorkloadMetrics weightedWorkloadMetrics(HermesMetrics hermesMetrics) {
return new WeightedWorkloadMetrics(hermesMetrics);
}

@Bean
public SubscriptionProfileRegistry subscriptionProfileRegistry(CuratorFramework curator,
SubscriptionIds subscriptionIds,
ZookeeperPaths zookeeperPaths,
WorkloadProperties workloadProperties,
KafkaClustersProperties kafkaClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing();
return new ZookeeperSubscriptionProfileRegistry(
curator,
subscriptionIds,
zookeeperPaths,
kafkaProperties.getClusterName(),
weightedWorkBalancing.getSubscriptionProfilesEncoderBufferSizeBytes()
);
}

@Bean
public Retransmitter retransmitter(SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator,
KafkaClustersProperties kafkaClustersProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public static class WeightedWorkBalancingProperties {

private Duration weightWindowSize = Duration.ofMinutes(15);

private TargetWeightCalculationStrategy targetWeightCalculationStrategy = TargetWeightCalculationStrategy.AVG;

private double scoringGain = 1.0d;

public int getConsumerLoadEncoderBufferSizeBytes() {
return consumerLoadEncoderBufferSizeBytes;
}
Expand Down Expand Up @@ -191,6 +195,22 @@ public Duration getWeightWindowSize() {
public void setWeightWindowSize(Duration weightWindowSize) {
this.weightWindowSize = weightWindowSize;
}

public TargetWeightCalculationStrategy getTargetWeightCalculationStrategy() {
return targetWeightCalculationStrategy;
}

public void setTargetWeightCalculationStrategy(TargetWeightCalculationStrategy targetWeightCalculationStrategy) {
this.targetWeightCalculationStrategy = targetWeightCalculationStrategy;
}

public double getScoringGain() {
return scoringGain;
}

public void setScoringGain(double scoringGain) {
this.scoringGain = scoringGain;
}
}

public enum WorkBalancingStrategy {
Expand All @@ -204,4 +224,16 @@ public UnknownWorkBalancingStrategyException() {
}
}
}

public enum TargetWeightCalculationStrategy {
AVG,
SCORING;

public static class UnknownTargetWeightCalculationStrategyException extends InternalProcessingException {

public UnknownTargetWeightCalculationStrategyException() {
super("Unknown target weight calculation strategy. Use one of: " + Arrays.toString(values()));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void run() {
}
} else {
balancingMetrics.reset();
balancingListener.onBalancingSkipped();
}
} catch (Exception e) {
logger.error("Caught exception when running balancing job", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public interface BalancingListener {
void onBeforeBalancing(List<String> activeConsumers);

void onAfterBalancing(WorkDistributionChanges changes);

void onBalancingSkipped();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ public void onBeforeBalancing(List<String> activeConsumers) {
public void onAfterBalancing(WorkDistributionChanges changes) {

}

@Override
public void onBalancingSkipped() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted;

import java.util.Collection;
import java.util.Map;

import static java.util.stream.Collectors.toMap;

public class AvgTargetWeightCalculator implements TargetWeightCalculator {

private final WeightedWorkloadMetrics metrics;

public AvgTargetWeightCalculator(WeightedWorkloadMetrics metrics) {
this.metrics = metrics;
}

@Override
public Map<String, Weight> calculate(Collection<ConsumerNode> consumers) {
if (consumers.isEmpty()) {
return Map.of();
}

metrics.reportCurrentWeights(consumers);
Weight sum = consumers.stream()
.map(ConsumerNode::getWeight)
.reduce(Weight.ZERO, Weight::add);
Weight average = sum.divide(consumers.size());

Map<String, Weight> newWeights = consumers.stream()
.collect(toMap(ConsumerNode::getConsumerId, ignore -> average));
metrics.reportProposedWeights(newWeights);
return newWeights;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ class ConsumerNode {
static Comparator<ConsumerNode> LIGHTEST_CONSUMER_FIRST = comparing(ConsumerNode::getWeight);

private final String consumerId;
private final ConsumerNodeLoad initialLoad;
private final int maxSubscriptionsPerConsumer;
private final Set<ConsumerTask> tasks = new HashSet<>();
private Weight weight = Weight.ZERO;

ConsumerNode(String consumerId, int maxSubscriptionsPerConsumer) {
ConsumerNode(String consumerId, ConsumerNodeLoad initialLoad, int maxSubscriptionsPerConsumer) {
this.consumerId = consumerId;
this.initialLoad = initialLoad;
this.maxSubscriptionsPerConsumer = maxSubscriptionsPerConsumer;
}

Expand Down Expand Up @@ -60,6 +62,10 @@ Weight getWeight() {
return weight;
}

ConsumerNodeLoad getInitialLoad() {
return initialLoad;
}

@Override
public String toString() {
return consumerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,34 @@
import pl.allegro.tech.hermes.api.SubscriptionName;

import java.util.Map;
import java.util.Objects;

class ConsumerNodeLoad {

static final ConsumerNodeLoad UNDEFINED = new ConsumerNodeLoad(Map.of());
static final ConsumerNodeLoad UNDEFINED = new ConsumerNodeLoad(-1d, Map.of());

private final double cpuUtilization;
private final Map<SubscriptionName, SubscriptionLoad> loadPerSubscription;

ConsumerNodeLoad(Map<SubscriptionName, SubscriptionLoad> loadPerSubscription) {
ConsumerNodeLoad(double cpuUtilization, Map<SubscriptionName, SubscriptionLoad> loadPerSubscription) {
this.cpuUtilization = cpuUtilization;
this.loadPerSubscription = loadPerSubscription;
}

Map<SubscriptionName, SubscriptionLoad> getLoadPerSubscription() {
return loadPerSubscription;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerNodeLoad that = (ConsumerNodeLoad) o;
return Objects.equals(loadPerSubscription, that.loadPerSubscription);
double getCpuUtilization() {
return cpuUtilization;
}

@Override
public int hashCode() {
return Objects.hash(loadPerSubscription);
double sumOperationsPerSecond() {
return loadPerSubscription.values().stream()
.mapToDouble(SubscriptionLoad::getOperationsPerSecond)
.sum();
}

boolean isDefined() {
return cpuUtilization != UNDEFINED.getCpuUtilization();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ConsumerNodeLoad decode(byte[] bytes) {

body.wrap(buffer, header.encodedLength(), header.blockLength(), header.version());

return new ConsumerNodeLoad(decodeSubscriptionLoads(body));
return new ConsumerNodeLoad(body.cpuUtilization(), decodeSubscriptionLoads(body));
}

private Map<SubscriptionName, SubscriptionLoad> decodeSubscriptionLoads(ConsumerLoadDecoder body) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ class ConsumerNodeLoadEncoder {
this.buffer = new ExpandableDirectByteBuffer(bufferSize);
}

byte[] encode(ConsumerNodeLoad metrics) {
Map<SubscriptionId, SubscriptionLoad> subscriptionLoads = mapToSubscriptionIds(metrics);
byte[] encode(ConsumerNodeLoad consumerNodeLoad) {
Map<SubscriptionId, SubscriptionLoad> subscriptionLoads = mapToSubscriptionIds(consumerNodeLoad);

MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
ConsumerLoadEncoder body = new ConsumerLoadEncoder()
.wrapAndApplyHeader(buffer, 0, headerEncoder);

SubscriptionsEncoder loadPerSubscriptionEncoder = body
.cpuUtilization(consumerNodeLoad.getCpuUtilization())
.subscriptionsCount(subscriptionLoads.size());

for (Map.Entry<SubscriptionId, SubscriptionLoad> entry : subscriptionLoads.entrySet()) {
Expand Down
Loading

0 comments on commit 7cd3b37

Please sign in to comment.